2021-01-19-Flink-24(Flink State)

1. 使用 HashMap 模拟 KeyedState

/**
 * Socket word count whose running totals live in a plain {@link HashMap} field of the map operator.
 *
 * <p>Checkpointing and a fixed-delay restart strategy are enabled, but the HashMap is not registered
 * with Flink as state, so it is not part of any checkpoint. When the job fails (send the word
 * "error") and restarts, the operator starts again with an empty map.
 */
public class MyKeyedState01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 s; on failure restart at most 3 times, 5 s apart
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Source: text lines from a local socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new SplitToWordAndOne());

        // group by the word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        keyed.map(new HashMapCounter()).print();

        env.execute("StreamingWordCount");
    }

    /**
     * Splits a line on single spaces and emits (word, 1) per token.
     * Throws on the token "error" to simulate a task failure.
     */
    private static class SplitToWordAndOne implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : line.split(" ")) {
                if ("error".equals(token)) {
                    throw new RuntimeException("出现异常了!!!!!");
                }
                out.collect(Tuple2.of(token, 1));
            }
        }
    }

    /** Adds each input count to the per-word total kept in this subtask's HashMap and emits the new total. */
    private static class HashMapCounter extends RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // intermediate results, local to one parallel subtask and not managed by Flink
        private final HashMap<String, Integer> counter = new HashMap<>();

        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
            // absent word -> store the input count; present -> store old + input
            int total = counter.merge(input.f0, input.f1, Integer::sum);
            return Tuple2.of(input.f0, total);
        }
    }
}

2.实现一个简单的KeyedState

/**
 * 自己每个keyBy之后的SubTask中定义一个hashMap保存中间结果
 * 可以定期的将hashmap中的数据持久好到磁盘
 * 并且subTask出现异常重启,在open方法中可以读取磁盘中的文件,恢复历史状态
 * (区别:每个定时器都是在SubTask中定期执行的)
 */
public class MyKeyedState02 {

    public static void main(String[] args) throws Exception{

        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //要开启,否则从新开始计数
        env.enableCheckpointing(10000);
        //设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        //创建DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //调用Transformation开始
        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出现异常了!!!!!");
                    }
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private HashMap<String, Integer> counter;

            @Override
            public void open(Configuration parameters) throws Exception {
                //初始化hashMap或恢复历史数据
                //获取当前subTask的编号
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                File ckFile = new File("/Users/xing/Desktop/myck/" + indexOfThisSubtask);
                if(ckFile.exists()) {
                    FileInputStream fileInputStream = new FileInputStream(ckFile);
                    ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
                    counter = (HashMap<String, Integer>) objectInputStream.readObject();
                } else {
                   counter = new HashMap<>();
                }
                //简化:直接在当前的subTask中启动一个定时器
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                       while (true) {
                           try {
                               Thread.sleep(10000);
                               if (!ckFile.exists()) {
                                   ckFile.createNewFile();
                               }
                               //将hashMap中的数据持久化到文件中
                               ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(ckFile));
                               objectOutputStream.writeObject(counter);
                               objectOutputStream.flush();
                               objectOutputStream.close();
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
                       }
                    }
                }).start();



            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                String word = input.f0;
                Integer count = input.f1;
                //从map中取出历史次数
                Integer historyCount = counter.get(word);
                if(historyCount == null) {
                    historyCount = 0;
                }
                int sum = historyCount + count; //当前输入跟历史次数进行累加
                //更新map中的数据
                counter.put(word, sum);
                //输出结果
                return Tuple2.of(word, sum);
            }
        }).print();


        //启动执行
        env.execute("StreamingWordCount");

    }
}

3.KeyedState的简单使用

/**
 * Socket word count using Flink managed keyed state.
 *
 * <p>State used after keyBy (stored per key) is called KeyedState. Its kinds are:
 * ValueState&lt;T&gt;, whose value may be a primitive, a collection or a custom type;
 * MapState&lt;UK, UV&gt;, which holds key/value pairs per key (key -&gt; (UK, UV));
 * and ListState, whose value is a list.
 *
 * <p>This demo keeps the running count of each word in a {@code ValueState<Integer>}.
 */
public class KeyedStateDemo01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 s; on failure restart at most 3 times, 5 s apart
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Source: text lines from a local socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new SplitToWordAndOne());

        // group by the word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        keyed.map(new ValueStateCounter()).print();

        env.execute("StreamingWordCount");
    }

    /**
     * Splits a line on single spaces and emits (word, 1) per token.
     * Throws on the token "error" to simulate a task failure.
     */
    private static class SplitToWordAndOne implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : line.split(" ")) {
                if ("error".equals(token)) {
                    throw new RuntimeException("出现异常了!!!!!");
                }
                out.collect(Tuple2.of(token, 1));
            }
        }
    }

    /** Adds the input count to the current key's ValueState and emits (word, new total). */
    private static class ValueStateCounter extends RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private transient ValueState<Integer> counter;

        @Override
        public void open(Configuration parameters) throws Exception {
            // a state is obtained through a descriptor (name + type); this initializes or restores it
            ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("wc-desc", Integer.class);
            counter = getRuntimeContext().getState(descriptor);
        }

        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
            // value() is scoped to the key of the current record; null until first update
            Integer stored = counter.value();
            Integer total = (stored == null ? 0 : stored) + input.f1;
            counter.update(total);
            // reuse the input tuple as the output, carrying the accumulated count
            input.f1 = total;
            return input;
        }
    }
}

