This commit is contained in:
louzefeng
2024-07-09 18:38:56 +00:00
parent 8bafaef34d
commit bf99793fd0
6071 changed files with 1017944 additions and 0 deletions

View File

@@ -0,0 +1,352 @@
<audio id="audio" title="06 | 请求通道如何实现Kafka请求队列" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/10/27/10062b1488ba1dacaf12e89c3f717f27.mp3"></audio>
你好,我是胡夕。日志模块我们已经讲完了,你掌握得怎样了呢?如果你在探索源码的过程中碰到了问题,记得在留言区里写下你的困惑,我保证做到知无不言。
现在让我们开启全新的“请求处理模块”的源码学习之旅。坦率地讲这是我自己给Kafka源码划分的模块在源码中可没有所谓的“请求处理模块”的提法。但我始终觉得这样划分能够帮助你清晰地界定不同部分的源码的作用可以让你的阅读更有针对性学习效果也会事半功倍。
在这个模块我会带你了解请求处理相关的重点内容包括请求处理通道、请求处理全流程分析和请求入口类分析等。今天我们先来学习下Kafka是如何实现请求队列的。源码中位于core/src/main/scala/kafka/network下的RequestChannel.scala文件是主要的实现类。
当我们说到Kafka服务器端也就是Broker的时候往往会说它承担着消息持久化的功能但本质上它其实就是**一个不断接收外部请求、处理请求然后发送处理结果的Java进程**。
你可能会觉得奇怪Broker不是用Scala语言编写的吗怎么这里又是Java进程了呢这是因为Scala代码被编译之后生成.class文件它和Java代码被编译后的效果是一样的因此Broker启动后也仍然是一个普通的Java进程。
**高效地保存排队中的请求是确保Broker高处理性能的关键**。既然这样那你一定很想知道Broker上的请求队列是怎么实现的呢接下来我们就一起看下**Broker底层请求对象的建模**和**请求队列的实现原理**以及Broker**请求处理方面的核心监控指标**。
目前Broker与Clients进行交互主要是基于Request/Response机制所以我们很有必要学习一下源码是如何建模或定义Request和Response的。
## 请求Request
我们先来看一下RequestChannel源码中的Request定义代码。
```
sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest
class Request(val processor: Int,
val context: RequestContext,
val startTimeNanos: Long,
memoryPool: MemoryPool,
@volatile private var buffer: ByteBuffer,
metrics: RequestChannel.Metrics) extends BaseRequest {
......
}
```
简单提一句Scala语言中的“trait”关键字大致类似于Java中的interface接口。从代码中我们可以知道BaseRequest是一个trait接口定义了基础的请求类型。它有两个实现类**ShutdownRequest类**和**Request类**。
ShutdownRequest仅仅起到一个标志位的作用。当Broker进程关闭时请求处理器RequestHandler在第9讲我会具体讲到它会发送ShutdownRequest到专属的请求处理线程。该线程接收到此请求后会主动触发一系列的Broker关闭逻辑。
Request则是真正定义各类Clients端或Broker端请求的实现类。它定义的属性包括processor、context、startTimeNanos、memoryPool、buffer和metrics。下面我们一一来看。
### processor
processor是Processor线程的序号即**这个请求是由哪个Processor线程接收处理的**。Broker端参数num.network.threads控制了Broker每个监听器上创建的Processor线程数。
假设你的listeners配置为PLAINTEXT://localhost:9092,SSL://localhost:9093那么在默认情况下Broker启动时会创建6个Processor线程每3个为一组分别给listeners参数中设置的两个监听器使用每组的序号分别是0、1、2。
你可能会问为什么要保存Processor线程序号呢这是因为当Request被后面的I/O线程处理完成后还要依靠Processor线程发送Response给请求发送方因此Request中必须记录它之前是被哪个Processor线程接收的。另外这里我们要先明确一点**Processor线程仅仅是网络接收线程不会执行真正的Request请求处理逻辑**那是I/O线程负责的事情。
### context
**context是用来标识请求上下文信息的**。Kafka源码中定义了RequestContext类顾名思义它保存了有关Request的所有上下文信息。RequestContext类定义在clients工程中下面是它主要的逻辑代码。我用注释的方式解释下主体代码的含义。
```
public class RequestContext implements AuthorizableRequestContext {
public final RequestHeader header; // Request头部数据主要是一些对用户不可见的元数据信息如Request类型、Request API版本、clientId等
public final String connectionId; // Request发送方的TCP连接串标识由Kafka根据一定规则定义主要用于表示TCP连接
public final InetAddress clientAddress; // Request发送方IP地址
public final KafkaPrincipal principal; // Kafka用户认证类用于认证授权
public final ListenerName listenerName; // 监听器名称可以是预定义的监听器如PLAINTEXT也可自行定义
public final SecurityProtocol securityProtocol; // 安全协议类型目前支持4种PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL
public final ClientInformation clientInformation; // 用户自定义的一些连接方信息
// 从给定的ByteBuffer中提取出Request和对应的Size值
public RequestAndSize parseRequest(ByteBuffer buffer) {
......
}
// 其他Getter方法
......
}
```
### startTimeNanos
**startTimeNanos记录了Request对象被创建的时间主要用于各种时间统计指标的计算**
请求对象中的很多JMX指标特别是时间类的统计指标都需要使用startTimeNanos字段。你要注意的是**它是以纳秒为单位的时间戳信息,可以实现非常细粒度的时间统计精度**。
### memoryPool
memoryPool表示源码定义的一个非阻塞式的内存缓冲区主要作用是**避免Request对象无限使用内存**。
当前该内存缓冲区的接口类和实现类分别是MemoryPool和SimpleMemoryPool。你可以重点关注下SimpleMemoryPool的tryAllocate方法看看它是怎么为Request对象分配内存的。
### buffer
buffer是真正保存Request对象内容的字节缓冲区。Request发送方必须按照Kafka RPC协议规定的格式向该缓冲区写入字节否则将抛出InvalidRequestException异常。**这个逻辑主要是由RequestContext的parseRequest方法实现的**。
```
public RequestAndSize parseRequest(ByteBuffer buffer) {
if (isUnsupportedApiVersionsRequest()) {
// 不支持的ApiVersions请求类型被视为是V0版本的请求并且不做解析操作直接返回
ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), (short) 0, header.apiVersion());
return new RequestAndSize(apiVersionsRequest, 0);
} else {
// 从请求头部数据中获取ApiKeys信息
ApiKeys apiKey = header.apiKey();
try {
// 从请求头部数据中获取版本信息
short apiVersion = header.apiVersion();
// 解析请求
Struct struct = apiKey.parseRequest(apiVersion, buffer);
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
// 封装解析后的请求对象以及请求大小返回
return new RequestAndSize(body, struct.sizeOf());
} catch (Throwable ex) {
// 解析过程中出现任何问题都视为无效请求,抛出异常
throw new InvalidRequestException(&quot;Error getting request for apiKey: &quot; + apiKey +
&quot;, apiVersion: &quot; + header.apiVersion() +
&quot;, connectionId: &quot; + connectionId +
&quot;, listenerName: &quot; + listenerName +
&quot;, principal: &quot; + principal, ex);
}
}
}
```
就像前面说过的,这个方法的主要目的是**从ByteBuffer中提取对应的Request对象以及它的大小**。
首先代码会判断该Request是不是Kafka不支持的ApiVersions请求版本。如果是不支持的就直接构造一个V0版本的ApiVersions请求然后返回。否则的话就继续下面的代码。
这里我稍微解释一下ApiVersions请求的作用。当Broker接收到一个ApiVersionsRequest的时候它会返回Broker当前支持的请求类型列表包括请求类型名称、支持的最早版本号和最新版本号。如果你查看Kafka的bin目录的话你应该能找到一个名为kafka-broker-api-versions.sh的脚本工具。它的实现原理就是构造ApiVersionsRequest对象然后发送给对应的Broker。
你可能会问如果是ApiVersions类型的请求代码中为什么要判断一下它的版本呢这是因为和处理其他类型请求不同的是Kafka必须保证版本号比最新支持版本还要高的ApiVersions请求也能被处理。这主要是考虑到了客户端和服务器端版本的兼容问题。客户端发送请求给Broker的时候很可能不知道Broker到底支持哪些版本的请求它需要使用ApiVersionsRequest去获取完整的请求版本支持列表。但是如果不做这个判断Broker可能无法处理客户端发送的ApiVersionsRequest。
通过这个检查之后代码会从请求头部数据中获取ApiKeys信息以及对应的版本信息然后解析请求最后封装解析后的请求对象以及请求大小并返回。
### metrics
metrics是Request相关的各种监控指标的一个管理类。它里面构建了一个Map封装了所有的请求JMX指标。除了上面这些重要的字段属性之外Request类中的大部分代码都是与监控指标相关的后面我们再详细说。
## 响应Response
说完了Request代码我们再来说下Response。Kafka为Response定义了1个抽象父类和5个具体子类如下图所示
<img src="https://static001.geekbang.org/resource/image/a0/6a/a03ecdba3c118efbc3910b5a1badc96a.jpg" alt="">
看到这么多类,你可能会有点蒙,这些都是干吗的呢?别着急,现在我分别给你介绍下各个类的作用。
- Response定义Response的抽象基类。每个Response对象都包含了对应的Request对象。这个类里最重要的方法是onComplete方法用来实现每类Response被处理后需要执行的回调逻辑。
- SendResponseKafka大多数Request处理完成后都需要执行一段回调逻辑SendResponse就是保存返回结果的Response子类。里面最重要的字段是**onCompletionCallback**,即**指定处理完成之后的回调逻辑**。
- NoResponse有些Request处理完成后无需单独执行额外的回调逻辑。NoResponse就是为这类Response准备的。
- CloseConnectionResponse用于出错后需要关闭TCP连接的场景此时返回CloseConnectionResponse给Request发送方显式地通知它关闭连接。
- StartThrottlingResponse用于通知Broker的Socket Server组件后面几节课我会讲到它某个TCP连接通信通道开始被限流throttling
- EndThrottlingResponse与StartThrottlingResponse对应通知Broker的SocketServer组件某个TCP连接通信通道的限流已结束。
你可能又会问了“这么多类我都要掌握吗”其实是不用的。你只要了解SendResponse表示正常需要发送Response而NoResponse表示无需发送Response就可以了。至于CloseConnectionResponse它是用于标识关闭连接通道的Response。而后面两个Response类不是很常用它们仅仅在对Socket连接进行限流时才会派上用场这里我就不具体展开讲了。
Okay现在我们看下Response相关的代码部分。
```
abstract class Response(val request: Request) {
locally {
val nowNs = Time.SYSTEM.nanoseconds
request.responseCompleteTimeNanos = nowNs
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = nowNs
}
def processor: Int = request.processor
def responseString: Option[String] = Some(&quot;&quot;)
def onComplete: Option[Send =&gt; Unit] = None
override def toString: String
}
```
这个抽象基类只有一个属性字段request。这就是说**每个Response对象都要保存它对应的Request对象**。我在前面说过onComplete方法是调用指定回调逻辑的地方。SendResponse类就是复写Override了这个方法如下所示
```
class SendResponse(request: Request,
val responseSend: Send,
val responseAsString: Option[String],
val onCompleteCallback: Option[Send =&gt; Unit])
extends Response(request) {
......
override def onComplete: Option[Send =&gt; Unit] = onCompleteCallback
}
```
这里的SendResponse类继承了Response父类并重新定义了onComplete方法。复写的逻辑很简单就是指定输入参数onCompleteCallback。其实方法本身没有什么可讲的反倒是这里的Scala语法值得多说几句。
Scala中的Unit类似于Java中的void而“Send =&gt; Unit”表示一个方法。这个方法接收一个Send类实例然后执行一段代码逻辑。Scala是函数式编程语言函数在Scala中是“一等公民”因此你可以把一个函数作为一个参数传给另一个函数也可以把函数作为结果返回。这里的onComplete方法就应用了第二种用法也就是把函数赋值给另一个函数并作为结果返回。这样做的好处在于你可以灵活地变更onCompleteCallback来实现不同的回调逻辑。
## RequestChannel
RequestChannel顾名思义就是传输Request/Response的通道。有了Request和Response的基础下面我们可以学习RequestChannel类的实现了。
我们先看下RequestChannel类的定义和重要的字段属性。
```
class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
......
}
```
RequestChannel类实现了KafkaMetricsGroup trait后者封装了许多实用的指标监控方法比如newGauge方法用于创建数值型的监控指标newHistogram方法用于创建直方图型的监控指标。
**就RequestChannel类本身的主体功能而言它定义了最核心的3个属性requestQueue、queueSize和processors**。下面我分别解释下它们的含义。
每个RequestChannel对象实例创建时会定义一个队列来保存Broker接收到的各类请求这个队列被称为请求队列或Request队列。Kafka使用**Java提供的阻塞队列ArrayBlockingQueue**实现这个请求队列并利用它天然提供的线程安全性来保证多个线程能够并发安全高效地访问请求队列。在代码中这个队列由变量requestQueue定义。
而字段queueSize就是Request队列的最大长度。当Broker启动时SocketServer组件会创建RequestChannel对象并把Broker端参数queued.max.requests赋值给queueSize。因此在默认情况下每个RequestChannel上的队列长度是500。
字段processors封装的是RequestChannel下辖的Processor线程池。每个Processor线程负责具体的请求处理逻辑。下面我详细说说Processor的管理。
### Processor管理
上面代码中的第六行创建了一个Processor线程池——当然它是用Java的ConcurrentHashMap数据结构去保存的。Map中的Key就是前面我们说的processor序号而Value则对应具体的Processor线程对象。
这个线程池的存在告诉了我们一个事实:**当前Kafka Broker端所有网络线程都是在RequestChannel中维护的**。既然创建了线程池代码中必然要有管理线程池的操作。RequestChannel中的addProcessor和removeProcessor方法就是做这些事的。
```
def addProcessor(processor: Processor): Unit = {
// 添加Processor到Processor线程池
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s&quot;Unexpected processor with processorId ${processor.id}&quot;)
newGauge(responseQueueSizeMetricName,
() =&gt; processor.responseQueueSize,
// 为给定Processor对象创建对应的监控指标
Map(ProcessorMetricTag -&gt; processor.id.toString))
}
def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId) // 从Processor线程池中移除给定Processor线程
removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -&gt; processorId.toString)) // 移除对应Processor的监控指标
}
```
代码很简单基本上就是调用ConcurrentHashMap的putIfAbsent和remove方法分别实现增加和移除线程。每当Broker启动时它都会调用addProcessor方法向RequestChannel对象添加num.network.threads个Processor线程。
如果查询Kafka官方文档的话你就会发现num.network.threads这个参数的更新模式Update Mode是Cluster-wide。这就说明Kafka允许你动态地修改此参数值。比如Broker启动时指定num.network.threads为8之后你通过kafka-configs命令将其修改为3。显然这个操作会减少Processor线程池中的线程数量。在这个场景下removeProcessor方法会被调用。
### 处理Request和Response
除了Processor的管理之外RequestChannel的另一个重要功能是处理**Request和Response**具体表现为收发Request和发送Response。比如收发Request的方法有sendRequest和receiveRequest
```
def sendRequest(request: RequestChannel.Request): Unit = {
requestQueue.put(request)
}
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
def receiveRequest(): RequestChannel.BaseRequest =
requestQueue.take()
```
所谓的发送Request仅仅是将Request对象放置在Request队列中而已而接收Request则是从队列中取出Request。整个流程构成了一个迷你版的“生产者-消费者”模式然后依靠ArrayBlockingQueue的线程安全性来确保整个过程的线程安全如下所示
<img src="https://static001.geekbang.org/resource/image/b8/cc/b83a2856a7f8e7b895f47e277e007ecc.jpg" alt="">
对于Response而言则没有所谓的接收Response只有发送Response即sendResponse方法。sendResponse是啥意思呢其实就是把Response对象发送出去也就是将Response添加到Response队列的过程。
```
def sendResponse(response: RequestChannel.Response): Unit = {
if (isTraceEnabled) { // 构造Trace日志输出字符串
val requestHeader = response.request.header
val message = response match {
case sendResponse: SendResponse =&gt;
s&quot;Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes.&quot;
case _: NoOpResponse =&gt;
s&quot;Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required.&quot;
case _: CloseConnectionResponse =&gt;
s&quot;Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}.&quot;
case _: StartThrottlingResponse =&gt;
s&quot;Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}&quot;
case _: EndThrottlingResponse =&gt;
s&quot;Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}&quot;
}
trace(message)
}
// 找出response对应的Processor线程即request当初是由哪个Processor线程处理的
val processor = processors.get(response.processor)
// 将response对象放置到对应Processor线程的Response队列中
if (processor != null) {
processor.enqueueResponse(response)
}
}
```
sendResponse方法的逻辑其实非常简单。
前面的一大段if代码块仅仅是构造Trace日志要输出的内容。根据不同类型的Response代码需要确定要输出的Trace日志内容。
接着代码会找出Response对象对应的Processor线程。当Processor处理完某个Request后会把自己的序号封装进对应的Response对象。一旦找出了之前是由哪个Processor线程处理的代码直接调用该Processor的enqueueResponse方法将Response放入Response队列中等待后续发送。
## 监控指标实现
RequestChannel类还定义了丰富的监控指标用于实时动态地监测Request和Response的性能表现。我们来看下具体指标项都有哪些。
```
object RequestMetrics {
val consumerFetchMetricName = ApiKeys.FETCH.name + &quot;Consumer&quot;
val followFetchMetricName = ApiKeys.FETCH.name + &quot;Follower&quot;
val RequestsPerSec = &quot;RequestsPerSec&quot;
val RequestQueueTimeMs = &quot;RequestQueueTimeMs&quot;
val LocalTimeMs = &quot;LocalTimeMs&quot;
val RemoteTimeMs = &quot;RemoteTimeMs&quot;
val ThrottleTimeMs = &quot;ThrottleTimeMs&quot;
val ResponseQueueTimeMs = &quot;ResponseQueueTimeMs&quot;
val ResponseSendTimeMs = &quot;ResponseSendTimeMs&quot;
val TotalTimeMs = &quot;TotalTimeMs&quot;
val RequestBytes = &quot;RequestBytes&quot;
val MessageConversionsTimeMs = &quot;MessageConversionsTimeMs&quot;
val TemporaryMemoryBytes = &quot;TemporaryMemoryBytes&quot;
val ErrorsPerSec = &quot;ErrorsPerSec&quot;
}
```
可以看到,指标有很多,不过别有压力,我们只要掌握几个重要的就行了。
- **RequestsPerSec**每秒处理的Request数用来评估Broker的繁忙状态。
- **RequestQueueTimeMs**计算Request在Request队列中的平均等候时间单位是毫秒。倘若Request在队列的等待时间过长你通常需要增加后端I/O线程的数量来加快队列中Request的拿取速度。
- **LocalTimeMs**计算Request实际被处理的时间单位是毫秒。一旦定位到这个监控项的值很大你就需要进一步研究Request被处理的逻辑了具体分析到底是哪一步消耗了过多的时间。
- **RemoteTimeMs**Kafka的读写请求PRODUCE请求和FETCH请求逻辑涉及等待其他Broker操作的步骤。RemoteTimeMs计算的就是等待其他Broker完成指定逻辑的时间。因为等待的是其他Broker因此被称为Remote Time。这个监控项非常重要Kafka生产环境中设置acks=all的Producer程序发送消息延时高的主要原因往往就是Remote Time高。因此如果你也碰到了这样的问题不妨先定位一下Remote Time是不是瓶颈。
- **TotalTimeMs**计算Request被处理的完整流程时间。**这是最实用的监控指标,没有之一!**毕竟我们通常都是根据TotalTimeMs来判断系统是否出现问题的。一旦发现了问题我们才会利用前面的几个监控项进一步定位问题的原因。
RequestChannel定义了updateMetrics方法用于实现监控项的更新因为逻辑非常简单我就不展开说了你可以自己阅读一下。
## 总结
好了,又到了总结时间。
今天我带你阅读了Kafka请求队列的实现源码。围绕这个问题我们学习了几个重点内容。
- Request定义了Kafka Broker支持的各类请求。
- Response定义了与Request对应的各类响应。
- RequestChannel实现了Kafka Request队列。
- 监控指标封装了与Request队列相关的重要监控指标。
<img src="https://static001.geekbang.org/resource/image/4b/15/4bf7b31f368743496018b3f21a528b15.jpg" alt="">
希望你结合今天所讲的内容思考一下Request和Response在请求通道甚至是SocketServer中的流转过程这将极大地帮助你了解Kafka是如何处理外部发送的请求的。当然如果你觉得这个有难度也不必着急因为后面我会专门用一节课来告诉你这些内容。
## 课后讨论
如果我想监控Request队列当前的使用情况比如当前已保存了多少个Request你可以结合源码指出应该使用哪个监控指标吗
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,532 @@
<audio id="audio" title="07 | SocketServerKafka到底是怎么应用NIO实现网络通信的" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/3f/8c/3f354d9a26a03aeb946bbefca5cbd88c.mp3"></audio>
你好我是胡夕。这节课我们来说说Kafka底层的NIO通信机制源码。
在谈到Kafka高性能、高吞吐量实现原理的时候很多人都对它使用了Java NIO这件事津津乐道。实际上搞懂“Kafka究竟是怎么应用NIO来实现网络通信的”不仅是我们掌握Kafka请求全流程处理的前提条件对我们了解Reactor模式的实现大有裨益而且还能帮助我们解决很多实际问题。
比如说当Broker处理速度很慢、需要优化的时候你只有明确知道SocketServer组件的工作原理才能制定出恰当的解决方案并有针对性地给出对应的调优参数。
那么今天我们就一起拿下这个至关重要的NIO通信机制吧。
## 网络通信层
在深入学习Kafka各个网络组件之前我们先从整体上看一下完整的网络通信层架构如下图所示
<img src="https://static001.geekbang.org/resource/image/52/e8/52c3226ad4736751b4b1ccfcb2a09ee8.jpg" alt="">
可以看出Kafka网络通信组件主要由两大部分构成**SocketServer**和**KafkaRequestHandlerPool**。
**SocketServer组件是核心**主要实现了Reactor模式用于处理外部多个Clients这里的Clients指的是广义的Clients可能包含Producer、Consumer或其他Broker的并发请求并负责将处理结果封装进Response中返还给Clients。
**KafkaRequestHandlerPool组件就是我们常说的I/O线程池**里面定义了若干个I/O线程用于执行真实的请求处理逻辑。
两者的交互点在于SocketServer中定义的RequestChannel对象和Processor线程。对了我所说的线程在代码中本质上都是Runnable类型不管是Acceptor类、Processor类还是后面我们会单独讨论的KafkaRequestHandler类。
讲到这里我稍微提示你一下。在第9节课我会给出KafkaRequestHandlerPool线程池的详细介绍。但你现在需要知道的是KafkaRequestHandlerPool线程池定义了多个KafkaRequestHandler线程而KafkaRequestHandler线程是真正处理请求逻辑的地方。和KafkaRequestHandler相比今天所说的Acceptor和Processor线程从某种意义上来说只能算是请求和响应的“搬运工”罢了。
了解了完整的网络通信层架构之后我们要重点关注一下SocketServer组件。**这个组件是Kafka网络通信层中最重要的子模块。它下辖的Acceptor线程、Processor线程和RequestChannel等对象都是实施网络通信的重要组成部分**。你可能会感到意外的是,这套线程组合在源码中有多套,分别具有不同的用途。在下节课,我会具体跟你分享一下,不同的线程组合会被应用到哪些实际场景中。
下面我们进入到SocketServer组件的学习。
## SocketServer概览
SocketServer组件的源码位于Kafka工程的core包下具体位置是src/main/scala/kafka/network路径下的SocketServer.scala文件。
SocketServer.scala可谓是元老级的源码文件了。在Kafka的源码演进历史中很多代码文件进进出出这个文件却一直“坚强地活着”而且还在不断完善。如果翻开它的Git修改历史你会发现它最早的修改提交历史可回溯到2011年8月足见它的资历之老。
目前SocketServer.scala文件是一个近2000行的大文件共有8个代码部分。我使用一张思维导图帮你梳理下
<img src="https://static001.geekbang.org/resource/image/18/18/18b39a0fe7817bf6c2344bf6b49eaa18.jpg" alt="">
乍一看组件有很多但你也不必担心我先对这些组件做个简单的介绍然后我们重点学习一下Acceptor类和Processor类的源码。毕竟**这两个类是实现网络通信的关键部件**。另外今天我给出的都是SocketServer组件的基本情况介绍下节课我再详细向你展示它的定义。
1.**AbstractServerThread类**这是Acceptor线程和Processor线程的抽象基类定义了这两个线程的公有方法如shutdown关闭线程等。我不会重点展开这个抽象类的代码但你要重点关注下CountDownLatch类在线程启动和线程关闭时的作用。
如果你苦于寻找Java线程安全编程的最佳实践案例那一定不要错过CountDownLatch这个类。Kafka中的线程控制代码大量使用了基于CountDownLatch的编程技术依托于它来实现优雅的线程启动、线程关闭等操作。因此我建议你熟练掌握它们并应用到你日后的工作当中去。
2.**Acceptor线程类**这是接收和创建外部TCP连接的线程。每个SocketServer实例只会创建一个Acceptor线程。它的唯一目的就是创建连接并将接收到的Request传递给下游的Processor线程处理。
3.**Processor线程类**这是处理单个TCP连接上所有请求的线程。每个SocketServer实例默认创建若干个num.network.threadsProcessor线程。Processor线程负责将接收到的Request添加到RequestChannel的Request队列上同时还负责将Response返还给Request发送方。
4.**Processor伴生对象类**仅仅定义了一些与Processor线程相关的常见监控指标和常量等如Processor线程空闲率等。
5.**ConnectionQuotas类**是控制连接数配额的类。我们能够设置单个IP创建Broker连接的最大数量以及单个Broker能够允许的最大连接数。
6.**TooManyConnectionsException类**SocketServer定义的一个异常类用于标识连接数配额超限情况。
7.**SocketServer类**实现了对以上所有组件的管理和操作如创建和关闭Acceptor、Processor线程等。
8.**SocketServer伴生对象类**定义了一些有用的常量同时明确了SocketServer组件中的哪些参数是允许动态修改的。
## Acceptor线程
经典的Reactor模式有个Dispatcher的角色接收外部请求并分发给下面的实际处理线程。在Kafka中这个Dispatcher就是Acceptor线程。
我们看下它的定义:
```
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 创建底层的NIO Selector对象
// Selector对象负责执行底层实际I/O操作如监听连接创建请求、读写请求等
private val nioSelector = NSelector.open()
// Broker端创建对应的ServerSocketChannel实例
// 后续把该Channel向上一步的Selector对象注册
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 创建Processor线程池实际上是Processor线程数组
private val processors = new ArrayBuffer[Processor]()
private val processorsStarted = new AtomicBoolean
private val blockedPercentMeter = newMeter(s&quot;${metricPrefix}AcceptorBlockedPercent&quot;,
&quot;blocked time&quot;, TimeUnit.NANOSECONDS, Map(ListenerMetricTag -&gt; endPoint.listenerName.value))
......
}
```
从定义来看Acceptor线程接收5个参数其中比较重要的有3个。
- **endPoint**。它就是你定义的Kafka Broker连接信息比如PLAINTEXT://localhost:9092。Acceptor需要用到endPoint包含的主机名和端口信息创建Server Socket。
- **sendBufferSize**。它设置的是SocketOptions的SO_SNDBUF即用于设置出站Outbound网络I/O的底层缓冲区大小。该值默认是Broker端参数socket.send.buffer.bytes的值即100KB。
- **recvBufferSize**。它设置的是SocketOptions的SO_RCVBUF即用于设置入站Inbound网络I/O的底层缓冲区大小。该值默认是Broker端参数socket.receive.buffer.bytes的值即100KB。
说到这儿我想给你提一个优化建议。如果在你的生产环境中Clients与Broker的通信网络延迟很大比如RTT&gt;10ms那么我建议你调大控制缓冲区大小的两个参数也就是sendBufferSize和recvBufferSize。通常来说默认值100KB太小了。
除了类定义的字段Acceptor线程还有两个非常关键的自定义属性。
- **nioSelector**是Java NIO库的Selector对象实例也是后续所有网络通信组件实现Java NIO机制的基础。如果你不熟悉Java NIO那么我推荐你学习这个系列教程[Java NIO](http://tutorials.jenkov.com/java-nio/index.html)。
- **processors**网络Processor线程池。Acceptor线程在初始化时需要创建对应的网络Processor线程池。可见Processor线程是在Acceptor线程中管理和维护的。
既然如此那它就必须要定义相关的方法。Acceptor代码中提供了3个与Processor相关的方法分别是addProcessors、startProcessors和removeProcessors。鉴于它们的代码都非常简单我用注释的方式给出主体逻辑的步骤
### addProcessors
```
private[network] def addProcessors(
newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
processors ++= newProcessors // 添加一组新的Processor线程
if (processorsStarted.get) // 如果Processor线程池已经启动
startProcessors(newProcessors, processorThreadPrefix) // 启动新的Processor线程
}
```
### startProcessors
```
private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
if (!processorsStarted.getAndSet(true)) { // 如果Processor线程池未启动
startProcessors(processors, processorThreadPrefix) // 启动给定的Processor线程
}
}
private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
processors.foreach { processor =&gt; // 依次创建并启动Processor线程
// 线程命名规范processor线程前缀-kafka-network-thread-broker序号-监听器名称-安全协议-Processor序号
// 假设为序号为0的Broker设置PLAINTEXT://localhost:9092作为连接信息那么3个Processor线程名称分别为
// data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0
// data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-1
// data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-2
KafkaThread.nonDaemon(s&quot;${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}&quot;, processor).start()
}
}
```
### removeProcessors
```
private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
// 获取Processor线程池中最后removeCount个线程
val toRemove = processors.takeRight(removeCount)
// 移除最后removeCount个线程
processors.remove(processors.size - removeCount, removeCount)
// 关闭最后removeCount个线程
toRemove.foreach(_.shutdown())
// 在RequestChannel中移除这些Processor
toRemove.foreach(processor =&gt; requestChannel.removeProcessor(processor.id))
}
```
为了更加形象地展示这些方法的逻辑我画了一张图它同时包含了这3个方法的执行流程如下图所示
<img src="https://static001.geekbang.org/resource/image/49/2b/494e5bac80f19a2533bb9e7b30003e2b.jpg" alt="">
刚才我们学到的addProcessors、startProcessors和removeProcessors方法是管理Processor线程用的。应该这么说有了这三个方法Acceptor类就具备了基本的Processor线程池管理功能。不过**Acceptor类逻辑的重头戏其实是run方法它是处理Reactor模式中分发逻辑的主要实现方法**。下面我使用注释的方式给出run方法的大体运行逻辑如下所示
```
def run(): Unit = {
//注册OP_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 等待Acceptor线程启动完成
startupComplete()
try {
// 当前使用的Processor序号从0开始最大值是num.network.threads - 1
var currentProcessorIndex = 0
while (isRunning) {
try {
// 每500毫秒获取一次就绪I/O事件
val ready = nioSelector.select(500)
if (ready &gt; 0) { // 如果有I/O事件准备就绪
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext &amp;&amp; isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
// 调用accept方法创建Socket连接
accept(key).foreach { socketChannel =&gt;
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
// 指定由哪个Processor线程进行处理
processor = synchronized {
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
// 更新Processor线程序号
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) // Processor是否接受了该连接
}
} else
throw new IllegalStateException(&quot;Unrecognized key state for acceptor thread.&quot;)
} catch {
case e: Throwable =&gt; error(&quot;Error while accepting connection&quot;, e)
}
}
}
}
catch {
case e: ControlThrowable =&gt; throw e
case e: Throwable =&gt; error(&quot;Error occurred&quot;, e)
}
}
} finally { // 执行各种资源关闭逻辑
debug(&quot;Closing server socket and selector.&quot;)
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}
```
看上去代码似乎有点多我再用一张图来说明一下run方法的主要处理逻辑吧。这里的关键点在于Acceptor线程会先为每个入站请求确定要处理它的Processor线程然后调用assignNewConnection方法令Processor线程创建与发送方的连接。
<img src="https://static001.geekbang.org/resource/image/1c/55/1c8320c702c1e18b37b992cadc61d555.jpg" alt="">
基本上Acceptor线程使用Java NIO的Selector + SocketChannel的方式循环地轮询准备就绪的I/O事件。这里的I/O事件主要是指网络连接创建事件即代码中的SelectionKey.OP_ACCEPT。一旦接收到外部连接请求Acceptor就会指定一个Processor线程并将该请求交由它让它创建真正的网络连接。总的来说Acceptor线程就做这么点事。
## Processor线程
下面我们进入到Processor线程源码的学习。
**如果说Acceptor是做入站连接处理的那么Processor代码则是真正创建连接以及分发请求的地方**。显然它要做的事情远比Acceptor要多得多。我先给出Processor线程的run方法你大致感受一下
```
override def run(): Unit = {
startupComplete() // 等待Processor线程启动完成
try {
while (isRunning) {
try {
configureNewConnections() // 创建新连接
// register any new responses for writing
processNewResponses() // 发送Response并将Response放入到inflightResponses临时队列
poll() // 执行NIO poll获取对应SocketChannel上准备就绪的I/O操作
processCompletedReceives() // 将接收到的Request放入Request队列
processCompletedSends() // 为临时Response队列中的Response执行回调逻辑
processDisconnected() // 处理因发送失败而导致的连接断开
closeExcessConnections() // 关闭超过配额限制部分的连接
} catch {
case e: Throwable =&gt; processException(&quot;Processor got uncaught exception.&quot;, e)
}
}
} finally { // 关闭底层资源
debug(s&quot;Closing selector - processor $id&quot;)
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
```
run方法逻辑被切割得相当好各个子方法的边界非常清楚。因此从整体上看该方法呈现出了面向对象领域中非常难得的封装特性。我使用一张图来展示下该方法要做的事情
<img src="https://static001.geekbang.org/resource/image/1d/42/1d6f59036ea2797bfc1591f57980df42.jpg" alt="">
在详细说run方法之前我们先来看下Processor线程初始化时要做的事情。
每个Processor线程在创建时都会创建3个队列。注意这里的队列是广义的队列其底层使用的数据结构可能是阻塞队列也可能是一个Map对象而已如下所示
```
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
```
**队列一newConnections**
**它保存的是要创建的新连接信息**具体来说就是SocketChannel对象。这是一个默认上限是20的队列而且目前代码中硬编码了队列的长度因此你无法变更这个队列的长度。
每当Processor线程接收新的连接请求时都会将对应的SocketChannel放入这个队列。后面在创建连接时也就是调用configureNewConnections时就从该队列中取出SocketChannel然后注册新的连接。
**队列二inflightResponses**
严格来说这是一个临时Response队列。当Processor线程将Response返还给Request发送方之后还要将Response放入这个临时队列。
为什么需要这个临时队列呢这是因为有些Response回调逻辑要在Response被发送回发送方之后才能执行因此需要暂存在一个临时队列里面。这就是inflightResponses存在的意义。
**队列三responseQueue**
看名字我们就可以知道这是Response队列而不是Request队列。这告诉了我们一个事实**每个Processor线程都会维护自己的Response队列**而不是像网上的某些文章说的Response队列是线程共享的或是保存在RequestChannel中的。Response队列里面保存着需要被返还给发送方的所有Response对象。
好了了解了这些之后现在我们来深入地查看一下Processor线程的工作逻辑。根据run方法中的方法调用顺序我先来介绍下configureNewConnections方法。
### configureNewConnections
就像我前面所说的configureNewConnections负责处理新连接请求。接下来我用注释的方式给出这个方法的主体逻辑
```
private def configureNewConnections(): Unit = {
var connectionsProcessed = 0 // 当前已配置的连接数计数器
while (connectionsProcessed &lt; connectionQueueSize &amp;&amp; !newConnections.isEmpty) { // 如果没超配额并且有待处理新连接
val channel = newConnections.poll() // 从连接队列中取出SocketChannel
try {
debug(s&quot;Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}&quot;)
// 用给定Selector注册该Channel
// 底层就是调用Java NIO的SocketChannel.register(selector, SelectionKey.OP_READ)
selector.register(connectionId(channel.socket), channel)
connectionsProcessed += 1 // 更新计数器
} catch {
case e: Throwable =&gt;
val remoteAddress = channel.socket.getRemoteSocketAddress
close(listenerName, channel)
processException(s&quot;Processor $id closed connection from $remoteAddress&quot;, e)
}
}
}
```
**该方法最重要的逻辑是调用selector的register来注册SocketChannel**。每个Processor线程都维护了一个Selector类实例。Selector类是社区提供的一个基于Java NIO Selector的接口用于执行非阻塞多通道的网络I/O操作。在核心功能上Kafka提供的Selector和Java提供的是一致的。
### processNewResponses
它负责发送Response给Request发送方并且将Response放入临时Response队列。处理逻辑如下
```
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
while ({currentResponse = dequeueResponse(); currentResponse != null}) { // Response队列中存在待处理Response
val channelId = currentResponse.request.context.connectionId // 获取连接通道ID
try {
currentResponse match {
case response: NoOpResponse =&gt; // 无需发送Response
updateRequestMetrics(response)
trace(s&quot;Socket server received empty response to send, registering for read: $response&quot;)
handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(channelId)
case response: SendResponse =&gt; // 发送Response并将Response放入inflightResponses
sendResponse(response, response.responseSend)
case response: CloseConnectionResponse =&gt; // 关闭对应的连接
updateRequestMetrics(response)
trace(&quot;Closing socket connection actively according to the response code.&quot;)
close(channelId)
case _: StartThrottlingResponse =&gt;
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
case _: EndThrottlingResponse =&gt;
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
tryUnmuteChannel(channelId)
case _ =&gt;
throw new IllegalArgumentException(s&quot;Unknown response type: ${currentResponse.getClass}&quot;)
}
} catch {
case e: Throwable =&gt;
processChannelException(channelId, s&quot;Exception while processing response for $channelId&quot;, e)
}
}
}
```
这里的关键是**SendResponse分支上的sendResponse方法**。这个方法的核心代码其实只有三行:
```
if (openOrClosingChannel(connectionId).isDefined) { // 如果该连接处于可连接状态
selector.send(responseSend) // 发送Response
inflightResponses += (connectionId -&gt; response) // 将Response加入到inflightResponses队列
}
```
### poll
严格来说上面提到的所有发送的逻辑都不是执行真正的发送。真正执行I/O动作的方法是这里的poll方法。
poll方法的核心代码就只有1行**selector.poll(pollTimeout)**。在底层它实际上调用的是Java NIO Selector的select方法去执行那些准备就绪的I/O操作不管是接收Request还是发送Response。因此你需要记住的是**poll方法才是真正执行I/O操作逻辑的地方**。
### processCompletedReceives
它是接收和处理Request的。代码如下
```
private def processCompletedReceives(): Unit = {
// 遍历所有已接收的Request
selector.completedReceives.asScala.foreach { receive =&gt;
try {
// 保证对应连接通道已经建立
openOrClosingChannel(receive.source) match {
case Some(channel) =&gt;
val header = RequestHeader.parse(receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE &amp;&amp; channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
trace(s&quot;Begin re-authentication: $channel&quot;)
else {
val nowNanos = time.nanoseconds()
// 如果认证会话已过期,则关闭连接
if (channel.serverAuthenticationSessionExpired(nowNanos)) {
debug(s&quot;Disconnecting expired channel: $channel : $header&quot;)
close(channel.id)
expiredConnectionsKilledCount.record(null, 1, 0)
} else {
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
if (header.apiKey == ApiKeys.API_VERSIONS) {
val apiVersionsRequest = req.body[ApiVersionsRequest]
if (apiVersionsRequest.isValid) {
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
}
}
// 核心代码将Request添加到Request队列
requestChannel.sendRequest(req)
selector.mute(connectionId)
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
}
}
case None =&gt;
throw new IllegalStateException(s&quot;Channel ${receive.source} removed from selector before processing completed receive&quot;)
}
} catch {
case e: Throwable =&gt;
processChannelException(receive.source, s&quot;Exception while processing request from ${receive.source}&quot;, e)
}
}
}
```
看上去代码有很多但其实最核心的代码就只有1行**requestChannel.sendRequest(req)**也就是将此Request放入Request队列。其他代码只是一些常规化的校验和辅助逻辑。
这个方法的意思是说,**Processor从底层Socket通道不断读取已接收到的网络请求然后转换成Request实例并将其放入到Request队列**。整个逻辑还是很简单的,对吧?
### processCompletedSends
它负责处理Response的回调逻辑。我之前说过Response需要被发送之后才能执行对应的回调逻辑这便是该方法代码要实现的功能
```
private def processCompletedSends(): Unit = {
// 遍历底层SocketChannel已发送的Response
selector.completedSends.asScala.foreach { send =&gt;
try {
// 取出对应inflightResponses中的Response
val response = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s&quot;Send for ${send.destination} completed, but not in `inflightResponses`&quot;)
}
updateRequestMetrics(response) // 更新一些统计指标
// 执行回调逻辑
response.onComplete.foreach(onComplete =&gt; onComplete(send))
handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(send.destination)
} catch {
case e: Throwable =&gt; processChannelException(send.destination,
s&quot;Exception while processing completed send to ${send.destination}&quot;, e)
}
}
}
```
这里通过调用Response对象的onComplete方法来实现回调函数的执行。
### processDisconnected
顾名思义,它就是处理已断开连接的。该方法的逻辑很简单,我用注释标注了主要的执行步骤:
```
private def processDisconnected(): Unit = {
// 遍历底层SocketChannel的那些已经断开的连接
selector.disconnected.keySet.asScala.foreach { connectionId =&gt;
try {
// 获取断开连接的远端主机名信息
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s&quot;connectionId has unexpected format: $connectionId&quot;)
}.remoteHost
// 将该连接从inflightResponses中移除同时更新一些监控指标
inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// 更新配额数据
connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
} catch {
case e: Throwable =&gt; processException(s&quot;Exception while processing disconnection of $connectionId&quot;, e)
}
}
}
```
比较关键的代码是需要从底层Selector中获取那些已经断开的连接之后把它们从inflightResponses中移除掉同时也要更新它们的配额数据。
### closeExcessConnections
这是Processor线程的run方法执行的最后一步即**关闭超限连接**。代码很简单:
```
private def closeExcessConnections(): Unit = {
// 如果配额超限了
if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
// 找出优先关闭的那个连接
val channel = selector.lowestPriorityChannel()
if (channel != null)
close(channel.id) // 关闭该连接
}
}
```
所谓优先关闭是指在诸多TCP连接中找出最近未被使用的那个。这里“未被使用”就是说在最近一段时间内没有任何Request经由这个连接被发送到Processor线程。
## 总结
今天我带你了解了Kafka网络通信层的全貌大致介绍了核心组件SocketServer还花了相当多的时间研究SocketServer下的Acceptor和Processor线程代码。我们来简单总结一下。
- 网络通信层由SocketServer组件和KafkaRequestHandlerPool组件构成。
- SocketServer实现了Reactor模式用于高性能地并发处理I/O请求。
- SocketServer底层使用了Java的Selector实现NIO通信。
<img src="https://static001.geekbang.org/resource/image/41/51/41317d400ed096bbca8efadf43186f51.jpg" alt="">
在下节课我会重点介绍SocketServer处理不同类型Request所做的设计及其对应的代码。这是社区为了提高Broker处理控制类请求的重大举措也是为了改善Broker一致性所做的努力非常值得我们重点关注。
## 课后讨论
最后请思考这样一个问题为什么Request队列被设计成线程共享的而Response队列则是每个Processor线程专属的
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,392 @@
<audio id="audio" title="08 | SocketServer请求还要区分优先级" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/7b/ab/7b5aa20bdd75b0d125e89ca827da80ab.mp3"></audio>
你好,我是胡夕。
在上节课我给你详细地介绍了Kafka网络层的架构以及SocketServer组件中的Acceptor线程和Processor线程是如何利用Java NIO实现网络通信的还简单提到了请求队列和响应队列。
今天我们接着说SocketServer源码重点学习下社区是如何对不同类型的请求进行优先级划分的。
## 案例分享
在Kafka中处理请求是不区分优先级的Kafka对待所有请求都一视同仁。**这种绝对公平的策略有时候是有问题的**。我跟你分享一个真实的案例,你就明白了。我敢保证,你在真实的线上系统中一定遇到过类似的问题。
曾经我们在生产环境中创建过一个单分区双副本的主题当时集群中的Broker A机器保存了分区的Leader副本Broker B保存了Follower副本。某天外部业务量激增导致Broker A瞬间积压了大量的未处理PRODUCE请求。更糟的是运维人员“不凑巧”地执行了一次Preferred Leader选举将Broker B显式地调整成了Leader。
这个时候问题就来了如果Producer程序把acks设置为all那么在LeaderAndIsr请求它是负责调整副本角色的比如Follower和Leader角色转换等之前积压的那些PRODUCE请求就无法正常完成了因为这些请求要一直等待ISR中所有Follower副本同步完成。
但是此时Broker B成为了Leader它上面的副本停止了拉取消息这就可能出现一种结果这些未完成的PRODUCE请求会一直保存在Broker A上的Purgatory缓存中。Leader/Follower的角色转换导致无法完成副本间同步所以这些请求无法被成功处理最终Broker A抛出超时异常返回给Producer程序。
值得一提的是Purgatory缓存是Broker端暂存延时请求的地方。课程后面我会详细介绍这个组件。
这个问题就是对请求不区分优先级造成的后来我们在SocketServer源码中确认了此事。同时结合阅读源码得到的知识我在Jira官网搜到了对应的[Jira ticket](https://issues.apache.org/jira/browse/KAFKA-4453),进而完整地了解了社区是如何解决该问题的。
其实这也是我非常推荐你深入学习Kafka的一个方法**根据实际环境中碰到的问题找到对应的源码,仔细阅读它,形成自己的解决思路,然后去社区印证自己方案的优劣**。在不断地循环这个过程的同时你会发现你对Kafka的代码越来越了解了而且能够很轻松地解决线上环境的各种问题。
说了这么多,相信你已经迫不及待地想要跟我一起阅读这部分源码了,那我们就正式开始吧。
## 必要术语和概念
在阅读SocketServer代码、深入学习请求优先级实现机制之前我们要先掌握一些基本概念这是我们理解后面内容的基础。
**1.Data plane和Control plane**
社区将Kafka请求类型划分为两大类**数据类请求和控制类请求**。Data plane和Control plane的字面意思是数据面和控制面各自对应数据类请求和控制类请求也就是说Data plane负责处理数据类请求Control plane负责处理控制类请求。
目前Controller与Broker交互的请求类型有3种**LeaderAndIsrRequest**、**StopReplicaRequest**和**UpdateMetadataRequest**。这3类请求属于控制类请求通常应该被赋予高优先级。像我们熟知的PRODUCE和FETCH请求就是典型的数据类请求。
**对这两大类请求区分处理是SocketServer源码实现的核心逻辑**
**2.监听器Listener**
目前,**源码区分数据类请求和控制类请求不同处理方式的主要途径,就是通过监听器**。也就是说,创建多组监听器分别来执行数据类和控制类请求的处理代码。
在Kafka中Broker端参数**listeners**和**advertised.listeners**就是用来配置监听器的。在源码中监听器使用EndPoint类来定义如下面代码所示
```
case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
// 构造完整的监听器连接字符串
// 格式为:监听器名称://主机名:端口
// 比如PLAINTEXT://kafka-host:9092
def connectionString: String = {
val hostport =
if (host == null)
&quot;:&quot;+port
else
Utils.formatAddress(host, port)
listenerName.value + &quot;://&quot; + hostport
}
// clients工程下有一个Java版本的Endpoint类供clients端代码使用
// 此方法是构造Java版本的Endpoint类实例
def toJava: JEndpoint = {
new JEndpoint(listenerName.value, securityProtocol, host, port)
}
}
```
每个EndPoint对象定义了4个属性我们分别来看下。
- hostBroker主机名。
- portBroker端口号。
- listenerName监听器名字。目前预定义的名称包括PLAINTEXT、SSL、SASL_PLAINTEXT和SASL_SSL。Kafka允许你自定义其他监听器名称比如CONTROLLER、INTERNAL等。
- securityProtocol监听器使用的安全协议。Kafka支持4种安全协议分别是**PLAINTEXT**、**SSL**、**SASL_PLAINTEXT**和**SASL_SSL**。
这里简单提一下,**Broker端参数listener.security.protocol.map用于指定不同名字的监听器都使用哪种安全协议**。
我举个例子如果Broker端相应参数配置如下
```
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
```
那么这就表示Kafka配置了3套监听器名字分别是CONTROLLER、INTERNAL和EXTERNAL使用的安全协议分别是PLAINTEXT、PLAINTEXT和SSL。
有了这些基础知识接下来我们就可以看一下SocketServer是如何实现Data plane与Control plane的分离的。
当然在此之前我们要先了解下SocketServer的定义。
## SocketServer定义
首先我们来看下SocketServer类有哪些基础属性。我使用思维导图给你展示一下跟实现请求优先级相关的字段或属性
<img src="https://static001.geekbang.org/resource/image/95/e5/95e4a958d84263e4606ab096d5695be5.jpg" alt="">
这些字段都是啥意思呢?我们结合代码来看下。
```
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
// SocketServer实现BrokerReconfigurable trait表明SocketServer的一些参数配置是允许动态修改的
// 即在Broker不停机的情况下修改它们
// SocketServer的请求队列长度由Broker端参数queued.max.requests值而定默认值是500
private val maxQueuedRequests = config.queuedMaxRequests
......
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() // 处理数据类请求的Processor线程池
// 处理数据类请求的Acceptor线程池每套监听器对应一个Acceptor线程
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
// 处理数据类请求专属的RequestChannel对象
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
// control-plane
// 用于处理控制类请求的Processor线程
// 注意目前定义了专属的Processor线程而非线程池处理控制类请求
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
// 处理控制类请求专属的RequestChannel对象
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =&gt; new RequestChannel(20, ControlPlaneMetricPrefix))
......
}
```
首先SocketServer类定义了一个maxQueuedRequests字段它定义了请求队列的最大长度。默认值是Broker端queued.max.requests参数值。
其次在上面的代码中你一定看到了SocketServer实现了BrokerReconfigurable接口在Scala中是trait。这就说明SocketServer中的某些配置是允许动态修改值的。如果查看SocketServer伴生对象类的定义的话你能找到下面这些代码
```
object SocketServer {
......
val ReconfigurableConfigs = Set(
KafkaConfig.MaxConnectionsPerIpProp,
KafkaConfig.MaxConnectionsPerIpOverridesProp,
KafkaConfig.MaxConnectionsProp)
......
}
```
根据这段代码我们可以知道Broker端参数max.connections.per.ip、max.connections.per.ip.overrides和max.connections是可以动态修改的。
另外在我们刚刚看的SocketServer定义的那段代码中Data plane和Control plane注释下面分别定义了一组变量即**Processor线程池**、**Acceptor线程池**和**RequestChannel**实例。
- Processor线程池即上节课提到的网络线程池负责将请求高速地放入到请求队列中。
- Acceptor线程池保存了SocketServer为每个监听器定义的Acceptor线程此线程负责分发该监听器上的入站连接建立请求。
- RequestChannel承载请求队列的请求处理通道。
严格地说对于Data plane来说线程池的说法是没有问题的因为Processor线程确实有很多个而Acceptor也可能有多个因为SocketServer会为每个EndPoint即每套监听器创建一个对应的Acceptor线程。
但是对于Control plane而言情况就不一样了。
细心的你一定发现了Control plane那组属性变量都是以Opt结尾的即它们都是Option类型。这说明了一个重要的事实你完全可以不使用Control plane套装即你可以让Kafka不区分请求类型就像2.2.0之前设计的那样。
但是一旦你开启了Control plane设置其Processor线程就只有1个Acceptor线程也是1个。另外你要注意它对应的RequestChannel里面的请求队列长度被硬编码成了20而不是一个可配置的值。这揭示了社区在这里所做的一个假设即**控制类请求的数量应该远远小于数据类请求,因而不需要为它创建线程池和较深的请求队列**。
## 创建Data plane所需资源
知道了SocketServer类的定义之后我们就可以开始学习SocketServer是如何为Data plane和Control plane创建所需资源的操作了。我们先来看为Data plane创建资源。
SocketServer的**createDataPlaneAcceptorsAndProcessors方法**负责为Data plane创建所需资源。我们看下它的实现
```
private def createDataPlaneAcceptorsAndProcessors(
dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = {
// 遍历监听器集合
endpoints.foreach { endpoint =&gt;
// 将监听器纳入到连接配额管理之下
connectionQuotas.addListener(config, endpoint.listenerName)
// 为监听器创建对应的Acceptor线程
val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
// 为监听器创建多个Processor线程。具体数目由num.network.threads决定
addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
// 将&lt;监听器Acceptor线程&gt;对保存起来统一管理
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s&quot;Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}&quot;)
}
}
```
这段代码的逻辑非常清晰,我用一张图来解释说明下:
<img src="https://static001.geekbang.org/resource/image/b6/60/b6952e86566cdfe92d69e9d96a031560.jpg" alt="">
createDataPlaneAcceptorsAndProcessors方法会遍历你配置的所有监听器然后为每个监听器执行下面的逻辑。
1. 初始化该监听器对应的最大连接数计数器。后续这些计数器将被用来确保没有配额超限的情形发生。
1. 为该监听器创建Acceptor线程也就是调用Acceptor类的构造函数生成对应的Acceptor线程实例。
1. 创建Processor线程池。对于Data plane而言线程池的数量由Broker端参数num.network.threads决定。
1.&lt;监听器Acceptor线程&gt;对加入到Acceptor线程池统一管理。
切记源码会为每套用于Data plane的监听器执行以上这4步。
举个例子假设你配置listeners=PLAINTEXT://localhost:9092, SSL://localhost:9093那么在默认情况下源码会为PLAINTEXT和SSL这两套监听器分别创建一个Acceptor线程和一个Processor线程池。
需要注意的是,具体为哪几套监听器创建是依据配置而定的,最重要的是,**Kafka只会为Data plane所使的监听器创建这些资源**。至于如何指定监听器到底是为Data plane所用还是归Control plane我会再详细说明。
## 创建Control plane所需资源
前面说过了基于控制类请求的负载远远小于数据类请求负载的假设Control plane的配套资源只有1个Acceptor线程 + 1个Processor线程 + 1个深度是20的请求队列而已。和Data plane相比这些配置稍显寒酸不过在大部分情况下应该是够用了。
SocketServer提供了createControlPlaneAcceptorAndProcessor方法用于为Control plane创建所需资源源码如下
```
private def createControlPlaneAcceptorAndProcessor(
endpointOpt: Option[EndPoint]): Unit = {
// 如果为Control plane配置了监听器
endpointOpt.foreach { endpoint =&gt;
// 将监听器纳入到连接配额管理之下
connectionQuotas.addListener(config, endpoint.listenerName)
// 为监听器创建对应的Acceptor线程
val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
// 为监听器创建对应的Processor线程
val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
controlPlaneProcessorOpt = Some(controlPlaneProcessor)
val listenerProcessors = new ArrayBuffer[Processor]()
listenerProcessors += controlPlaneProcessor
// 将Processor线程添加到控制类请求专属RequestChannel中
// 即添加到RequestChannel实例保存的Processor线程池中
controlPlaneRequestChannelOpt.foreach(
_.addProcessor(controlPlaneProcessor))
nextProcessorId += 1
// 把Processor对象也添加到Acceptor线程管理的Processor线程池中
controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
info(s&quot;Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}&quot;)
}
}
```
我同样使用一张流程图来说明:
<img src="https://static001.geekbang.org/resource/image/69/1d/69ba01a158bcf63be4e7606feb95521d.jpg" alt="">
总体流程和createDataPlaneAcceptorsAndProcessors非常类似只是方法开头需要判断是否配置了用于Control plane的监听器。目前Kafka规定只能有1套监听器用于Control plane而不能像Data plane那样可以配置多套监听器。
如果认真看的话你会发现上面两张图中都没有提到启动Acceptor和Processor线程。那这些线程到底是在什么时候启动呢
实际上Processor和Acceptor线程是在启动SocketServer组件之后启动的具体代码在KafkaServer.scala文件的startup方法中如下所示
```
// KafkaServer.scala
def startup(): Unit = {
try {
info(&quot;starting&quot;)
......
// 创建SocketServer组件
socketServer = new SocketServer(config, metrics, time, credentialProvider)
// 启动SocketServer但不启动Processor线程
socketServer.startup(startProcessingRequests = false)
......
// 启动Data plane和Control plane的所有线程
socketServer.startProcessingRequests(authorizerFutures)
......
} catch {
......
}
}
```
还是没看到启动Acceptor和Processor线程的代码啊实际上SocketServer的startProcessingRequests方法就是启动这些线程的方法。我们看下这个方法的逻辑
```
def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
info(&quot;Starting socket server acceptors and processors&quot;)
this.synchronized {
if (!startedProcessingRequests) {
// 启动处理控制类请求的Processor和Acceptor线程
startControlPlaneProcessorAndAcceptor(authorizerFutures)
// 启动处理数据类请求的Processor和Acceptor线程
startDataPlaneProcessorsAndAcceptors(authorizerFutures)
startedProcessingRequests = true
} else {
info(&quot;Socket server acceptors and processors already started&quot;)
}
}
info(&quot;Started socket server acceptors and processors&quot;)
}
```
如果在你的环境中你看不到startProcessingRequests方法不用感到惊慌。这是今年4月16日刚刚添加的方法。你需要使用git命令去拉取最新的Trunk分支代码就能看到这个方法了。
这个方法又进一步调用了startDataPlaneProcessorsAndAcceptors和startControlPlaneProcessorAndAcceptor方法分别启动Data plane的Control plane的线程。鉴于这两个方法的逻辑类似我们重点学习下startDataPlaneProcessorsAndAcceptors方法的实现。
```
private def startDataPlaneProcessorsAndAcceptors(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
// 获取Broker间通讯所用的监听器默认是PLAINTEXT
val interBrokerListener = dataPlaneAcceptors.asScala.keySet
.find(_.listenerName == config.interBrokerListenerName)
.getOrElse(throw new IllegalStateException(s&quot;Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}&quot;))
val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
dataPlaneAcceptors.asScala.filter { case (k, _) =&gt; k != interBrokerListener }.values
orderedAcceptors.foreach { acceptor =&gt;
val endpoint = acceptor.endPoint
// 启动Processor和Acceptor线程
startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
}
}
```
该方法主要的逻辑是调用startAcceptorAndProcessors方法启动Acceptor和Processor线程。当然在此之前代码要获取Broker间通讯所用的监听器并找出该监听器对应的Acceptor线程以及它维护的Processor线程池。
好了现在我要告诉你到底是在哪里设置用于Control plane的监听器了。Broker端参数control.plane.listener.name就是用于设置Control plane所用的监听器的地方。
在默认情况下这个参数的值是空Null。Null的意思就是告诉Kafka不要启用请求优先级区分机制但如果你设置了这个参数Kafka就会利用它去listeners中寻找对应的监听器了。
我举个例子说明下。假设你的Broker端相应配置如下
```
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
control.plane.listener.name=CONTROLLER
```
那么名字是CONTROLLER的那套监听器将被用于Control plane。换句话说名字是INTERNAL和EXTERNAL的这两组监听器用于Data plane。在代码中Kafka是如何知道CONTROLLER这套监听器是给Control plane使用的呢简单来说这是通过KafkaConfig中的3个方法完成的。KafkaConfig类封装了Broker端所有参数的信息同时还定义了很多实用的工具方法。
现在,我结合上面的配置例子,用一张图的方式来说明这些代码方法的调用关系,以及主要方法的实现逻辑。
<img src="https://static001.geekbang.org/resource/image/f2/9c/f28bb3b2fc5c32fb05b3e585f7889e9c.jpg" alt="">
图中涉及3个方法它们的调用关系是自底向上即最下面的方法调用中间的方法中间的方法调用最上面的方法。现在我来具体解释下代码是怎么找到Control plane对应的监听器的。
首先代码要去获取Broker端参数control.plane.listener.name的值。在这个例子中该值是CONTROLLER字符串
之后读取Broker端参数listener.security.protocol.map值并找出CONTROLLER对应的安全认证协议。在这个例子中CONTROLLER对应的安全认证协议是PLAINTEXT。controlPlaneListenerName方法的作用是拿到这组值&lt;CONTROLLERPLAINTEXT&gt;对。
最后controlPlaneListener方法拿到这组值后取出监听器名称CONTROLLER去寻找Broker端参数listeners中对应的监听器。在这里这个监听器就是CONTROLLER://192.1.1.8:9091。这就是确认Control plane监听器完整的查找逻辑。
你可以打开KafkaConfig.scala文件依次找到这3个方法的实现代码。这里我们重点查看下getControlPlaneListenerNameAndSecurityProtocol方法的代码实现
```
private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
// 查看Broker端参数control.plane.listener.name值
// 即是否启用了control plane监听器
Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
// 如果启用了
case Some(name) =&gt;
val listenerName = ListenerName.normalised(name)
// 必须同时设置Broker端参数listener.security.protocol.map
// 并从该参数值中提取出该监听器对应的安全认证协议
val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
throw new ConfigException(s&quot;Listener with ${listenerName.value} defined in &quot; +
s&quot;${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.&quot;))
// 返回&lt;监听器名称,安全认证协议&gt;对
Some(listenerName, securityProtocol)
// 如果没有设置该参数值直接返回None说明没有启用control plane监听器
case None =&gt; None
}
}
```
这段代码的核心就是getString那一行即Kafka会提取名为ControlPlaneListenerNameProp参数的值而它就是control.plane.listener.name参数值。
拿到了这个参数值之后controlPlaneListener方法会记录下这个值然后把它传入到SocketServer的createControlPlaneAcceptorAndProcessor方法中。这样SocketServer就能知道你到底有没有为Control plane设置专属监听器了。
讲到这里Data plane和Control plane的内容我就说完了。现在我再来具体解释下它们和请求优先级之间的关系。
严格来说Kafka没有为请求设置数值型的优先级因此我们并不能把所有请求按照所谓的优先级进行排序。到目前为止Kafka仅仅实现了粗粒度的优先级处理即整体上把请求分为数据类请求和控制类请求两类而且没有为这两类定义可相互比较的优先级。那我们应该如何把刚刚说的所有东西和这里的优先级进行关联呢
通过刚刚的学习,我们知道,社区定义了多套监听器以及底层处理线程的方式来区分这两大类请求。虽然我们很难直接比较这两大类请求的优先级,但在实际应用中,由于数据类请求的数量要远多于控制类请求,因此,为控制类请求单独定义处理资源的做法,实际上就等同于拔高了控制类请求的优先处理权。从这个角度上来说,这套做法间接实现了优先级的区别对待。
## 总结
好了我们来小结一下。今天我们重点学习了社区实现不同类型请求优先级的方法。结合监听器的概念我们深入到SocketServer的源码中分析了Data plane和Control plane的实现原理。我们来回顾一下这节课的重点。
- Data plane负责处理数据类请求这类请求通常不需要高优先级处理。
- Control plane负责处理控制类请求这类请求需要高优先级处理。
- 监听器Kafka允许Broker定义多套监听器每套监听器可用于Data plane或Control plane。
- 优先级实现原理你要知道的是社区设计了两套资源分别处理Data plane和Control plane请求。
<img src="https://static001.geekbang.org/resource/image/ee/8c/eec8d1027bf77384d8d2fb8116af948c.jpg" alt="">
下节课我会带你串联起网络I/O层的所有组件并且结合源码带你深入理解一个请求在Kafka中是如何被处理的。敬请期待。
## 课后讨论
最后,我们来思考一个问题:如果不使用多套资源的方案,而是在请求队列这个层面进行改进,你觉得能够实现不同请求不同优先级的需求吗?比如说,将请求队列改造成支持抢占式的优先级队列方案,你可以说出这两个方案的优劣吗?
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,467 @@
<audio id="audio" title="09 | SocketServer请求处理全流程源码分析" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/ba/74/baed9c709c9de3f084d51aedeeb93f74.mp3"></audio>
你好我是胡夕。前几节课我们花了很多时间学习SocketServer核心组件的源代码包括Acceptor线程、Processor线程也研究了Data plane和Control plane针对不同类型请求的处理方案。
今天我带你完整地梳理一下Kafka请求处理的全流程。这个全流程涉及到多个源码文件为了弄懂其中的原理我们必须在不同的方法间“跳来跳去”。比起学习单个源码文件将多个文件中的方法组合在一起串成完整流程要难得多因此你最好多花一些时间仔细研读一下跟这套流程相关的所有方法。
当然了,你可能有这样的疑问:“我为什么要关心请求被处理的流程呢?阅读这部分源码的意义是什么呢?”其实,**弄明白这部分原理非常有助于我们有针对性地调优Broker端请求处理的性能**。
举个例子Broker端有两个参数与这个流程相关分别是num.network.threads和num.io.threads。如果我们不掌握请求被处理的流程是没有办法有的放矢地调整这些参数的。
要知道Kafka官网可没有告诉我们什么是网络线程和I/O线程。如果不明白“请求是被网络线程接收并放入请求队列的”这件事我们就很可能犯这样的错误——当请求队列快满了的时候我们会以为是网络线程处理能力不够进而盲目地增加num.network.threads值但最终效果很可能是适得其反的。我相信在今天的课程结束之后你就会知道碰到这种情况的时候我们更应该增加的是num.io.threads的值。
num.io.threads参数表征的就是I/O线程池的大小。所谓的I/O线程池即KafkaRequestHandlerPool也称请求处理线程池。这节课我会先讲解**KafkaRequestHandlerPool源码**,再具体解析**请求处理全流程的代码**。
## KafkaRequestHandlerPool
**KafkaRequestHandlerPool是真正处理Kafka请求的地方**。切记Kafka中处理请求的类不是SocketServer也不是RequestChannel而是KafkaRequestHandlerPool。
它所在的文件是KafkaRequestHandler.scala位于core包的src/main/scala/kafka/server下。这是一个不到400行的小文件掌握起来并不难。
我先用一张图给你展示下这个文件里都有哪些组件:
<img src="https://static001.geekbang.org/resource/image/d3/f9/d3e7713bab984782dec557c534c558f9.jpg" alt="">
- **KafkaRequestHandler**请求处理线程类。每个请求处理线程实例负责从SocketServer的RequestChannel的请求队列中获取请求对象并进行处理。
- **KafkaRequestHandlerPool**:请求处理线程池,负责创建、维护、管理和销毁下辖的请求处理线程。
- **BrokerTopicMetrics**Broker端与主题相关的监控指标的管理类。
- **BrokerTopicStatsC**定义Broker端与主题相关的监控指标的管理操作。
- **BrokerTopicStatsO**BrokerTopicStats的伴生对象类定义Broker端与主题相关的监控指标比如常见的MessagesInPerSec和MessagesOutPerSec等。
我们重点看前两个组件的代码。后面的三个类或对象都是与监控指标相关的代码多为一些工具类方法或定义常量非常容易理解。所以我们不必在它们身上花费太多时间要把主要精力放在KafkaRequestHandler及其相关管理类的学习上。
### KafkaRequestHandler
首先,我们来看下它的定义:
```
// 关键字段说明
// id: I/O线程序号
// brokerId所在Broker序号即broker.id值
// totalHandlerThreadsI/O线程池大小
// requestChannel请求处理通道
// apisKafkaApis类用于真正实现请求处理逻辑的类
class KafkaRequestHandler(
id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: KafkaApis,
time: Time) extends Runnable with Logging {
......
}
```
从定义可知KafkaRequestHandler是一个Runnable对象因此你可以把它当成是一个线程。每个KafkaRequestHandler实例都有4个关键的属性。
- **id**请求处理线程的序号类似于Processor线程的ID序号仅仅用于标识这是线程池中的第几个线程。
- **brokerId**Broker序号用于标识这是哪个Broker上的请求处理线程。
- **requestChannel**SocketServer中的请求通道对象。KafkaRequestHandler对象为什么要定义这个字段呢我们说过它是负责处理请求的类那请求保存在什么地方呢实际上请求恰恰是保存在RequestChannel中的请求队列中因此Kafka在构造KafkaRequestHandler实例时必须关联SocketServer组件中的RequestChannel实例也就是说要让I/O线程能够找到请求被保存的地方。
- **apis**这是一个KafkaApis类。如果说KafkaRequestHandler是真正处理请求的那么KafkaApis类就是真正执行请求处理逻辑的地方。在第10节课我会具体讲解KafkaApis的代码。目前你需要知道的是它有个handle方法用于执行请求处理逻辑。
既然KafkaRequestHandler是一个线程类那么除去常规的close、stop、initiateShutdown和awaitShutdown方法最重要的当属run方法实现了如下所示
```
def run(): Unit = {
// 只要该线程尚未关闭,循环运行处理逻辑
while (!stopped) {
val startSelectTime = time.nanoseconds
// 从请求队列中获取下一个待处理的请求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
// 统计线程空闲时间
val idleTime = endTime - startSelectTime
// 更新线程空闲百分比指标
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
req match {
// 关闭线程请求
case RequestChannel.ShutdownRequest =&gt;
debug(s&quot;Kafka request handler $id on broker $brokerId received shut down command&quot;)
// 关闭线程
shutdownComplete.countDown()
return
// 普通请求
case request: RequestChannel.Request =&gt;
try {
request.requestDequeueTimeNanos = endTime
trace(s&quot;Kafka request handler $id on broker $brokerId handling request $request&quot;)
// 由KafkaApis.handle方法执行相应处理逻辑
apis.handle(request)
} catch {
// 如果出现严重错误,立即关闭线程
case e: FatalExitError =&gt;
shutdownComplete.countDown()
Exit.exit(e.statusCode)
// 如果是普通异常,记录错误日志
case e: Throwable =&gt; error(&quot;Exception when handling request&quot;, e)
} finally {
// 释放请求对象占用的内存缓冲区资源
request.releaseBuffer()
}
case null =&gt; // 继续
}
}
shutdownComplete.countDown()
}
```
虽然我给一些主要的代码都标记了注释但为了方便你更好地理解我画一张图借助它来展示下KafkaRequestHandler线程的处理逻辑
<img src="https://static001.geekbang.org/resource/image/b5/4d/b5f6d3b4ecea86a3e66a29953034dc4d.jpg" alt="">
我来解释下run方法的主要运行逻辑。它的所有执行逻辑都在while循环之下因此只要标志线程关闭状态的stopped为falserun方法将一直循环执行while下的语句。
第1步是从请求队列中获取下一个待处理的请求同时更新一些相关的统计指标。如果本次循环没取到那么本轮循环结束进入到下一轮。如果是ShutdownRequest请求则说明该Broker发起了关闭操作。
而Broker关闭时会调用KafkaRequestHandler的shutdown方法进而调用initiateShutdown方法以及RequestChannel的sendShutdownRequest方法而后者就是将ShutdownRequest写入到请求队列。
一旦从请求队列中获取到ShutdownRequestrun方法代码会调用shutdownComplete的countDown方法正式完成对KafkaRequestHandler线程的关闭操作。你看看KafkaRequestHandlerPool的shutdown方法代码就能明白这是怎么回事了。
```
def shutdown(): Unit = synchronized {
info(&quot;shutting down&quot;)
for (handler &lt;- runnables)
handler.initiateShutdown() // 调用initiateShutdown方法发起关闭
for (handler &lt;- runnables)
// 调用awaitShutdown方法等待关闭完成
// run方法一旦调用countDown方法这里将解除等待状态
handler.awaitShutdown()
info(&quot;shut down completely&quot;)
}
```
就像代码注释中写的那样一旦run方法执行了countDown方法程序流解除在awaitShutdown方法这里的等待从而完成整个线程的关闭操作。
我们继续说回run方法。如果从请求队列中获取的是普通请求那么首先更新请求移出队列的时间戳然后交由KafkaApis的handle方法执行实际的请求处理逻辑代码。待请求处理完成并被释放缓冲区资源后代码进入到下一轮循环周而复始地执行以上所说的逻辑。
### KafkaRequestHandlerPool
从上面的分析来看KafkaRequestHandler逻辑大体上还是比较简单的。下面我们来看下KafkaRequestHandlerPool线程池的实现。它是管理I/O线程池的实现逻辑也不复杂。它的shutdown方法前面我讲过了这里我们重点学习下**它是如何创建这些线程的,以及创建它们的时机**。
首先看它的定义:
```
// 关键字段说明
// brokerId所属Broker的序号即broker.id值
// requestChannelSocketServer组件下的RequestChannel对象
// apiKafkaApis类实际请求处理逻辑类
// numThreadsI/O线程池初始大小
class KafkaRequestHandlerPool(
val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String)
extends Logging with KafkaMetricsGroup {
// I/O线程池大小
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
// I/O线程池
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
......
}
```
KafkaRequestHandlerPool对象定义了7个属性其中比较关键的有4个我分别来解释下。
- **brokerId**和KafkaRequestHandler中的一样保存Broker的序号。
- **requestChannel**SocketServer的请求处理通道它下辖的请求队列为所有I/O线程所共享。requestChannel字段也是KafkaRequestHandler类的一个重要属性。
- **apis**KafkaApis实例执行实际的请求处理逻辑。它同时也是KafkaRequestHandler类的一个重要属性。
- **numThreads**线程池中的初始线程数量。它是Broker端参数num.io.threads的值。目前Kafka支持动态修改I/O线程池的大小因此这里的numThreads是初始线程数调整后的I/O线程池的实际大小可以和numThreads不一致。
这里我再详细解释一下numThreads属性和实际线程池中线程数的关系。就像我刚刚说过的I/O线程池的大小是可以修改的。如果你查看KafkaServer.scala中的startup方法你会看到以下这两行代码
```
// KafkaServer.scala
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s&quot;${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent&quot;, SocketServer.DataPlaneThreadPrefix)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, 1, s&quot;${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent&quot;, SocketServer.ControlPlaneThreadPrefix)
```
由代码可知Data plane所属的KafkaRequestHandlerPool线程池的初始数量就是Broker端的参数nums.io.threads即这里的config.numIoThreads值而用于Control plane的线程池的数量则硬编码为1。
因此你可以发现Broker端参数num.io.threads的值控制的是Broker启动时KafkaRequestHandler线程的数量。因此**当你想要在一开始就提升Broker端请求处理能力的时候不妨试着增加这个参数值**。
除了上面那4个属性该类还定义了一个threadPoolSize变量。本质上它就是用AtomicInteger包了一层numThreads罢了。
为什么要这么做呢这是因为目前Kafka支持动态调整KafkaRequestHandlerPool线程池的线程数量但类定义中的numThreads一旦传入就不可变更了因此需要单独创建一个支持更新操作的线程池数量的变量。至于为什么使用AtomicInteger你应该可以想到这是为了保证多线程访问的线程安全性。毕竟这个线程池大小的属性可能被多个线程访问到而AtomicInteger本身提供的原子操作能够有效地确保这种并发访问同时还能提供必要的内存可见性。
既然是管理I/O线程池的类KafkaRequestHandlerPool中最重要的字段当属线程池字段runnables了。就代码而言Kafka选择使用Scala的数组对象类实现I/O线程池。
**createHandler方法**
当线程池初始化时Kafka使用下面这段代码批量创建线程并将它们添加到线程池中
```
for (i &lt;- 0 until numThreads) {
createHandler(i) // 创建numThreads个I/O线程
}
// 创建序号为指定id的I/O线程对象并启动该线程
def createHandler(id: Int): Unit = synchronized {
// 创建KafkaRequestHandler实例并加入到runnables中
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
// 启动KafkaRequestHandler线程
KafkaThread.daemon(logAndThreadNamePrefix + &quot;-kafka-request-handler-&quot; + id, runnables(id)).start()
}
```
我来解释下这段代码。源码使用for循环批量调用createHandler方法创建多个I/O线程。createHandler方法的主体逻辑分为三步
1. 创建KafkaRequestHandler实例
1. 将创建的线程实例加入到线程池数组;
1. 启动该线程。
**resizeThreadPool方法**
下面我们说说resizeThreadPool方法的代码。这个方法的目的是**把I/O线程池的线程数重设为指定的数值**。代码如下:
```
def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s&quot;Resizing request handler thread pool size from $currentSize to $newSize&quot;)
if (newSize &gt; currentSize) {
for (i &lt;- currentSize until newSize) {
createHandler(i)
}
} else if (newSize &lt; currentSize) {
for (i &lt;- 1 to (currentSize - newSize)) {
runnables.remove(currentSize - i).stop()
}
}
threadPoolSize.set(newSize)
}
```
该方法首先获取当前线程数量。如果目标数量比当前数量大就利用刚才说到的createHandler方法将线程数补齐到目标值newSize否则的话就将多余的线程从线程池中移除并停止它们。最后把标识线程数量的变量threadPoolSize的值调整为目标值newSize。
至此KafkaRequestHandlerPool类的3个方法shutdown、createHandler和resizeThreadPool我们就学完了。总体而言它就是负责管理I/O线程池的类。
## 全处理流程
有了上面的这些铺垫我们就可以来学习下Kafka请求处理全流程的代码路径了。
我们再来看一下[第7讲](https://time.geekbang.org/column/article/231139)里的这张图。上一次,我主要是想借助它,让你对网络线程池有个整体的了解,今天,我来具体给你讲解下,这张图所展示的完整请求处理逻辑。
<img src="https://static001.geekbang.org/resource/image/52/e8/52c3226ad4736751b4b1ccfcb2a09ee8.jpg" alt="">
图中一共有6步。我分别解释一下同时还会带你去找寻对应的源码。
### 第1步Clients或其他Broker发送请求给Acceptor线程
我在第7节课讲过Acceptor线程实时接收来自外部的发送请求。一旦接收到了之后就会创建对应的Socket通道就像下面这段代码所示
```
// SocketServer.scala中Acceptor的run方法片段
// 读取底层通道上准备就绪I/O操作的数量
val ready = nioSelector.select(500)
// 如果存在准备就绪的I/O事件
if (ready &gt; 0) {
// 获取对应的SelectionKey集合
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
// 遍历这些SelectionKey
while (iter.hasNext &amp;&amp; isRunning) {
try {
val key = iter.next
iter.remove()
// 测试SelectionKey的底层通道是否能够接受新Socket连接
if (key.isAcceptable) {
// 接受此连接并分配对应的Processor线程
accept(key).foreach { socketChannel =&gt;
var processor: Processor = null
do {
retriesLeft -= 1
processor = synchronized {
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
// 将新Socket连接加入到Processor线程待处理连接队列
// 等待Processor线程后续处理
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else {
......
}
......
}
```
可以看到Acceptor线程通过调用accept方法创建对应的SocketChannel然后将该Channel实例传给assignNewConnection方法等待Processor线程将该Socket连接请求放入到它维护的待处理连接队列中。后续Processor线程的run方法会不断地从该队列中取出这些Socket连接请求然后创建对应的Socket连接。
assignNewConnection方法的主要作用是将这个新建的SocketChannel对象存入Processors线程的newConnections队列中。之后Processor线程会不断轮询这个队列中的待处理Channel可以参考第7讲的configureNewConnections方法并向这些Channel注册基于Java NIO的Selector用于真正的请求获取和响应发送I/O操作。
严格来说Acceptor线程处理的这一步并非真正意义上的获取请求仅仅是Acceptor线程为后续Processor线程获取请求铺路而已也就是把需要用到的Socket通道创建出来传给下面的Processor线程使用。
### 第2 &amp; 3步Processor线程处理请求并放入请求队列
一旦Processor线程成功地向SocketChannel注册了SelectorClients端或其他Broker端发送的请求就能通过该SocketChannel被获取到具体的方法是Processor的processCompleteReceives
```
// SocketServer.scala
private def processCompletedReceives(): Unit = {
// 从Selector中提取已接收到的所有请求数据
selector.completedReceives.asScala.foreach { receive =&gt;
try {
// 打开与发送方对应的Socket Channel如果不存在可用的Channel抛出异常
openOrClosingChannel(receive.source) match {
case Some(channel) =&gt;
......
val header = RequestHeader.parse(receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE &amp;&amp; channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
……
else {
val nowNanos = time.nanoseconds()
if (channel.serverAuthenticationSessionExpired(nowNanos)) {
……
} else {
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation)
// 根据Channel中获取的Receive对象构建Request对象
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
……
// 将该请求放入请求队列
requestChannel.sendRequest(req)
......
}
}
……
}
} catch {
……
}
}
}
```
因为代码很多我进行了精简只保留了最关键的逻辑。该方法会将Selector获取到的所有Receive对象转换成对应的Request对象然后将这些Request实例放置到请求队列中就像上图中第2、3步展示的那样。
所谓的Processor线程处理请求就是指它从底层I/O获取到发送数据将其转换成Request对象实例并最终添加到请求队列的过程。
### 第4步I/O线程处理请求
所谓的I/O线程就是我们开头提到的KafkaRequestHandler线程它的处理逻辑就在KafkaRequestHandler类的run方法中
```
// KafkaRequestHandler.scala
def run(): Unit = {
while (!stopped) {
......
// 从请求队列中获取Request实例
val req = requestChannel.receiveRequest(300)
......
req match {
case RequestChannel.ShutdownRequest =&gt;
......
case request: RequestChannel.Request =&gt;
try {
......
apis.handle(request)
} {
......
}
case null =&gt; // 什么也不做
}
}
......
}
```
KafkaRequestHandler线程循环地从请求队列中获取Request实例然后交由KafkaApis的handle方法执行真正的请求处理逻辑。
### 第5步KafkaRequestHandler线程将Response放入Processor线程的Response队列
这一步的工作由KafkaApis类完成。当然这依然是由KafkaRequestHandler线程来完成的。KafkaApis.scala中有个sendResponse方法将Request的处理结果Response发送出去。本质上它就是调用了RequestChannel的sendResponse方法代码如下
```
def sendResponse(response: RequestChannel.Response): Unit = {
......
// 找到这个Request当初是由哪个Processor线程处理的
val processor = processors.get(response.processor)
if (processor != null) {
// 将Response添加到该Processor线程的Response队列上
processor.enqueueResponse(response)
}
}
```
### 第6步Processor线程发送Response给Request发送方
最后一步是Processor线程取出Response队列中的Response返还给Request发送方。具体代码位于Processor线程的processNewResponses方法中
```
// SocketServer.scala
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
while ({currentResponse = dequeueResponse(); currentResponse != null}) { // 循环获取Response队列中的Response
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
case response: NoOpResponse =&gt; // 不需要发送Response
updateRequestMetrics(response)
trace(s&quot;Socket server received empty response to send, registering for read: $response&quot;)
handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(channelId)
case response: SendResponse =&gt; // 需要发送Response
sendResponse(response, response.responseSend)
......
}
}
......
}
}
```
从这段代码可知最核心的部分是sendResponse方法来执行Response发送。该方法底层使用Selector实现真正的发送逻辑。至此一个请求被完整处理的流程我就讲完了。
最后我想再补充一点还记得我之前说过有些Response是需要有回调逻辑的吗
实际上在第6步执行完毕之后Processor线程通常还会尝试执行Response中的回调逻辑即Processor类的processCompletedSends方法。不过并非所有Request或Response都指定了回调逻辑。事实上只有很少的Response携带了回调逻辑。比如说FETCH请求在发送Response之后就要求更新下Broker端与消息格式转换操作相关的统计指标。
## 总结
今天我们学习了KafkaRequestHandlerPool线程池及其下辖的KafkaRequestHandler线程该线程就是Kafka社区所称的I/O线程。另外我结合源代码把Kafka的请求处理流程串讲了一遍。我们来回顾下这节课的重点。
- KafkaRequestHandlerI/O线程负责处理Processor线程下发的Request对象。
- KafkaRequestHandlerPool创建和管理一组KafkaRequestHandler线程。
- 请求处理流程总共分为6步。
1. Clients或其他Broker通过Selector机制发起创建连接请求。
1. Processor线程接收请求并将其转换成可处理的Request对象。
1. Processor线程将Request对象放入Request队列。
1. KafkaRequestHandler线程从Request队列中取出待处理请求并进行处理。
1. KafkaRequestHandler线程将Response放回到对应Processor线程的Response队列。
1. Processor线程发送Response给Request发送方。
<img src="https://static001.geekbang.org/resource/image/45/c4/458e65efcab7964911bb6a1755fa89c4.jpg" alt="">
其实今天在谈到Request逻辑执行的时候我卖了个关子——我提到KafkaApis是请求逻辑的真正处理方法。也就是说所有类型的请求处理逻辑都封装在KafkaApis文件下但我并没有深入地去讲它。下节课我会重点和你聊聊这个KafkaApis类。我一直认为该类是查看所有Kafka源码的首要入口类绝对值得我们花一整节课的时间去学习。
## 课后讨论
最后,请你结合今天的内容思考一个问题:你觉得,请求处理流程的哪些部分应用了经典的“生产者-消费者”模式?
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,325 @@
<audio id="audio" title="10 | KafkaApisKafka最重要的源码入口没有之一" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/05/61/05454b44470855c818a8aa247ebca361.mp3"></audio>
你好我是胡夕。今天我们来收尾Kafka请求处理模块的源码学习。讲到这里关于整个模块我们还有最后一个知识点尚未掌握那就是KafkaApis类。
在上节课中我提到过请求的实际处理逻辑是封装在KafkaApis类中的。你一定很想知道这个类到底是做什么的吧。
实际上我一直认为KafkaApis是Kafka最重要的源码入口。因为每次要查找Kafka某个功能的实现代码时我们几乎总要从这个KafkaApis.scala文件开始找起然后一层一层向下钻取直到定位到实现功能的代码处为止。比如如果你想知道创建Topic的流程你只需要查看KafkaApis的handleCreateTopicsRequest方法如果你想弄懂Consumer提交位移是怎么实现的查询handleOffsetCommitRequest方法就行了。
除此之外,在这一遍遍的钻取过程中,我们还会慢慢地**掌握Kafka实现各种功能的代码路径和源码分布从而建立起对整个Kafka源码工程的完整认识**。
如果这些还不足以吸引你阅读这部分源码,那么,我再给你分享一个真实的案例。
之前在使用Kafka时我发现Producer程序一旦向一个不存在的主题发送消息在创建主题之后Producer端会抛出一个警告
```
Error while fetching metadata with correlation id 3 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
```
我一直很好奇这里的LEADER_NOT_AVAILABLE异常是在哪里抛出来的。直到有一天我在浏览KafkaApis代码时突然发现了createTopics方法的这两行代码
```
private def createTopic(topic: String,
numPartitions: Int, replicationFactor: Int,
properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
try {
adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
......
// 显式封装一个LEADER_NOT_AVAILABLE Response
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
} catch {
......
}
}
```
这时我才恍然大悟原来Broker端创建完主题后会显式地通知Clients端LEADER_NOT_AVAILABLE异常。Clients端接收到该异常后会主动更新元数据去获取新创建主题的信息。你看如果不是亲自查看源代码我们是无法解释这种现象的。
那么既然KafkaApis这么重要现在我们就来看看这个大名鼎鼎的入口文件吧。我会先给你介绍下它的定义以及最重要的handle方法然后再解释一下其他的重要方法。学完这节课以后你就能掌握从KafkaApis类开始去寻找单个功能具体代码位置的方法了。
事实上,相比于之前更多是向你分享知识的做法,**这节课我分享的是学习知识的方法**。
## KafkaApis类定义
好了, 我们首先来看下KafkaApis类的定义。KafkaApis类定义在源码文件KafkaApis.scala中。该文件位于core工程的server包下是一个将近3000行的巨型文件。好在它实现的逻辑并不复杂绝大部分代码都是用来处理所有Kafka请求类型的因此代码结构整体上显得非常规整。一会儿我们在学习handle方法时你一定会所有体会。
KafkaApis类的定义代码如下
```
class KafkaApis(
val requestChannel: RequestChannel, // 请求通道
val replicaManager: ReplicaManager, // 副本管理器
val adminManager: AdminManager, // 主题、分区、配置等方面的管理器
val groupCoordinator: GroupCoordinator, // 消费者组协调器组件
val txnCoordinator: TransactionCoordinator, // 事务管理器组件
val controller: KafkaController, // 控制器组件
val zkClient: KafkaZkClient, // ZooKeeper客户端程序Kafka依赖于该类实现与ZooKeeper交互
val brokerId: Int, // broker.id参数值
val config: KafkaConfig, // Kafka配置类
val metadataCache: MetadataCache, // 元数据缓存类
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers, // 配额管理器组件
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
time: Time,
val tokenManager: DelegationTokenManager) extends Logging {
type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
this.logIdent = &quot;[KafkaApi-%d] &quot;.format(brokerId)
val adminZkClient = new AdminZkClient(zkClient)
private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = &quot;AlterAcls&quot;, brokerId = config.brokerId)
......
}
```
我为一些重要的字段添加了注释信息。为了方便你理解,我还画了一张思维导图,罗列出了比较重要的组件:
<img src="https://static001.geekbang.org/resource/image/4f/cc/4fc050472d3c81fa27564297e07d67cc.jpg" alt="">
从这张图可以看出KafkaApis下可谓是大牌云集。放眼整个源码工程KafkaApis关联的“大佬级”组件都是最多的在KafkaApis中你几乎能找到Kafka所有重量级的组件比如负责副本管理的ReplicaManager、维护消费者组的GroupCoordinator以及操作Controller组件的KafkaController等等。
在处理不同类型的RPC请求时KafkaApis会用到不同的组件因此在创建KafkaApis实例时我们必须把可能用到的组件一并传给它这也是它汇聚众多大牌组件于一身的原因。
我说KafkaApis是入口类的另一个原因也在于此。你完全可以打开KafkaApis.scala文件然后根据它的定义一个一个地去研习这些重量级组件的实现原理。等你对这些组件的代码了然于胸了说不定下一个写源码课的人就是你了。
## KafkaApis方法入口
作为Kafka源码的入口类它都定义了哪些方法呢
如果你翻开KafkaApis类的代码你会发现它封装了很多以handle开头的方法。每一个这样的方法都对应于一类请求类型而它们的总方法入口就是handle方法。实际上你完全可以在handle方法间不断跳转去到任意一类请求被处理的实际代码中。下面这段代码就是handle方法的完整实现我们来看一下
```
def handle(request: RequestChannel.Request): Unit = {
try {
trace(s&quot;Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};&quot; +
s&quot;securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}&quot;)
// 根据请求头部信息中的apiKey字段判断属于哪类请求
// 然后调用响应的handle***方法
// 如果新增RPC协议类型
// 1. 添加新的apiKey标识新请求类型
// 2. 添加新的case分支
// 3. 添加对应的handle***方法
request.header.apiKey match {
case ApiKeys.PRODUCE =&gt; handleProduceRequest(request)
case ApiKeys.FETCH =&gt; handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS =&gt; handleListOffsetRequest(request)
case ApiKeys.METADATA =&gt; handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR =&gt; handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA =&gt; handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA =&gt; handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN =&gt; handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT =&gt; handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH =&gt; handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR =&gt; handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP =&gt; handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT =&gt; handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP =&gt; handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP =&gt; handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS =&gt; handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS =&gt; handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE =&gt; handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS =&gt; handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS =&gt; handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS =&gt; handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS =&gt; handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID =&gt; handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =&gt; handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN =&gt; handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN =&gt; handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN =&gt; handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS =&gt; handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT =&gt; handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS =&gt; handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS =&gt; handleCreateAcls(request)
case ApiKeys.DELETE_ACLS =&gt; handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS =&gt; handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS =&gt; handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS =&gt; handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS =&gt; handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE =&gt; handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS =&gt; handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN =&gt; handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN =&gt; handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN =&gt; handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =&gt; handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS =&gt; handleDeleteGroupsRequest(request)
case ApiKeys.ELECT_LEADERS =&gt; handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS =&gt; handleIncrementalAlterConfigsRequest(request)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS =&gt; handleAlterPartitionReassignmentsRequest(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS =&gt; handleListPartitionReassignmentsRequest(request)
case ApiKeys.OFFSET_DELETE =&gt; handleOffsetDeleteRequest(request)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS =&gt; handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS =&gt; handleAlterClientQuotasRequest(request)
}
} catch {
// 如果是严重错误,则抛出异常
case e: FatalExitError =&gt; throw e
// 普通异常的话,记录下错误日志
case e: Throwable =&gt; handleError(request, e)
} finally {
// 记录一下请求本地完成时间即Broker处理完该请求的时间
if (request.apiLocalCompleteTimeNanos &lt; 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}
```
如果你跟着这门课一直学习的话你应该会发现我很少贴某个类或方法的完整代码因为没必要还会浪费你的时间。但是这个handle方法有点特殊所以我把完整的代码展现给你。
它利用Scala语言中的模式匹配语法完整地列出了对所有请求类型的处理逻辑。通过该方法你能串联出Kafka处理任何请求的源码路径。我强烈推荐你在课下以几个比较重要的请求类型为学习目标从handle方法出发去探寻一下代码是如何为这些请求服务的以加深你对Broker端代码的整体熟练度。这对你后续深入学习源码或解决实际问题非常有帮助。
从上面的代码中,你应该很容易就能找到其中的规律:**这个方法是处理具体请求用的**。处理每类请求的方法名均以handle开头即handle×××Request。比如处理PRODUCE请求的方法叫handleProduceRequest处理FETCH请求的方法叫handleFetchRequest等。
如果你点开ApiKeys你会发现**它实际上是一个枚举类型里面封装了目前Kafka定义所有的RPC协议**。值得一提的是Kafka社区维护了一个官方文档专门记录这些RPC协议包括不同版本所需的Request格式和Response格式。
从这个handle方法中我们也能得到这样的结论每当社区添加新的RPC协议时Broker端大致需要做三件事情。
1. 更新ApiKeys枚举加入新的RPC ApiKey
1. 在KafkaApis中添加对应的handle×××Request方法实现对该RPC请求的处理逻辑
1. 更新KafkaApis的handle方法添加针对RPC协议的case分支。
## 其他重要方法
抛开KafkaApis的定义和handle方法还有几个常用的方法也很重要比如用于发送Response的一组方法以及用于鉴权的方法。特别是前者它是任何一类请求被处理之后都要做的必要步骤。毕竟请求被处理完成还不够Kafka还需要把处理结果发送给请求发送方。
首先就是**sendResponse系列方法**。
为什么说是系列方法呢因为源码中带有sendResponse字眼的方法有7个之多。我分别来介绍一下。
- **sendResponse**RequestChannel.Response最底层的Response发送方法。本质上它调用了SocketServer组件中RequestChannel的sendResponse方法我在前面的课程中讲到过RequestChannel的sendResponse方法会把待发送的Response对象添加到对应Processor线程的Response队列上然后交由Processor线程完成网络间的数据传输。
- **sendResponse**RequestChannel.RequestresponseOpt: Option[AbstractResponse]onComplete: Option[Send =&gt; Unit]该方法接收的实际上是Request而非Response因此它会在内部构造出Response对象之后再调用sendResponse方法。
- **sendNoOpResponseExemptThrottle**发送NoOpResponse类型的Response而不受请求通道上限流throttling的限制。所谓的NoOpResponse是指Processor线程取出该类型的Response后不执行真正的I/O发送操作。
- **sendErrorResponseExemptThrottle**发送携带错误信息的Response而不受限流限制。
- **sendResponseExemptThrottle**发送普通Response而不受限流限制。
- **sendErrorResponseMaybeThrottle**发送携带错误信息的Response但接受限流的约束。
- **sendResponseMaybeThrottle**发送普通Response但接受限流的约束。
这组方法最关键的还是第一个sendResponse方法。大部分类型的请求被处理完成后都会使用这个方法将Response发送出去。至于上面这组方法中的其他方法它们会在内部调用第一个sendResponse方法。当然在调用之前这些方法通常都拥有一些定制化的逻辑。比如sendResponseMaybeThrottle方法就会在执行sendResponse逻辑前先尝试对请求所属的请求通道进行限流操作。因此**我们要着重掌握第一个sendResponse方法是怎么将Response对象发送出去的**。
就像我前面说的,**KafkaApis实际上是把处理完成的Response放回到前端Processor线程的Response队列中而真正将Response返还给Clients或其他Broker的其实是Processor线程而不是执行KafkaApis逻辑的KafkaRequestHandler线程**。
另一个非常重要的方法是authorize方法咱们看看它的代码
```
private[server] def authorize(requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
resourceName: String,
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true,
refCount: Int = 1): Boolean = {
authorizer.forall { authZ =&gt;
// 获取待鉴权的资源类型
// 常见的资源类型如TOPIC、GROUP、CLUSTER等
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
// 返回鉴权结果是ALLOWED还是DENIED
authZ.authorize(requestContext, actions).asScala.head == AuthorizationResult.ALLOWED
}
}
```
这个方法是做**授权检验**的。目前Kafka所有的RPC请求都要求发送者无论是Clients还是其他Broker必须具备特定的权限。
接下来我用创建主题的代码来举个例子说明一下authorize方法的实际应用以下是handleCreateTopicsRequest方法的片段
```
// 是否具有CLUSTER资源的CREATE权限
val hasClusterAuthorization = authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
val topics = createTopicsRequest.data.topics.asScala.map(_.name)
// 如果具有CLUSTER CREATE权限则允许主题创建否则还要查看是否具有TOPIC资源的CREATE权限
val authorizedTopics = if (hasClusterAuthorization) topics.toSet else filterAuthorized(request, CREATE, TOPIC, topics.toSeq)
// 是否具有TOPIC资源的DESCRIBE_CONFIGS权限
val authorizedForDescribeConfigs = filterAuthorized(request, DESCRIBE_CONFIGS, TOPIC, topics.toSeq, logIfDenied = false)
.map(name =&gt; name -&gt; results.find(name)).toMap
results.asScala.foreach(topic =&gt; {
if (results.findAll(topic.name).size &gt; 1) {
topic.setErrorCode(Errors.INVALID_REQUEST.code)
topic.setErrorMessage(&quot;Found multiple entries for this topic.&quot;)
} else if (!authorizedTopics.contains(topic.name)) { // 如果不具备CLUSTER资源的CREATE权限或TOPIC资源的CREATE权限认证失败
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
topic.setErrorMessage(&quot;Authorization failed.&quot;)
}
if (!authorizedForDescribeConfigs.contains(topic.name)) { // 如果不具备TOPIC资源的DESCRIBE_CONFIGS权限设置主题配置错误码
topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
}
})
......
```
这段代码调用authorize方法来判断Clients方法是否具有创建主题的权限如果没有则显式标记TOPIC_AUTHORIZATION_FAILED告知Clients端。目前Kafka所有的权限控制均发生在KafkaApis中即**所有请求在处理前都需要调用authorize方法做权限校验以保证请求能够被继续执行**。
## KafkaApis请求处理实例解析
在了解了KafkaApis的代码结构之后我拿一段真实的代码来说明一下该类中某个协议处理方法大致的执行流程是什么样的以便让你更清楚地了解请求处理逻辑。
值得注意的是,这里的请求处理逻辑和之前所说的请求处理全流程是有所区别的。今天,我们关注的是**功能层面上请求被处理的逻辑代码**之前的请求处理全流程主要聚焦流程方面的代码即一个请求从被发送到Broker端到Broker端返还Response的代码路径。应该这么说**所有类型请求的被处理流程都是相同的,但是,每类请求却有不同的功能实现逻辑**而这就是KafkaApis类中的各个handle×××Request方法要做的事情。
下面我以handleListGroupsRequest方法为例来介绍一下。顾名思义这是处理ListGroupsRequest请求的方法。这类请求的Response应该返回集群中的消费者组信息。我们来看下它的实现
```
def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
val (error, groups) = groupCoordinator.handleListGroups() // 调用GroupCoordinator的handleListGroups方法拿到所有Group信息
// 如果Clients具备CLUSTER资源的DESCRIBE权限
if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
// 直接使用刚才拿到的Group数据封装进Response然后发送
sendResponseMaybeThrottle(request, requestThrottleMs =&gt;
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(error.code)
.setGroups(groups.map { group =&gt; new ListGroupsResponseData.ListedGroup()
.setGroupId(group.groupId)
.setProtocolType(group.protocolType)}.asJava
)
.setThrottleTimeMs(requestThrottleMs)
))
else {
// 找出Clients对哪些Group有GROUP资源的DESCRIBE权限返回这些Group信息
val filteredGroups = groups.filter(group =&gt; authorize(request, DESCRIBE, GROUP, group.groupId))
sendResponseMaybeThrottle(request, requestThrottleMs =&gt;
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(error.code)
.setGroups(filteredGroups.map { group =&gt; new ListGroupsResponseData.ListedGroup()
.setGroupId(group.groupId)
.setProtocolType(group.protocolType)}.asJava
)
.setThrottleTimeMs(requestThrottleMs)
))
}
}
```
我用一张流程图,来说明一下这个执行逻辑:
<img src="https://static001.geekbang.org/resource/image/75/f3/7529b94b80cead7158be5a277e7ff4f3.jpg" alt="">
大体来看handleListGroupsRequest方法的实现逻辑非常简单。通过GroupCoordinator组件获取到所有的消费者组信息之后代码对这些Group进行了权限校验并最终根据校验结果决定给Clients返回哪些可见的消费者组。
## 总结
好了, 我们总结一下KafkaApis类的要点。如前所述我们重点学习了KafkaApis类的定义及其重要方法handle。下面这些关键知识点希望你能掌握。
- KafkaApis是Broker端所有功能的入口同时关联了超多的Kafka组件。它绝对是你学习源码的第一入口。面对庞大的源码工程如果你不知道从何下手那就先从KafkaApis.scala这个文件开始吧。
- handle方法封装了所有RPC请求的具体处理逻辑。每当社区新增RPC协议时增加对应的handle×××Request方法和case分支都是首要的。
- sendResponse系列方法负责发送Response给请求发送方。发送Response的逻辑是将Response对象放置在Processor线程的Response队列中然后交由Processor线程实现网络发送。
- authorize方法是请求处理前权限校验层的主要逻辑实现。你可以查看一下[官方文档](https://docs.confluent.io/current/kafka/authorization.html)了解一下当前都有哪些权限然后对照着具体的方法找出每类RPC协议都要求Clients端具备什么权限。
<img src="https://static001.geekbang.org/resource/image/9e/4c/9ebd3f25518e387a7a60200a8b62114c.jpg" alt="">
至此关于Kafka请求处理模块的内容我们就全部学完了。在这个模块中我们先从RequestChannel入手探讨了Kafka中请求队列的实现原理之后我花了两节课的时间重点介绍了SocketServer组件包括Acceptor线程、Processor线程等子组件的源码以及请求被处理的全流程。今天我们重点研究了KafkaApis类这个顶层的请求功能处理逻辑入口补齐了请求处理的最后一块“拼图”。我希望你能够把这个模块的课程多看几遍认真思考一下这里面的关键实现要点彻底搞明白Kafka网络通信的核心机制。
从下节课开始我们将进入鼎鼎有名的控制器Controller组件的源码学习。我会花5节课的时间带你深入学习Controller的方方面面敬请期待。
## 课后讨论
最后请思考这样一个问题如果一个Consumer要向Broker提交位移它应该具备什么权限你能说出KafkaApis中的哪段代码说明了所需的权限要求吗
欢迎你在留言区写下你的思考和答案,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。