2021-01-13-Flink-18(Flink Sink)

经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台

1.Print

手动实现

public class Print {
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-14
     * Time: 20:05
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);
        DataStreamSink<String> sink = source.addSink(new Print_Sink());
        System.out.println(sink);
        env.execute("job");
    }

    public  static  class Print_Sink extends RichSinkFunction<String>{
        int indexOfThisSubtask = 0 ;
        @Override
        public void open(Configuration parameters) throws Exception {
              //获取分区
            indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(indexOfThisSubtask + ">" +value);
        }
    }
}

源码

    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }

2.KafkaSink

在实际的生产环境中,经常会有一些场景,需要将Flink处理后的数据快速地写入到一个分布式、高吞吐、高可用、可用保证Exactly Once的消息中间件中,供其他的应用消费处理后的数据。Kafka就是Flink最好的黄金搭档,Flink不但可以从Kafka中消费数据,还可以将处理后的数据写入到Kafka,并且吞吐量高、数据安全、可以保证Exactly Once等。
Flink可以和Kafka多个版本整合,比如0.11.x、1.x、2.x等,从Flink1.9开始,使用的是kafka 2.2的客户端,所以这里使用kafka的版本是2.2.2,并且使用最新的API。
下面的例子就是将数据写入到Kafka中,首先要定义一个类实现KafkaSerializationSchema接口,指定一个泛型,String代表要写入到Kafka的数据为String类型。该类的功能是指定写入到Kafka中数据的序列化Schema,需要重写serialize方法,将要写入的数据转成二进制数组,并封装到一个ProducerRecord中返回。

//自定义String类型数据Kafka的序列化Schema
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
    private String topic;
    private String charset;
    //构造方法传入要写入的topic和字符集,默认使用UTF-8
    public KafkaStringSerializationSchema(String topic) {
        this(topic, “UTF-8”);
    }
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }
    //调用该方法将数据进行序列化
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, @Nullable Long timestamp) {
        //将数据转成bytes数组
        byte[] bytes = element.getBytes(Charset.forName(charset));
        //返回ProducerRecord
        return new ProducerRecord<>(topic, bytes);
    }
}

然后将Kafka相关的参数设置到Properties中,再new FlinkKafkaProducer,将要写入的topic名称、Kafka序列化Schema、Properties和写入到Kafka的Semantic语义作为FlinkKafkaProducer构造方法参数传入。最好调用addSink方法将FlinkKafkaProducer的引用传入到该方法中。虽然下面的代码指定了EXACTLY_ONCE语义,但是没有开启Checkpointing,是没法实现的。

DataStream<String> dataSteam = …
//写入Kafka的topic
String topic = “test”;
//设置Kafka相关参数
Properties properties = new Properties();
properties.setProperty(“bootstrap.servers”, “node-1:9092,node-2:9092,node-3:9092”);
//创建FlinkKafkaProducer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
        topic, //指定topic
        new KafkaStringSerializationSchema(topic), //指定写入Kafka的序列化Schema
        properties, //指定Kafka的相关参数
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE //指定写入Kafka为EXACTLY_ONCE语义
);
//添加KafkaSink
dataSteam.addSink(kafkaProducer);

3.RedisSink

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

定义一个类(或者静态内部类)实现RedisMapper即可,需要指定一个泛型,这里是Tuple2<String, Integer>,即写入到Redis中的数据的类型,并实现三个方法。第一个方法是getCommandDescription方法,返回RedisCommandDescription实例,在该构造方法中可以指定写入到Redis的方法类型为HSET,和Redis的additionalKey即value为HASH类型外面key的值;第二个方法getKeyFromData是指定value为HASH类型对应key的值;第三个方法geVauleFromData是指定value为HASH类型对应value的值

public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        //写入Redis的方法,value使用HASH类型,并指定外面key的值得名称
        return new RedisCommandDescription(RedisCommand.HSET, “WORD_COUNT”);
    }
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0; //指定写入Redis的value里面key的值
    }
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString(); //指定写入value里面value的值
    }
}
DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);//设置
Redis的参数,如地址、端口号等
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(“localhost”).setPassword(“123456”).build();

//将数据写入Redis
result.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));

4.StreamFileDataSink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.9.2</version>
</dependency>

通过DefaultRollingPolicy这个工具类,指定文件滚动生成的策略。这里设置的文件滚动生成策略有两个,一个是距离上一次生成文件时间超过30秒,另一个是文件大小达到100 mb。这两个条件只要满足其中一个即可滚动生成文件。然后StreamingFileSink.forRowFormat方法将文件输出目录、文件写入的编码传入,再调用withRollingPolicy关联上面的文件滚动生成策略,接着调用build方法构建好StreamingFileSink,最后将其作为参数传入到addSink方法中

DataStream<String> dataSteam = …//构建文件滚动生成的策略
DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
        .withRolloverInterval(30 * 1000L) //30秒滚动生成一个文件
        .withMaxPartSize(1024L * 1024L * 100L) //当文件达到100m滚动生成一个文件
        .build();
//创建StreamingFileSink,数据以行格式写入
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(outputPath), //指的文件存储目录
                new SimpleStringEncoder<String>(“UTF-8”)) //指的文件的编码
        .withRollingPolicy(rollingPolicy) //传入文件滚动生成策略
        .build();
//调用DataStream的addSink添加该Sink
dataSteam.addSink(sink);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 1.Flink API介绍 Flink提供了不同的抽象级别以开发流式或者批处理应用程序 Stateful Stre...
    Rex_2013阅读 1,185评论 0 2
  • 1 概览 1.1 预定义的源和接收器 Flink内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括...
    JavaEdge阅读 3,044评论 0 1
  • 说明:本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推...
    大数据研习社阅读 2,301评论 0 0
  • Redis 1.redis是什么? Remote Dictionary Server(Redis)是一个开源的使用...
    袁小胜阅读 293评论 0 0
  • 推荐指数: 6.0 书籍主旨关键词:特权、焦点、注意力、语言联想、情景联想 观点: 1.统计学现在叫数据分析,社会...
    Jenaral阅读 5,756评论 0 5