Kafka系列《一》-- 生产者Producer流程及Partition详解

1. 背景

系列文章

一行简单的初始化代码

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

其实就已经完成了producer的初始化,理解producer的代码最重要的有两个地方

  • Metadata:原数据更新,主要包含brokens list 以及topics list

  • NIO:伴随着producer的daemon线程,处理所有的网络请求

2. 整体流程

NIO的使用使得我们必须使用循环的思维来理解代码,也就是一次循环完不成的事情,那就再下次循环再做;否则很容器被代码绕进去了。

下面的这个流程比较细节,需要对源码有一点了解。

先梳理下整个producer的流程:

  • 初始化Metadata,并表明metadata需要更新,设置needFullUpdate = true

  • 启动NIO daemon线程,线程主要干三件事

    • 检查metadata是否需要更新
    • 运行NIO的select方法,处理监听的事件
    • selector方法返回后(超时返回或者有事件返回),处理事件,可能是连接事件、可能是写数据事件、可能是读数据事件
  • 首先是检查metadata是否需要更新,主要检查的标志位有两项,一是设置了needFullUpdate = true;二是metadata的缓存时间到期了,默认是5分钟

  • 由于metadata启动时候就设置了needFullUpdate = true,因此NIO线程会检测到需要进行metadata更新

  • 需要更新metadata时,会随机从broken list中找一个节点发送METADATA请求;而节点能够正式发送请求的前提是,缓存在producer的节点状态必须是Ready,所有节点的初始状态都是空,因此在发送METADATA请求之前必须先初始化节点状态;初始化节点状态其实就是与broken建立连接,对应的在NIO编程的创建SocketChannel通道,并注册OP_CONNECT事件到NIO;等待NIO从selector中监听到OP_CONNECT事件,表示成功连上了这个broken,此时就会移除OP_CONNECT事件,并注册OP_READ事件到通道;连上这个broken后还需要和broken协商API版本号,确保客户端和broken都能够支持这个版本号;协商的过程其实也很简单,就是producer使用最新的版本号发送API_VERSIONS请求给broken,如果broken支持这个最新的版本号,那么就会正常返回broken支持的所有请求版本,如果broken不支持这个最新的版本号,则会返回错误UNSUPPORTED_VERSION,并带上自己支持的最大版本号,此时producer收到这个错误后,使用broken返回的最大版本号再发送一次API_VERSIONS请求给broken,此时就完成了版本协商;本地缓存的节点状态也会变成Ready

  • 当节点状态变成Ready之后,才会发送METADATA请求,需要注意的是,NIO编程环境下,发送请求其实就是往通道中注册OP_WRITE事件,这样在通道可写(通道缓冲区没满就是可写)时NIO就会从select方法返回,然后将METADATA的请求内容写入通道,全部写入完成后,再移除OP_WRITE事件,由于连接完成时就已经注册了OP_READ事件,因此broken响应后,NIO的select方法也会监听到并成功返回,将broken返回的metadata信息写入到producer缓存中,包括broken list 和 topic list;其中如果producer还没往任何topic发送消息时, topic list 将会返回空列表。这个NIO的处理流程对于所有producer的请求都是一样的处理方式,包括上面的API_VERSIONS请求。

  • 至此,producer拥有了metadata缓存,可以开始往topic发送消息了;

  • producer通过send()方法往topic中发送消息,如果metadata缓存没刷新之前,send方法也会一直wait等待metadata更新完成;metadata更新完成后,再通过notify方法唤醒send方法。因为metadata的更新是再NIO线程里进行的,而send方法一般是我们main线程调用的,不同线程之间通过wait/notify方式通信

  • metadata更新完成后,send线程被唤醒,开始发送消息;对于没有指定手动指定partition分区号的,以及没有自定义实现Partitioner接口分区器的,则会使用producer自带的分区方式;否则使用你自己指定的分区号或者分区器;自带的分区方式还是比较复杂的,下文单独详解

  • send方法并不会真的进行消息发送,而是将消息先缓存再producer本地,然后再通过NIO线程发送;这样可以避免生产者写入速度很快时,频繁的唤醒NIO线程处理;send方法发送的消息都会先缓存在batches这个Map中,Map的key是topic的分区号,value是send方法写入到这个topic分区的所有消息