4.MapState简单使用

/**
 * Keyed MapState demo: per key, accumulate a money amount for each city.
 *
 * <p>State used after keyBy (stored per key) is called KeyedState. Its kinds are:
 * ValueState&lt;T&gt;, whose value may be a primitive, a collection or a custom type;
 * MapState&lt;UK, UV&gt;, which holds key/value pairs per key (key -&gt; (UK, UV));
 * and ListState, whose value is a list.
 *
 * <p>Input lines are comma-separated: {@code <key>,<city>,<money>}.
 */
public class MapStateDemo01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 s; on failure restart at most 3 times, 5 s apart
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Source: text lines from a local socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream = lines.map(new ParseRecord());

        KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> result = keyedStream.process(new PerCityTotal());

        result.print();

        env.execute("StreamingWordCount");
    }

    /** Parses {@code "<key>,<city>,<money>"} into a (key, city, money) tuple. */
    private static class ParseRecord implements MapFunction<String, Tuple3<String, String, Double>> {
        @Override
        public Tuple3<String, String, Double> map(String value) throws Exception {
            String[] parts = value.split(",");
            String key = parts[0];
            String city = parts[1];
            Double money = Double.parseDouble(parts[2]);
            return Tuple3.of(key, city, money);
        }
    }

    /** Keeps, per key, a city -> accumulated money MapState and emits the updated total for the city. */
    private static class PerCityTotal extends KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>> {

        private transient MapState<String, Double> mapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // the descriptor names and types the state; this initializes or restores it
            MapStateDescriptor<String, Double> descriptor = new MapStateDescriptor<>("kv-state", String.class, Double.class);
            mapState = getRuntimeContext().getMapState(descriptor);
        }

        @Override
        public void processElement(Tuple3<String, String, Double> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
            String city = value.f1;
            Double previous = mapState.get(city);
            Double totalMoney = (previous == null ? 0.0 : previous) + value.f2;
            mapState.put(city, totalMoney);
            // emit the input tuple with its amount replaced by the running total
            value.f2 = totalMoney;
            out.collect(value);
        }
    }
}

5.ListState简单使用

/**
 * Keyed ListState demo: per key, remember every action seen so far.
 *
 * <p>State used after keyBy (stored per key) is called KeyedState. In ListState the value is a list.
 *
 * <p>Input lines are comma-separated: {@code <key>,<action>}. For every input, the key and the
 * full list of its actions so far are emitted.
 */
public class ListStateDemo01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 s; on failure restart at most 3 times, 5 s apart
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Source: text lines from a local socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, String>> tpDataStream = lines.map(new SplitPair());

        KeyedStream<Tuple2<String, String>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);

        keyedStream.process(new ActionHistory()).print();

        env.execute("StreamingWordCount");
    }

    /** Parses {@code "<key>,<action>"} into a (key, action) tuple. */
    private static class SplitPair implements MapFunction<String, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> map(String value) throws Exception {
            String[] parts = value.split(",");
            return Tuple2.of(parts[0], parts[1]);
        }
    }

    /** Appends each action to the key's ListState and emits (key, all actions so far). */
    private static class ActionHistory extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>> {

        private transient ListState<String> listState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // the descriptor names and types the state; this initializes or restores it
            ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("lst-state", String.class);
            listState = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
            listState.add(value.f1);
            // copy the state's contents into a fresh list for the output record
            ArrayList<String> events = new ArrayList<>();
            listState.get().forEach(events::add);
            out.collect(Tuple2.of(value.f0, events));
        }
    }
}

5.1 用 ValueState<List<String>> 实现 ListState 的功能（注意:嵌套泛型类型需要通过 TypeHint 声明类型信息）

/**
 * Keyed state demo that implements ListState-like behavior with a
 * {@code ValueState<List<String>>} instead of a ListState.
 *
 * <p>Because {@code List<String>} is a nested generic type, its TypeInformation is supplied
 * through a {@code TypeHint}.
 *
 * <p>Input lines are comma-separated: {@code <key>,<action>}. For every input, the key and the
 * full list of its actions so far are emitted.
 */
