Offset Management
In Kafka 1.0.2, the offsets of each consumer group are stored in the internal __consumer_offsets topic.
In earlier versions, consumer group offsets were managed by ZooKeeper.
How to query them:
Use the tool script that ships with Kafka: bin/kafka-consumer-groups.sh
First, run the script to see its help output:
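Running the script with no arguments prints its usage message and the full option list (output omitted here; the exact options vary slightly across Kafka versions):
[root@node11 ~]# kafka-consumer-groups.sh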



First, let's write a producer and consumer example (the full code is listed later in this section):
Start the consumer first, then the producer, and then query the consumed offsets with bin/kafka-consumer-groups.sh. Kafka consumers can record a group's consumed offsets in either of two ways:
1) maintained by Kafka itself (new)
2) maintained by ZooKeeper (old, being phased out)
The script queries offsets maintained by the brokers; to query offsets maintained by ZooKeeper, replace --bootstrap-server with --zookeeper.
List the group IDs that are currently consuming:
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
group

Note:
- No topic is specified here; this lists the group.id of consumers across all topics.
- A duplicate group.id is only shown once.
Check the consumption status of a specific group.id:
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST            CLIENT-ID
tp_demo_02  0          923             923             0    consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49  /192.168.100.1  consumer-1
tp_demo_02  1          872             872             0    consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49  /192.168.100.1  consumer-1
tp_demo_02  2          935             935             0    consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49  /192.168.100.1  consumer-1
[root@node11 ~]#
If the consumer has stopped, query the offset information:
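The same --describe query works while the group has no active members; in that case the output typically reports that the group has no active members, while still listing CURRENT-OFFSET, LOG-END-OFFSET and LAG per partition:
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group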

Reset the offsets to the earliest:
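A sketch using the --reset-offsets action (the group must have no active consumers; --execute actually applies the change, while omitting it only previews the resulting offsets):
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --group group --reset-offsets --topic tp_demo_02 --to-earliest --execute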

Reset the offsets to the latest:
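Similarly, --to-latest moves the group to the log-end offsets:
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --group group --reset-offsets --topic tp_demo_02 --to-latest --execute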

Rewind the offsets of the specified partitions of a topic by 10 messages each:
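--shift-by takes a signed delta: a negative value rewinds, a positive value advances. Individual partitions can be listed after the topic name (a sketch, assuming all three partitions of tp_demo_02 should be rewound):
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --group group --reset-offsets --topic tp_demo_02:0,1,2 --shift-by -10 --execute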

KafkaProducerSingleton implementation
package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerSingleton {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);

    private static KafkaProducer<String, String> kafkaProducer;
    private Random random = new Random();
    private String topic;
    private int retry;

    private KafkaProducerSingleton() {
    }

    /**
     * Static inner class holding the lazily created singleton.
     *
     * @author tanjie
     */
    private static class LazyHandler {
        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
     * Singleton accessor. KafkaProducer is thread-safe, so a single instance
     * can be shared by multiple threads.
     */
    public static final KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }

    /**
     * Initialize the Kafka producer.
     */
    public void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
            kafkaProducer = new KafkaProducer<String, String>(props);
        }
    }

    /**
     * Send a message through kafkaProducer; retry asynchronously on failure.
     */
    public void sendKafkaMessage(final String message) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", message);
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (null != exception) {
                    LOGGER.error("Failed to send message to Kafka: " + exception.getMessage(), exception);
                    retryKafkaMessage(message);
                }
            }
        });
    }

    /**
     * Retry a failed message, at most `retry` times.
     */
    private void retryKafkaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                // Log and let the loop retry; recursing here would retry without bound.
                LOGGER.error("Failed to send message to Kafka: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Close the producer instance.
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}
ProducerHandler implementation
package com.lagou.kafka.demo.producer;

public class ProducerHandler implements Runnable {
    private String message;

    public ProducerHandler(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
        kafkaProducerSingleton.init("tp_demo_02", 3);
        int i = 0;
        while (true) {
            try {
                System.out.println("Current thread: " + Thread.currentThread().getName()
                        + "\tproducer instance: " + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("message: " + message + " " + (++i));
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
MyProducer implementation
package com.lagou.kafka.demo.producer;

public class MyProducer {
    public static void main(String[] args) {
        Thread thread = new Thread(new ProducerHandler("hello lagou "));
        thread.start();
    }
}
KafkaConsumerAuto implementation
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KafkaConsumerAuto {
    /**
     * KafkaConsumer is NOT thread-safe: only this class calls poll() on it.
     */
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executorService;

    public KafkaConsumerAuto() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // Enable automatic offset commits (this is the auto-commit consumer)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Collections.singleton("tp_demo_02"));
    }

    public void execute() throws InterruptedException {
        executorService = Executors.newFixedThreadPool(2);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(2_000);
            // poll() never returns null; skip empty batches
            if (!records.isEmpty()) {
                executorService.submit(new ConsumerThreadAuto(records, consumer));
            }
            Thread.sleep(1000);
        }
    }

    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting for the thread pool to shut down...");
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
ConsumerThreadAuto implementation
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThreadAuto implements Runnable {
    private ConsumerRecords<String, String> records;
    // Consumer reference kept for potential manual commits (unused in auto-commit mode)
    private KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
                              KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Current thread: " + Thread.currentThread()
                    + "\ttopic: " + record.topic()
                    + "\toffset: " + record.offset()
                    + "\tpartition: " + record.partition()
                    + "\tmessage: " + record.value());
        }
    }
}
ConsumerAutoMain implementation
package com.lagou.kafka.demo.consumer;

public class ConsumerAutoMain {
    public static void main(String[] args) {
        KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
        try {
            // execute() polls in an infinite loop, so shutdown() below
            // only runs if the loop is interrupted.
            kafka_consumerAuto.execute();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_consumerAuto.shutdown();
        }
    }
}
How offsets help guarantee message stability
ZooKeeper is not well suited to frequent, high-volume writes. Kafka 1.0.2 therefore stores consumer offsets in an internal Kafka topic, __consumer_offsets, and ships the kafka-consumer-groups.sh script for users to inspect consumer information.
- Create the topic "tp_test_01"
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1
- Produce messages with the kafka-console-producer.sh script
[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt
Since no key is specified, the messages are distributed across the partitions round-robin. (100 messages were produced in this example.)
- Verify that the messages were produced successfully
[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1
tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20
[root@node1 ~]#
The output shows that all 100 messages were produced successfully (5 partitions x 20 messages each).
- Create a console consumer group
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning
- Get the group id of this consumer group (we will use it later to look up its offsets)
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Output: console-consumer-49366 (remember this id!)
- Query the full contents of the __consumer_offsets topic
Note: before running the command below, set exclude.internal.topics=false in consumer.properties.
[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
By default, __consumer_offsets has 50 partitions, so if your system has many consumer groups this command can produce a lot of output.
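To confirm the partition count of the internal topic, you can describe it (a sketch, assuming the same ZooKeeper chroot node1:2181/myKafka used when creating topics above):
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic __consumer_offsets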
- Compute which partition of __consumer_offsets holds the group's offset information
This is where the group.id obtained above comes in (console-consumer-49366 in this example). Kafka uses the following formula to compute which __consumer_offsets partition stores a group's offsets:

partition = Math.abs(group.id.hashCode()) % offsets.topic.num.partitions

Here, Math.abs("console-consumer-49366".hashCode()) % 50 = 19, so partition 19 of __consumer_offsets holds this consumer group's offset information.
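The same computation is easy to reproduce in a few lines of Java (a minimal sketch; 50 is the default value of offsets.topic.num.partitions):

public class OffsetsPartitionCalc {
    public static void main(String[] args) {
        String groupId = "console-consumer-49366";
        int numPartitions = 50; // default offsets.topic.num.partitions
        // The group coordinator picks the partition as abs(hash) mod partition count.
        System.out.println(Math.abs(groupId.hashCode()) % numPartitions); // 19
    }
}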
- Get the offset information of the specified consumer group
[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
Sample output:
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,1]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,2]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424707212,ExpirationTime 1596511107212]
As the output shows, this consumer group's offsets are indeed stored in partition 19, and the values are correct. (These are the committed consumer offsets; strictly speaking they are not the log-end offsets from the verification step above, but since the consumer had already consumed all the messages, the two happen to match.)
Note also that every log entry in the __consumer_offsets topic has the format: [Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime].

