Flink消费kafka如何获取每条消息对应的topic

1.首先自定义个 KafkaDeserializationSchema

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    //nextElement 是否表示流的最后一条元素,我们要设置为 false ,因为我们需要 msg 源源不断的被消费
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }
    
    @Override
    // 反序列化 kafka 的 record,我们直接返回一个 tuple2<kafkaTopicName,kafkaMsgValue>
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }
    
    @Override
    //告诉 Flink 我输入的数据类型, 方便 Flink 的类型推断
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

2.使用自定义的 KafkaDeserializationSchema 进行消费

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        
        FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
        kafkaConsumer.setStartFromEarliest();
        env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
                System.out.println("topic==== " + value.f0);
            }
        });
        
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 【Android 动画】 动画分类补间动画(Tween动画)帧动画(Frame 动画)属性动画(Property ...
    Rtia阅读 11,373评论 1 38
  • 今天看了一部战争电影。 平时很少看电影、看剧。今天才知道,原来最近很火的、朋友圈刷屏的《长安十二时辰》,是电视剧。...
    Fang2023阅读 1,118评论 0 0
  • -o 的意思是 输出到文件而非标准输出,作用等于>。 -n 表示最多输出多少行文件。
    VanJordan阅读 1,501评论 0 0
  • 今日惊蛰。 《月令七十二候集解》中说:“二月节,万物出乎震,震为雷,故曰惊蛰。是蛰虫惊而出走矣。” 惊蛰的意思是天...
    风之舞555阅读 4,516评论 12 31