public class ListStateDemo02 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 s; on failure restart at most 3 times, 5 s apart
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Source: text lines from a local socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, String>> tpDataStream = lines.map(new SplitPair());

        KeyedStream<Tuple2<String, String>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);

        keyedStream.process(new ActionHistoryInValueState()).print();

        env.execute("StreamingWordCount");
    }

    /** Parses {@code "<key>,<action>"} into a (key, action) tuple. */
    private static class SplitPair implements MapFunction<String, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> map(String value) throws Exception {
            String[] parts = value.split(",");
            return Tuple2.of(parts[0], parts[1]);
        }
    }

    /** Appends each action to a List held in the key's ValueState and emits (key, that list). */
    private static class ActionHistoryInValueState extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>> {

        private transient ValueState<List<String>> listState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // the TypeHint captures the generic element type that List.class alone would lose
            TypeInformation<List<String>> listType = TypeInformation.of(new TypeHint<List<String>>() {});
            ValueStateDescriptor<List<String>> descriptor = new ValueStateDescriptor<>("lst-state", listType);
            listState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
            List<String> actions = listState.value();
            if (actions == null) {
                actions = new ArrayList<>();
            }
            actions.add(value.f1);
            // write the whole list back into the state
            listState.update(actions);
            out.collect(Tuple2.of(value.f0, actions));
        }
    }
}

6.AtLeastOnceSource

（原文此处有两张示意图 image.png，图片已丢失）
/**
 * Parallel source in which every subtask tails its own file, {@code <path>/<subtaskIndex>.txt}.
 * Each line is emitted prefixed with that file name.
 *
 * <p>The byte offset just past the last emitted line is kept in operator {@link ListState} and
 * written on every checkpoint. After a restart, reading resumes from the restored offset. Lines
 * emitted after the last completed checkpoint are emitted again, hence "at least once".
 */
public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {

    /** Directory containing the per-subtask input files. */
    private final String path;

    public MyAtLeastOnceSource(String path) {
        this.path = path;
    }

    // Written by cancel() and read by run(), which execute on different threads. It must be
    // volatile, otherwise run() may never observe the cancellation.
    private volatile boolean flag = true;
    // file offset after the last emitted line; updated under the checkpoint lock in run()
    private Long offset = 0L;
    private transient ListState<Long> listState;

    /**
     * Initializes or restores the offset state. Runs once, before {@link #run}.
     *
     * @param context gives access to the operator state store and whether this is a restore
     * @throws Exception if the state cannot be obtained
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offset-state", Long.class);
        listState = context.getOperatorStateStore().getListState(stateDescriptor);
        // restore the offset only when recovering from a checkpoint
        if (context.isRestored()) {
            // Each subtask snapshots exactly one offset. NOTE(review): this assumes the subtask
            // gets back its own single entry (unchanged parallelism). If entries from other
            // subtasks' files were assigned here, the last one would win and be wrong for this file.
            for (Long l : listState.get()) {
                offset = l;
            }
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(path + "/" + indexOfThisSubtask + ".txt", "r")) {
            // continue from the restored (or initial) position
            randomAccessFile.seek(offset);
            while (flag) {
                String line = randomAccessFile.readLine();
                if (line != null) {
                    // RandomAccessFile.readLine maps each byte to one char, so re-decode the bytes as UTF-8
                    line = new String(line.getBytes(Charsets.ISO_8859_1), Charsets.UTF_8);
                    // emit and advance the offset atomically with respect to snapshotState
                    synchronized (ctx.getCheckpointLock()) {
                        offset = randomAccessFile.getFilePointer();
                        ctx.collect(indexOfThisSubtask + ".txt : " + line);
                    }
                } else {
                    // no new data yet: poll again in a second
                    Thread.sleep(1000);
                }
            }
        }
    }

    /**
     * Stores the current offset in operator state. Called on every checkpoint.
     *
     * @param context snapshot metadata (unused)
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // replace the previous entry with the current offset
        listState.clear();
        listState.add(offset);
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
/**
 * Unions a socket stream with {@code MyAtLeastOnceSource} and prints the result.
 *
 * <p>Sending a line that starts with "error" to the socket makes the map throw. The job is then
 * restarted by the fixed-delay restart strategy, which shows how the file source behaves after
 * recovery from its checkpointed state.
 */
public class MyAtLeastOnceSourceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // periodically snapshot state to the StateBackend every 30 s
        env.enableCheckpointing(30000);
        // on failure restart at most 3 times, 5 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        DataStream<String> socketLines = env.socketTextStream("localhost", 8888).map(new FailOnError());

        DataStream<String> fileLines = env.addSource(new MyAtLeastOnceSource("/Users/xing/Desktop/data"));

        socketLines.union(fileLines).print();

        env.execute();
    }

    /** Passes lines through unchanged but divides by zero (ArithmeticException) on lines starting with "error". */
    private static class FailOnError implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            if (value.startsWith("error")) {
                // deliberate failure injection
                int boom = 10 / 0;
            }
            return value;
        }
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容