1. 背景
系列文章
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
一行简单的初始化代码
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
其实就已经完成了producer的初始化,理解producer的代码最重要的有两个地方
Metadata:原数据更新,主要包含brokens list 以及topics list
NIO:伴随着
producer的daemon线程,处理所有的网络请求
2. 整体流程
NIO的使用使得我们必须使用循环的思维来理解代码,也就是一次循环完不成的事情,那就再下次循环再做;否则很容器被代码绕进去了。
下面的这个流程比较细节,需要对源码有一点了解。
先梳理下整个producer的流程:
初始化Metadata,并表明metadata需要更新,设置
needFullUpdate = true-
启动NIO daemon线程,线程主要干三件事
- 检查metadata是否需要更新
- 运行NIO的select方法,处理监听的事件
- selector方法返回后(超时返回或者有事件返回),处理事件,可能是连接事件、可能是写数据事件、可能是读数据事件
首先是检查metadata是否需要更新,主要检查的标志位有两项,一是设置了
needFullUpdate = true;二是metadata的缓存时间到期了,默认是5分钟由于metadata启动时候就设置了
needFullUpdate = true,因此NIO线程会检测到需要进行metadata更新需要更新metadata时,会随机从broken list中找一个节点发送
METADATA请求;而节点能够正式发送请求的前提是,缓存在producer的节点状态必须是Ready,所有节点的初始状态都是空,因此在发送METADATA请求之前必须先初始化节点状态;初始化节点状态其实就是与broken建立连接,对应的在NIO编程的创建SocketChannel通道,并注册OP_CONNECT事件到NIO;等待NIO从selector中监听到OP_CONNECT事件,表示成功连上了这个broken,此时就会移除OP_CONNECT事件,并注册OP_READ事件到通道;连上这个broken后还需要和broken协商API版本号,确保客户端和broken都能够支持这个版本号;协商的过程其实也很简单,就是producer使用最新的版本号发送API_VERSIONS请求给broken,如果broken支持这个最新的版本号,那么就会正常返回broken支持的所有请求版本,如果broken不支持这个最新的版本号,则会返回错误UNSUPPORTED_VERSION,并带上自己支持的最大版本号,此时producer收到这个错误后,使用broken返回的最大版本号再发送一次API_VERSIONS请求给broken,此时就完成了版本协商;本地缓存的节点状态也会变成Ready当节点状态变成
Ready之后,才会发送METADATA请求,需要注意的是,NIO编程环境下,发送请求其实就是往通道中注册OP_WRITE事件,这样在通道可写(通道缓冲区没满就是可写)时NIO就会从select方法返回,然后将METADATA的请求内容写入通道,全部写入完成后,再移除OP_WRITE事件,由于连接完成时就已经注册了OP_READ事件,因此broken响应后,NIO的select方法也会监听到并成功返回,将broken返回的metadata信息写入到producer缓存中,包括broken list 和 topic list;其中如果producer还没往任何topic发送消息时, topic list 将会返回空列表。这个NIO的处理流程对于所有producer的请求都是一样的处理方式,包括上面的API_VERSIONS请求。至此,producer拥有了metadata缓存,可以开始往topic发送消息了;
producer通过
send()方法往topic中发送消息,如果metadata缓存没刷新之前,send方法也会一直wait等待metadata更新完成;metadata更新完成后,再通过notify方法唤醒send方法。因为metadata的更新是再NIO线程里进行的,而send方法一般是我们main线程调用的,不同线程之间通过wait/notify方式通信metadata更新完成后,send线程被唤醒,开始发送消息;对于没有指定手动指定partition分区号的,以及没有自定义实现
Partitioner接口分区器的,则会使用producer自带的分区方式;否则使用你自己指定的分区号或者分区器;自带的分区方式还是比较复杂的,下文单独详解send方法并不会真的进行消息发送,而是将消息先缓存再producer本地,然后再通过NIO线程发送;这样可以避免生产者写入速度很快时,频繁的唤醒NIO线程处理;send方法发送的消息都会先缓存在batches这个Map中,Map的key是topic的分区号,value是send方法写入到这个topic分区的所有消息
public final ConcurrentMap<Integer /partition/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
-
send方法写入缓存的流程:
先预估当前消息的大小,producer的消息格式由几部分组成:
61字节的头部+最长21字节的消息头部+消息KEY的长度+消息KEY的字节内容+消息VALUE的长度+消息VALUE的字节内容;通过这个计算可以预估出这条消息最大的长度,可变部分主要在21字节的消息头部上,21是这部分的最大值分配ByteBuffer,分配大小最小为producer的批次大小,默认为16k;如果消息小于16k也是直接分配16k的ByteBuffer大小,如果消息超过16k的,那么直接分配消息大小的ByteBuffer;这样预分配16k大小不仅可以避免频繁的分配小内存,同时大小为16k的ByteBuffer还会在使用完成后重新利用,即下次再需要16k大小的ByteBuffer时,也不需要重新分配了,直接重新使用之前分配的16k大小的ByteBuffer;
使用ByteBuffer:将消息写入到ByteBuffer中,如果16k的ByteBuffer还没写满,那么下次写入消息时,消息仍会被写入到这个ByteBuffer中,直到这个ByteBuffer无法容纳更多消息时,将会创建新的ByteBuffer去容纳更多的消息;而每次新创建的ByteBuffer都会作为一个批次被一起处理;
-
写入ByteBuffer:Kafka的消息都有固定的格式,因此写入的时候也是需要按照固定的格式写入,由于Kafka消息都有61字节的固定头部,因此会先将ByteBuffer的position置为61,从这个位置开始写入消息内容,而前61个字节在真正发送时才会写入;先从position=61开始写入:
计算消息大小,消息包括offset、timestamp、key、value,计算得到消息大小为多少字节,然后将这个大小先写入到ByteBuffer
写入一字节的attributes,默认为0
写入timestamp,默认为0
写入偏移量offset,从0开始递增
写入key的长度,key不存在则长度为-1;key存在再接着写入key的字节数组
写入value的长度,value不存在则长度为-1;value存在再接着写入value的字节数组
至此,才完成了一个消息的写入;send方法再次发送消息时,如果这个ByteBuffer还能够容纳新的消息,则会接着写入这个ByteBuffer,直到这个ByteBuffer不能容纳新的消息,再从头开始写入新的ByteBuffer;因为写ByteBuffer的时候最先写入的是消息大小,所以也不怕多个消息在同一个ByteBuffer里无法处理了,因此只需要在读取ByteBuffer时,先读出消息的大小,然后再读出消息大小个字节数,那就是一个完整的消息了。
如果一个批次写满了或者创建了新的ByteBuffer,那么会主动唤醒NIO线程(调用NIO的wakeup()方法),让NIO线程通过轮询开始发送缓存的消息;否则就等待NIO线程自动超时唤醒后发送缓存的消息
NIO线程被唤醒后,就会开始检测上面的batches这个Map,首先遍历Map找到需要发送消息的broken,因为每个partition分区都会有一个leader partition(这部分信息也在metadata中),这个leader partition所在的broken即为需要发送消息的broken;然后以broken维度去统计需要发送到这个broken的所有消息;首先从metadata中找到这个broken上的所有topic partition,如果这个topic partition存在待发送的数据,即batches这个Map中包含这个partition的批次数据,不存在待发送数据的直接忽略即可,存在待发送数据的需要先判断发送到这个broken的请求大小,默认不能超过1M,超过1M的请求只会在只有一个批次数据时发送,否则将数据留到下一次请求在处理;对于没超过1M的请求,会拼接当前broken的多个topic partition的批次数据
每个topic partition的队首批次在拼接之前,会先写入上面空出来的61字节固定头部,即先将position置为0,然后开始写入固定头部,主要包括:
baseOffset(8字节,默认0)、消息大小(4字节)、leader epoch (4字节)、Magic版本(1字节)、CRC容错(4字节)等等按照broken进行请求拼接后,待发送的消息按照Map进行组织,其中key是broken id,value是将要发送到这个broken的消息列表
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);
- 将每个broken上的消息通过
PRODUCE请求发送给对应的broken,请求的方式和上面METADATA请求过程基本上一致;再深究一点,在构建请求时,都会按照这个格式构建请求内容,其中主要包括开始的4字节、请求Header(包括请求类型、版本号信息)、请求体(消息主体);其中最开始的4字节存储的是整个请求消息的大小,这个数据对于NIO编程中处理TCP粘包/拆包至关重要,有了这个大小,我们才能够知道这个消息的实际大小,不管是粘包/拆包问题,就都能够迎刃而解了
SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);
- 同时再深究一点,处理响应时,也是先读取4字节的大小,如果不能够正常读取到4字节的大小,则需要等待下一次select触发,然后继续读;读到4字节的消息大小后,则直接分配这个大小的ByteBuffer,然后接着读取这个大小的字节数,即一直读取到ByteBuffer满,就表示读完了一次响应数据。这里发送和读取的消息大小4字节用的是魔法值,对NIO不熟悉的话很难把这两个数据联想到一起,而只有联想到一起才知道这个4字节解决的是什么问题。
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
this.source = source;
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = maxSize;
this.memoryPool = memoryPool;
}
- 至此,才算了完成了消息的发送!
以上,producer的整个发送流程都清晰了,再对照看下源码,就基本上没有看不懂的地方了。
3. producer自带的分区方式
上面的producer整体流程中,没有深究在我们没有指定分区号,也没有自定义实现Partitioner接口分区器的时候,producer是怎么为我们指定分区号的呢?
3.1 简单的分区
通过METADATA请求,我们可以清晰的知道每个topic有多少个partition,当producer首次通过send方法发送数据时,会随机挑选一个partition作为分区号,即通过多线程环境下效率更高的ThreadLocalRandom获取2的32次方整数里的随机数
ThreadLocalRandom.current().nextInt() % partitions.size()
然后接下来的send发送的消息都会到这个partition,直达一个批次被写满(不能容纳新的消息了);当批次不能容纳新的消息时,则会重新随机挑选一个partition作为新的分区号继续写入
3.2 自适应分区
当producer写入数据很快的时候,一直到所有分区都有数据等待发送了,这时候producer会自动开始自适应的选择分区;那怎么个自适应法呢?
其实就是根据当前每个分区写入的批次数量,让写入批次更少的分区号再后续继续写入数据时更容易被选中,producer是通过均匀分布来实现的,至此,你是否还记得概率论中的均匀分布是什么呢?
举个例子说明:
假设topic有3个分区,现在每个分区写入的批次数量为:
1 4 3
它的均匀分布可以通过下面的表达式计算得到,先找到最大的批次数量然后加1,即为5l
然后计算每个分区距离5还差多少个批次
4 1 2
然后按照总和(4+1+2=7)进行划分,首元素不动,后面的元素为前一个元素+差值;转成算法实现就下面的代码了
4 5 7
queueSizes[0] = maxSizePlus1 - queueSizes[0];
for (int i = 1; i < length; i++) {
queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];
}
然后可以把这个理解为[0,7)的均匀分布,相当于在坐标系上,有0~6个点(从0开始编号);坐标为0,1,2,3对应的是partition 1;坐标4对应的是partition 2;坐标5,6对应的是partition 3;这样在选择随机数的时候就能让批次更少的partition有更大的机会被选中;对应的代码实现
int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
int partitionIndex = Math.abs(searchResult + 1);
取一个0~6之间的随机数,然后通过二分查找找这个随机数在数组(4,5,7)中的位置,二分查找的结果是元素下标或者(负插入位置-1),二分查找结果+1即为对应partition;比如随机数为3,那么二分查找的结果为-1,二分查找结果+1后为0,那么对应的就是partition分区为1.
想要了解更多关于producer请求和源码的直接前往系列文章学习
