Talking About Offsets in Kafka

Offset Management

In Kafka 1.0.2, the internal __consumer_offsets topic stores the offsets of every consumer group.
In earlier versions, consumer group offsets were managed by ZooKeeper.
How to query them:
Use the tool script shipped with Kafka.
The script lives at bin/kafka-consumer-groups.sh.
First run the script and look at its help output:


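The screenshots above showed the script's help output. Running the tool with no arguments makes it print its usage text, assuming you run it from the Kafka installation root (or have bin/ on your PATH):

```shell
# With no arguments, kafka-consumer-groups.sh prints its usage text
# and the full list of supported options (--list, --describe, ...).
bin/kafka-consumer-groups.sh
```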

Here we first write a producer/consumer example:
We start the consumer first, then the producer, and then query the consumed offsets with bin/kafka-consumer-groups.sh. Kafka consumers can record a group's consumed offsets in one of two ways:
1) maintained by Kafka itself (new)
2) maintained by ZooKeeper (old, gradually being deprecated)
The script invocations below therefore query only the offsets maintained by the broker; to query offsets maintained by ZooKeeper, replace --bootstrap-server with --zookeeper.

List the group IDs that are currently consuming:
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
group

Note:

  1. No topic is specified here, so this lists the group.id values of consumers across all topics.
  2. A duplicated group.id is shown only once.
Describe the consumption details of a given group.id:
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST            CLIENT-ID
tp_demo_02  0          923             923             0    consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49  /192.168.100.1  consumer-1
tp_demo_02  1          872             872             0    consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49  /192.168.100.1  consumer-1
tp_demo_02  2          935             935             0    consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49  /192.168.100.1  consumer-1
[root@node11 ~]#

If the consumer is stopped, the offset information can still be queried:
(screenshot of the describe output omitted)

Reset the offsets to the earliest:
(screenshot omitted)

Reset the offsets to the latest:
(screenshot omitted)

Move the offsets of the specified partitions of a given topic backward by 10 messages each:
(screenshot omitted)
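The lost screenshots presumably showed kafka-consumer-groups.sh --reset-offsets invocations. A sketch of the usual commands (available since Kafka 0.11), assuming the topic tp_demo_02 and group "group" from this example; the group must be inactive when resetting:

```shell
# Reset the group's offsets to the earliest available offsets
kafka-consumer-groups.sh --bootstrap-server node1:9092 --group group \
  --topic tp_demo_02 --reset-offsets --to-earliest --execute

# Reset the group's offsets to the latest offsets
kafka-consumer-groups.sh --bootstrap-server node1:9092 --group group \
  --topic tp_demo_02 --reset-offsets --to-latest --execute

# Shift every partition's offset backward by 10 messages
kafka-consumer-groups.sh --bootstrap-server node1:9092 --group group \
  --topic tp_demo_02 --reset-offsets --shift-by -10 --execute
```

Without --execute the tool only prints the plan it would apply (a dry run), which is a safe way to preview a reset first.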
KafkaProducerSingleton implementation
package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerSingleton {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);

    private static KafkaProducer<String, String> kafkaProducer;
    private final Random random = new Random();
    private String topic;
    private int retry;

    private KafkaProducerSingleton() {
    }

    /**
     * Static inner class used for lazy initialization.
     *
     * @author tanjie
     */
    private static class LazyHandler {
        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
     * Singleton accessor. KafkaProducer is thread-safe, so a single
     * instance can be shared by multiple threads.
     */
    public static KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }

    /**
     * Initialize the Kafka producer.
     */
    public void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
            kafkaProducer = new KafkaProducer<String, String>(props);
        }
    }

    /**
     * Send a message through kafkaProducer.
     */
    public void sendKafkaMessage(final String message) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", message);
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (null != exception) {
                    LOGGER.error("kafka send failed: " + exception.getMessage(), exception);
                    retryKafkaMessage(message);
                }
            }
        });
    }

    /**
     * Retry a failed send, up to `retry` attempts.
     */
    private void retryKafkaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                // Log and let the loop retry; recursing here (as the original
                // code did) would multiply the number of attempts.
                LOGGER.error("kafka send failed: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Close the producer instance.
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}
ProducerHandler implementation
package com.lagou.kafka.demo.producer;

public class ProducerHandler implements Runnable {
    private final String message;

    public ProducerHandler(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
        kafkaProducerSingleton.init("tp_demo_02", 3);

        int i = 0;

        while (true) {
            try {
                System.out.println("current thread: " + Thread.currentThread().getName()
                        + "\tkafka instance: " + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("message: " + message + " " + (++i));
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

MyProducer implementation

package com.lagou.kafka.demo.producer;

public class MyProducer {
    public static void main(String[] args) {
        Thread thread = new Thread(new ProducerHandler("hello lagou "));
        thread.start();
    }
}

KafkaConsumerAuto implementation

package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KafkaConsumerAuto {
    /**
     * KafkaConsumer is NOT thread-safe.
     */
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executorService;

    public KafkaConsumerAuto() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");

        // Enable automatic offset commits, every 100 ms.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singleton("tp_demo_02"));
    }

    public void execute() throws InterruptedException {
        executorService = Executors.newFixedThreadPool(2);
        while (true) {
            // poll() never returns null; check for emptiness instead.
            ConsumerRecords<String, String> records = consumer.poll(2_000);
            if (!records.isEmpty()) {
                executorService.submit(new ConsumerThreadAuto(records, consumer));
            }
            Thread.sleep(1000);
        }
    }

    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting for the thread pool to shut down.");
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}

ConsumerThreadAuto implementation

package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThreadAuto implements Runnable {
    private final ConsumerRecords<String, String> records;
    private final KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
                              KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("current thread: " + Thread.currentThread()
                    + "\ttopic: " + record.topic()
                    + "\toffset: " + record.offset()
                    + "\tpartition: " + record.partition()
                    + "\tvalue: " + record.value());
        }
    }
}
ConsumerAutoMain implementation
package com.lagou.kafka.demo.consumer;

public class ConsumerAutoMain {
    public static void main(String[] args) {
        KafkaConsumerAuto kafkaConsumerAuto = new KafkaConsumerAuto();
        try {
            // execute() polls in an infinite loop and only returns if interrupted.
            kafkaConsumerAuto.execute();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafkaConsumerAuto.shutdown();
        }
    }
}

How offsets ensure message stability

ZooKeeper is not well suited to frequent, high-volume writes. Kafka 1.0.2 stores consumer offsets in an internal Kafka topic, __consumer_offsets, and by default ships the kafka-consumer-groups.sh script for users to inspect consumer information.

  1. Create the topic "tp_test_01"
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1
  2. Produce messages with the kafka-console-producer.sh script
[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

Since no key is specified, messages are distributed across the partitions round-robin. (100 messages are produced in this example.)

  3. Verify that the messages were produced
[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1
tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20
[root@node1 ~]#

The output shows that all 100 messages were produced successfully (20 in each of the 5 partitions).

  4. Create a console consumer group
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning
  5. Get the group id of that consumer group (needed later to look up its offset information)
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

Output: console-consumer-49366 (remember this id!)

  6. Query the entire contents of the __consumer_offsets topic
    Note: before running the command below, set exclude.internal.topics=false in consumer.properties.
[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

__consumer_offsets has 50 partitions by default, so if your system also has many consumer groups, this command can produce a lot of output.

  7. Compute which __consumer_offsets partition holds the consumer group's offset information
    This is where the group.id obtained in step 5 comes in (console-consumer-49366 in this example). Kafka uses the following formula to decide which __consumer_offsets partition stores a given group's offsets:

    partition = Math.abs(group.id.hashCode()) % offsets.topic.num.partitions

Here Math.abs("console-consumer-49366".hashCode()) % 50 = 19, i.e. partition 19 of __consumer_offsets stores this consumer group's offset information.
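The formula is easy to check with a few lines of Java; the sketch below mirrors the Math.abs(...) % 50 computation quoted above (the class name is made up for illustration, and the partition number for your own group id will differ):

```java
// Computes which partition of __consumer_offsets stores a group's
// committed offsets, using the formula quoted in the text:
//   partition = Math.abs(group.id.hashCode()) % numPartitions
public class OffsetsPartition {

    public static int partitionFor(String groupId, int numPartitions) {
        return Math.abs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default partition count of __consumer_offsets
        // (offsets.topic.num.partitions); the article computes 19
        // for the group id below.
        System.out.println(partitionFor("console-consumer-49366", 50));
    }
}
```

One caveat: Kafka itself avoids the Integer.MIN_VALUE corner case of Math.abs by masking to a non-negative hash (hashCode() & 0x7FFFFFFF), so for the rare group id whose hashCode is exactly Integer.MIN_VALUE the formula above would misbehave while Kafka would not.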

  8. Fetch the offset information of the given consumer group
[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

The output:

[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,1]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,2]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]

As the output shows, this consumer group's offsets are indeed stored in partition 19, and the offset values are correct. (These are consumed offsets; strictly speaking they are not the same as the log-end offsets from step 3, but since my consumer had already consumed all the messages, the two happen to be equal here.)
Also note the format of each log entry in the __consumer_offsets topic: [Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime].
