摘要:Kafka中的网络模型就是基于主从Reactor多线程进行设计的。
本文分享自华为云社区《图解Kafka服务端网络模型》,作者:石臻臻的杂货铺 。
Kafka中的网络模型就是基于主从Reactor多线程进行设计的, 在整体讲述Kafka网络模型之前,我们现在按照源码中的相关类来讲解一下他们分别都是用来做什么的。
这个类是网络通信的核心类,它持有这Acceptor和 Processor对象。
这个是控制连接数配额的类,
涉及到的Broker配置有:
AbstractServerThread 类:这是Acceptor线程和Processor线程的抽象基类,它定义了一个抽象方法wakeup()
,主要是用来唤醒Acceptor 线程和 Processor 对应的Selector
的, 当然还有一些共用方法
Acceptor 线程类:继承自AbstractServerThread, 这是接收和创建外部 TCP 连接的线程。每个 SocketServer 实例一般会创建一个 Acceptor 线程(如果listeners
配置了多个就会创建多个Acceptor)。它的唯一目的就是创建连接,并将接收到的 SocketChannel(SocketChannel通道用于传输数据) 传递给下游的 Processor 线程处理,Processor主要是处理连接之后的事情,例如读写I/O。
涉及到的Broker配置有:
Processor 线程类:这是处理单个TCP 连接上所有请求的处理线程。每个Acceptor 实例创建若干个(num.network.threads)Processor 线程。Processor 线程负责将接收到的 SocketChannel(SocketChannel通道用于传输数据。), 注册读写事件,当数据传送过来的时候,会立即读取Request数据,通过解析之后, 然后将其添加到 RequestChannel 的 requestQueue 队列上,同时还负责将 Response 返还给 Request 发送方。
涉及到的Broker配置有:
简单画了一张两个类之间的关系图
既然两个都是可执行线程,那我们看看两个线程的run方法都做了哪些事情
def run(): Unit = {
//将serverChannel 注册到nioSelector上,并且对 Accept事件感兴趣:表示服务器监听到了客户连接,那么服务器可以接收这个连接了
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
try {
var currentProcessorIndex = 0
while (isRunning) {
try {
//返回感兴趣的事件数量 这里是感兴趣的是SelectionKey.OP_ACCEPT,监听到新的链接
val ready = nioSelector.select(500)
if (ready > 0) {
//获取所有就绪通道
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
//遍历所有就绪通道
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
//只处理 Accept事件,其他的事件则抛出异常,ServerSocketChannel是 监听Tcp的链接通道
if (key.isAcceptable) {
//根据Key 拿到SocketChannle = serverSocketChannel.accept(),然后再遍历
accept(key).foreach { socketChannel =>
//将socketChannel分配给我们的 processor来处理,如果有多个socketChannel 则按照轮训分配的原则
//如果一个processor 中能够处理的newconnection 队列满了放不下了,则找下一个
// 如果所有的都放不下,则会一直循环直到有processor能够处理。
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
//轮训每个processors来处理
processor = synchronized {
// adjust the index (if necessary) and retrieve the processor atomically for
// correct behaviour in case the number of processors is reduced dynamically
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
省略
}
}
} finally {
省略
}
}
1、将ServerSocketChannel通道注册到nioSelector 上,并关注事件SelectionKey.OP_ACCEPT
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
2、while循环,持续阻塞监听事件,超时时间500ms
// 阻塞查询Selector是否有监听到新的事件
val ready = nioSelector.select(500)
// 如果有事件,则查询具体的事件和通道
if(ready>0>{
//获取所有就绪事件准备处理
val keys = nioSelector.selectedKeys()
}
3、遍历刚刚监听到的事件, 如果该SelectionKey不包含OP_ACCEPT(建立连接)事件,则抛出异常,通常不会出现这个异常。
Unrecognized key state for acceptor thread
4、如果SelectionKey包含OP_ACCEPT(建立连接)事件,则可以通过这个SelectionKey拿到serverSocketChannel,通过serverSocketChannel 拿到socketChannel,并且将SocketChannel设置为非阻塞模式。
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
// 调用accept方法就可以拿到ScoketChannel了。
val socketChannel = serverSocketChannel.accept()
//设置为非阻塞模式 就可以在异步模式下调用connect(), read() 和write()了。
socketChannel.configureBlocking(false)
5、接下来,把上面拿到的SocketChannel以遍历的形式给Acceptor下面的Procesor, 让Processor来执行后面的处理。分配的体现形式是, 将拿到的SocketChannel保存在Processor中的newConnections阻塞队列中,这个newConnections上限是20,在代码里面写死了的,也就是说一个Processor同时最多只能处理20个连接, 那么所有的Processor能处理的最大连接就是Processor数量 * 20;如果你的连接请求并发度很高,可以尝试调大num.network.threads
6、最后,如果newConnections队列放入了一个新的SocketChannel,则会调用一下对应Processor实例的wakeup()方法。
override def run(): Unit = {
startupComplete()
try {
while (isRunning) {
try {
// setup any new connections that have been queued up
// 将之前监听到的TCP链接(暂时保存在newConnections中) 开始注册监听OP_READ事件到每个Processor的 KSelector选择器中。
configureNewConnections()
// register any new responses for writing
processNewResponses()
//在不阻塞的情况下对每个连接执行任何 I/O 操作。这包括完成连接、完成断开连接、启动新发送或在进行中的发送或接收上取得进展。
// 当此调用完成时,用户可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()检查已完成的发送、接收、连接或断开连接。
poll()
// 把请求解析后放到 requestChannels 队列中,异步处理
processCompletedReceives()
//处理已经发送完成的请求
processCompletedSends()
processDisconnected()
closeExcessConnections()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
这个类保存这所有的Processor,还有一个阻塞队列保存这待处理请求。这个队列最大长度由queued.max.requests控制,当待处理请求超过这个数值的时候网络就会阻塞
在这里插入图片描述
涉及到的Broker配置有:
具体Request的处理类, 所有的请求方法处理逻辑都放在这个里面。
KafkaRequestHandler的线程池,KafkaRequestHandler线程的数量由配置num.io.threads决定。
在这里插入图片描述
涉及到的Broker配置有:
请求处理类, 每个Handler都会去 requestChannel的requestQueue队列里面poll请求, 然后去处理,最终调用的处理方法是KafkaApis.handle()
这几个类之间的关系如下
在这里插入图片描述
在这里插入图片描述
数据面板是用来处理 Broker与Broker/Client之间的网络模型模块, 与之相对的是控制器面板。
控制器面板 是专门用于Controller与Broker之间的网络通信模块。
其实本质上他们都是一模一样的, 但是为了将Controller的通信和普通通信隔离,才有这么两个概念。
上面的网络通信模型就是以数据面板来分析的,因为本质是一样的, 只是有一些配置不一样。
那么,数据面板就不详细讲了,我们主要讲下控制器面板的不一样的地方。
控制器面板是用来专门处理 Controller相关请求的独立通信模块。
大家都知道,Controller是一个很重要的角色,基本上大部分协调整个集群的相关请求都跟它有关系, 比如创建Topic、删除Topic、分区副本重分配、等等。他们都很重要。
但是一般情况下数据面板的请求很多,如果因为请求过多而导致Controller相关请求被阻塞不能执行,那么可能会造成一些影响, 所以我们可以让Controller类的请求有一个单独的通信模块。
首先,要启用控制器面板,必须配置control.plane.listener.name. 并且这个监听器名称必须在listeners里面有配置
否则的话,是不会专用的控制器链接的EndPoint的。
例如:
## 所有的监听器
isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER
在启动时,代理将开始使用安全协议“SSL”监听“192.1.1.8:9094”。
在控制器端,当它通过 zookeeper 发现代理发布的端点时,它将使用 control.plane.listener.name 找到端点,它将用于建立与代理的连接。
涉及到的Broker配置有:
上面我们主要分析了一下, Kafka中的网络通信模型, 那么聪明的你应该肯定能够看的出来,它是使用线程模型中的 Reactor模式来实现的。
该模块详细请参考Reactor 模型
Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。
即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:
我们主要了解一下 主从Reactor 多线程
该图来源于网络
针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。
方案说明:
更详细的介绍可以看 Reactor 模型
(1)Kafka的网络模型使用了Reactor模式的哪种实现方式?
答案: 3 。 使用了主从Reactor多线程的实现方式.
在这里插入图片描述
MainReactor(Acceptor)只负责监听OP_ACCEPT事件, 监听到之后把SocketChannel 传递给 SubReactor(Processor), 每个Processor都有自己的Selector。SubReactor会监听并处理其他的事件,并最终把具体的请求传递给KafkaRequestHandlerPool。
很典型的主从Reactor多线程模式。
(2)什么是ControllerPlane(控制器面板),什么是DataPlane(数据面板)?
控制器面板: 主要处理控制器类的的请求
数据面板: 主要处理数据类的请求。
让他们隔离,互不影响,比如说普通的请求太多,导致了阻塞, 那么Controller相关的请求也可能被阻塞了,所以让他们隔离,不会互相影响。
但是默认情况下, ControllerPlane是没有设置的,也就是Controller相关的请求还是走的DataPlane。 想要隔离的话必须设置control.plane.listener.name .
手机扫一扫
移动阅读更方便
你可能感兴趣的文章