public final ConcurrentMap<Integer /partition/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

  • send方法写入缓存的流程:

    • 先预估当前消息的大小,producer的消息格式由几部分组成:61字节的头部+最长21字节的消息头部+消息KEY的长度+消息KEY的字节内容+消息VALUE的长度+消息VALUE的字节内容;通过这个计算可以预估出这条消息最大的长度,可变部分主要在21字节的消息头部上,21是这部分的最大值

    • 分配ByteBuffer,分配大小最小为producer的批次大小,默认为16k;如果消息小于16k也是直接分配16k的ByteBuffer大小,如果消息超过16k的,那么直接分配消息大小的ByteBuffer;这样预分配16k大小不仅可以避免频繁的分配小内存,同时大小为16k的ByteBuffer还会在使用完成后重新利用,即下次再需要16k大小的ByteBuffer时,也不需要重新分配了,直接重新使用之前分配的16k大小的ByteBuffer;

    • 使用ByteBuffer:将消息写入到ByteBuffer中,如果16k的ByteBuffer还没写满,那么下次写入消息时,消息仍会被写入到这个ByteBuffer中,直到这个ByteBuffer无法容纳更多消息时,将会创建新的ByteBuffer去容纳更多的消息;而每次新创建的ByteBuffer都会作为一个批次被一起处理;

    • 写入ByteBuffer:Kafka的消息都有固定的格式,因此写入的时候也是需要按照固定的格式写入,由于Kafka消息都有61字节的固定头部,因此会先将ByteBuffer的position置为61,从这个位置开始写入消息内容,而前61个字节在真正发送时才会写入;先从position=61开始写入:

      • 计算消息大小,消息包括offset、timestamp、key、value,计算得到消息大小为多少字节,然后将这个大小先写入到ByteBuffer

      • 写入一字节的attributes,默认为0

      • 写入timestamp,默认为0

      • 写入偏移量offset,从0开始递增

      • 写入key的长度,key不存在则长度为-1;key存在再接着写入key的字节数组

      • 写入value的长度,value不存在则长度为-1;value存在再接着写入value的字节数组

  • 至此,才完成了一个消息的写入;send方法再次发送消息时,如果这个ByteBuffer还能够容纳新的消息,则会接着写入这个ByteBuffer,直到这个ByteBuffer不能容纳新的消息,再从头开始写入新的ByteBuffer;因为写ByteBuffer的时候最先写入的是消息大小,所以也不怕多个消息在同一个ByteBuffer里无法处理了,因此只需要在读取ByteBuffer时,先读出消息的大小,然后再读出消息大小个字节数,那就是一个完整的消息了。

  • 如果一个批次写满了或者创建了新的ByteBuffer,那么会主动唤醒NIO线程(调用NIO的wakeup()方法),让NIO线程通过轮询开始发送缓存的消息;否则就等待NIO线程自动超时唤醒后发送缓存的消息

  • NIO线程被唤醒后,就会开始检测上面的batches这个Map,首先遍历Map找到需要发送消息的broken,因为每个partition分区都会有一个leader partition(这部分信息也在metadata中),这个leader partition所在的broken即为需要发送消息的broken;然后以broken维度去统计需要发送到这个broken的所有消息;首先从metadata中找到这个broken上的所有topic partition,如果这个topic partition存在待发送的数据,即batches这个Map中包含这个partition的批次数据,不存在待发送数据的直接忽略即可,存在待发送数据的需要先判断发送到这个broken的请求大小,默认不能超过1M,超过1M的请求只会在只有一个批次数据时发送,否则将数据留到下一次请求在处理;对于没超过1M的请求,会拼接当前broken的多个topic partition的批次数据

  • 每个topic partition的队首批次在拼接之前,会先写入上面空出来的61字节固定头部,即先将position置为0,然后开始写入固定头部,主要包括:baseOffset(8字节,默认0)、消息大小(4字节)、leader epoch (4字节)、Magic版本(1字节)、CRC容错(4字节)等等

  • 按照broken进行请求拼接后,待发送的消息按照Map进行组织,其中key是broken id,value是将要发送到这个broken的消息列表

Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);

  • 将每个broken上的消息通过PRODUCE请求发送给对应的broken,请求的方式和上面METADATA请求过程基本上一致;再深究一点,在构建请求时,都会按照这个格式构建请求内容,其中主要包括开始的4字节、请求Header(包括请求类型、版本号信息)、请求体(消息主体);其中最开始的4字节存储的是整个请求消息的大小,这个数据对于NIO编程中处理TCP粘包/拆包至关重要,有了这个大小,我们才能够知道这个消息的实际大小,不管是粘包/拆包问题,就都能够迎刃而解了
SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);
  • 同时再深究一点,处理响应时,也是先读取4字节的大小,如果不能够正常读取到4字节的大小,则需要等待下一次select触发,然后继续读;读到4字节的消息大小后,则直接分配这个大小的ByteBuffer,然后接着读取这个大小的字节数,即一直读取到ByteBuffer满,就表示读完了一次响应数据。这里发送和读取的消息大小4字节用的是魔法值,对NIO不熟悉的话很难把这两个数据联想到一起,而只有联想到一起才知道这个4字节解决的是什么问题。
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
        this.source = source;
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
        this.memoryPool = memoryPool;
    }
  • 至此,才算了完成了消息的发送!

以上,producer的整个发送流程都清晰了,再对照看下源码,就基本上没有看不懂的地方了。

3. producer自带的分区方式

上面的producer整体流程中,没有深究在我们没有指定分区号,也没有自定义实现Partitioner接口分区器的时候,producer是怎么为我们指定分区号的呢?

3.1 简单的分区

通过METADATA请求,我们可以清晰的知道每个topic有多少个partition,当producer首次通过send方法发送数据时,会随机挑选一个partition作为分区号,即通过多线程环境下效率更高的ThreadLocalRandom获取2的32次方整数里的随机数

ThreadLocalRandom.current().nextInt() % partitions.size()

然后接下来的send发送的消息都会到这个partition,直达一个批次被写满(不能容纳新的消息了);当批次不能容纳新的消息时,则会重新随机挑选一个partition作为新的分区号继续写入

3.2 自适应分区

当producer写入数据很快的时候,一直到所有分区都有数据等待发送了,这时候producer会自动开始自适应的选择分区;那怎么个自适应法呢?

其实就是根据当前每个分区写入的批次数量,让写入批次更少的分区号再后续继续写入数据时更容易被选中,producer是通过均匀分布来实现的,至此,你是否还记得概率论中的均匀分布是什么呢?

举个例子说明:

假设topic有3个分区,现在每个分区写入的批次数量为:
1 4 3

它的均匀分布可以通过下面的表达式计算得到,先找到最大的批次数量然后加1,即为5l

然后计算每个分区距离5还差多少个批次
4 1 2

然后按照总和(4+1+2=7)进行划分,首元素不动,后面的元素为前一个元素+差值;转成算法实现就下面的代码了
4 5 7

queueSizes[0] = maxSizePlus1 - queueSizes[0];
        for (int i = 1; i < length; i++) {
            queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];
        }

然后可以把这个理解为[0,7)的均匀分布,相当于在坐标系上,有0~6个点(从0开始编号);坐标为0,1,2,3对应的是partition 1;坐标4对应的是partition 2;坐标5,6对应的是partition 3;这样在选择随机数的时候就能让批次更少的partition有更大的机会被选中;对应的代码实现

int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];

int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);

int partitionIndex = Math.abs(searchResult + 1);

取一个0~6之间的随机数,然后通过二分查找找这个随机数在数组(4,5,7)中的位置,二分查找的结果是元素下标或者(负插入位置-1),二分查找结果+1即为对应partition;比如随机数为3,那么二分查找的结果为-1,二分查找结果+1后为0,那么对应的就是partition分区为1.

想要了解更多关于producer请求和源码的直接前往系列文章学习

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容