2021-02-06-Flink-41(Flink 电商用户行为分析案例 三)

1.恶意登录

用户在短时间内频繁登录失败,有程序恶意攻击的可能
同一用户(可以是不同IP)在2秒内连续两次登录失败,需要报警

LoginEvent
private Long userId;
private String ip;
private String loginState;
private Long timestamp;

LoginFailWarning
private Long userId;
private Long firstFailTime;
private Long lastFailTime;
private String warningMsg;

方案一:使用定时器实现。问题是时效性不强:必须等到定时器触发(第一次失败后2秒)才能判断并输出报警。

public class LoginFail {
  /**
   * Reads login events from {@code /LoginLog.csv} on the classpath, keys them by user id and
   * prints a {@link LoginFailWarning} for every user whose failed logins inside the 2-second
   * timer window reach the configured threshold.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 1. Read the data from file. Fields are parsed in LoginEvent constructor order
    //    (presumably userId, ip, loginState, timestamp); timestamps are in seconds.
    URL resource = LoginFail.class.getResource("/LoginLog.csv");
    SingleOutputStreamOperator<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
      .map(line -> {
        String[] fields = line.split(",");
        return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
      new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.of(3, TimeUnit.SECONDS)) {
        @Override
        public long extractTimestamp(LoginEvent element) {
          // Event timestamps are in seconds; Flink expects milliseconds.
          return element.getTimestamp() * 1000L;
        }
      }
    ));

    // Detect repeated login failures with a custom process function. The requirement is
    // "two failures within 2 seconds", so the threshold is 2 (a threshold of 1 would raise an
    // alert for every single failed login).
    SingleOutputStreamOperator<LoginFailWarning> warningStream = loginEventStream
      .keyBy(LoginEvent::getUserId)
      .process(new LoginFailDetectWarning(2));

    warningStream.print();

    env.execute("login fail detect job");
  }

  /**
   * Collects a user's failed logins in list state. When the event-time timer registered 2 seconds
   * after the first failure fires, it emits a warning if at least {@code maxFailTimes} failures
   * were collected. A successful login cancels the pending timer and resets the state.
   */
  public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning> {
    // Minimum number of failures inside the window that triggers a warning.
    private Integer maxFailTimes;

    // Failed login events collected since the current timer was registered.
    ListState<LoginEvent> loginFailEventListState;
    // Timestamp (ms) of the registered timer, or null when no timer is pending.
    ValueState<Long> timerTsState;

    public LoginFailDetectWarning(Integer maxFailTimes) {
      this.maxFailTimes = maxFailTimes;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail-list", LoginEvent.class));
      timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<LoginFailWarning> out) throws Exception {
      // The timer fired, so no successful login happened within the 2 seconds; count the failures.
      ArrayList<LoginEvent> loginFailEvents = Lists.newArrayList(loginFailEventListState.get().iterator());
      int failTimes = loginFailEvents.size();

      // failTimes > 0 protects get(0) below if maxFailTimes is configured as 0 or less.
      if (failTimes > 0 && failTimes >= maxFailTimes) {
        // Threshold reached: emit a warning spanning the first and last failure.
        out.collect(new LoginFailWarning(ctx.getCurrentKey(),
                                         loginFailEvents.get(0).getTimestamp(),
                                         loginFailEvents.get(failTimes - 1).getTimestamp(),
                                         "login fail in 2s for " + failTimes + " times"));
      }

      // Reset the state for the next window.
      loginFailEventListState.clear();
      timerTsState.clear();
    }

    @Override
    public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
      if ("fail".equals(value.getLoginState())) {
        // 1. Failed login: remember it in the list state.
        loginFailEventListState.add(value);
        // Register a timer 2 seconds after the first failure, unless one is already pending.
        if (null == timerTsState.value()) {
          long ts = (value.getTimestamp() + 2) * 1000L;
          ctx.timerService().registerEventTimeTimer(ts);
          timerTsState.update(ts);
        }
      } else {
        // 2. Successful login: delete the pending timer, clear the state and start over.
        //    (Previously this branch was wrongly nested inside the "fail" branch, so a second
        //    failure wiped the state and successes were ignored.)
        Long timerTs = timerTsState.value();
        if (null != timerTs) {
          ctx.timerService().deleteEventTimeTimer(timerTs);
        }
        loginFailEventListState.clear();
        timerTsState.clear();
      }
    }
  }
}

缺点:如果数据是乱序的,就会打乱处理流程,影响结果。

public class LoginFail {
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-02-17
     * Time: 10:09
     */

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> source = env.readTextFile("C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\LoginLog.csv");
        SingleOutputStreamOperator<LoginEvent> operator = source.map(new MapFunction<String, LoginEvent>() {
            @Override
            public LoginEvent map(String s) throws Exception {
                String[] split = s.split(",");
                return new LoginEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2L)) {
            @Override
            public long extractTimestamp(LoginEvent element) {
                return element.getTimestamp() * 1000L;
            }
        });

        operator.keyBy(LoginEvent::getUserId)
                .process(new KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>() {
                    ListState<LoginEvent> listState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        listState = getRuntimeContext().getListState(new ListStateDescriptor<>("", LoginEvent.class));
                    }

                    @Override
                    public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
                        //方法一,延迟性高,采用来一个就比较
                        if ("fail".equals(value.getLoginState())) {
                            //由于状态中只有一条数据,next比较时间的间隔是否再2秒之内,报警
                            Iterator<LoginEvent> loginEventIterator = listState.get().iterator();
                            if (loginEventIterator.hasNext()) {
                                LoginEvent loginEvent = loginEventIterator.next();
                                //比较时间
                                if (value.getTimestamp() - loginEvent.getTimestamp() <= 2){
                                       out.collect(new LoginFailWarning(value.getUserId(),loginEvent.getTimestamp()
                                                               ,value.getTimestamp(),"fail"));
                                }
                                //把状态给清除了
                                listState.clear();
                                //把最新的数据添加进去
                                listState.add(value);
                            } else {
                                //没有的情形,添加第一条数据
                                listState.add(value);
                            }
                        } else {
                            //登录成功清除状态
                            listState.clear();
                        }


                    }
                }).print();

CEP
为什么 select 方法的参数是 Map<String, List<LoginEvent>> pattern?因为在循环模式(如 times、oneOrMore)中,同一个模式名可能匹配到多个事件,所以每个模式名对应的是一个事件列表。

public class LoginFailWithCep {

  /**
   * Flags two directly consecutive failed logins of the same user that occur within 2 seconds,
   * using a two-step Flink CEP pattern.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Event time comes from the record's timestamp field (seconds), converted to milliseconds.
    BoundedOutOfOrdernessTimestampExtractor<LoginEvent> timestampExtractor =
      new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(LoginEvent event) {
          return event.getTimestamp() * 1000L;
        }
      };

    // Source: the login log on the classpath, one CSV record per line.
    String logPath = LoginFail.class.getResource("/LoginLog.csv").getPath();
    DataStream<LoginEvent> loginEventStream = env.readTextFile(logPath)
      .map(line -> {
        String[] cols = line.split(",");
        return new LoginEvent(Long.valueOf(cols[0]), cols[1], cols[2], Long.valueOf(cols[3]));
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(timestampExtractor));

    // Both pattern steps accept the same kind of event: a failed login.
    SimpleCondition<LoginEvent> isFailure = new SimpleCondition<LoginEvent>() {
      @Override
      public boolean filter(LoginEvent event) throws Exception {
        return "fail".equals(event.getLoginState());
      }
    };

    // Pattern: firstFail, immediately followed by secondFail (strict contiguity), within 2s.
    Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail")
      .where(isFailure)
      .next("secondFail")
      .where(isFailure)
      .within(Time.seconds(2));

    // Apply the pattern per user and turn every match into a warning.
    PatternStream<LoginEvent> patternStream =
      CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);
    SingleOutputStreamOperator<LoginFailWarning> warnings = patternStream.select(new LoginFailMatchDetectWarning());
    warnings.print();

    env.execute("login fail detect with cep job");
  }

  /** Builds a warning from the "firstFail" and "secondFail" events of a match. */
  public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
    @Override
    public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
      // Each step is a singleton pattern, so each list holds exactly one event.
      LoginEvent first = pattern.get("firstFail").get(0);
      LoginEvent second = pattern.get("secondFail").iterator().next();
      return new LoginFailWarning(first.getUserId(), first.getTimestamp(), second.getTimestamp(), "login fail 2 times");
    }
  }
}

循环模式改进

public class LoginFailWithCep {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 1. 从文件中读取数据
    URL resource = LoginFail.class.getResource("/LoginLog.csv");
    DataStream<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
      .map(line -> {
        String[] fields = line.split(",");
        return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
        new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
          @Override
          public long extractTimestamp(LoginEvent element) {
            return element.getTimestamp() * 1000L;
          }
        }
      ));

    // 1. 定义一个匹配模式
    // firstFail -> secondFail, within 2s
    Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern
      .<LoginEvent>begin("failEvents").where(new SimpleCondition<LoginEvent>() {
      @Override
      public boolean filter(LoginEvent value) throws Exception {
        return "fail".equals(value.getLoginState());
      }
    }).times(3).consecutive()
      .within(Time.seconds(5));

    // 2. 将匹配模式应用到数据流上,得到一个pattern stream
    PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

    // 3. 检出符合匹配条件的复杂事件,进行转换处理,得到报警信息
    SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());

    warningStream.print();

    env.execute("login fail detect with cep job");
  }

  // 实现自定义的PatternSelectFunction
  public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
    @Override
    public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
      LoginEvent firstFailEvent = pattern.get("failEvents").get(0);
      LoginEvent lastFailEvent = pattern.get("failEvents").get(pattern.get("failEvents").size() - 1);
      return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp(), "login fail " + pattern.get("failEvents").size() + " times");
    }
  }
}

2.订单超时

用户下单之后,应设置订单失效事件,以提高用户支付的意愿,并降低系统风险
用户下单后15分钟未支付,则输出监控信息

OrderEvent
private Long orderId;
private String eventType;
private String txId;
private Long timestamp;

OrderResult
private Long orderId;
private String resultState;

ReceiptEvent
private String txId;
private String payChannel;
private Long timestamp;
public class OrderPayTimeout {
  /**
   * Uses Flink CEP to match each order's "create" event with its "pay" event. Orders paid within
   * 15 minutes go to the main output; orders whose pattern times out go to a side output.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Read the data and convert it into POJOs (timestamps in seconds, converted to milliseconds).
    URL resource = OrderPayTimeout.class.getResource("/OrderLog.csv");
    DataStream<OrderEvent> orderEventDataStream = env.readTextFile(resource.getPath())
      .map(line -> {
        String[] fields = line.split(",");
        return new OrderEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
      new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(OrderEvent element) {
          return element.getTimestamp() * 1000L;
        }
      }
    ));

    // 1. Define a time-bounded pattern: create, followed (relaxed contiguity) by pay.
    //    The requirement is "no payment within 15 minutes of creation", so the window is
    //    15 minutes. It was 5, which disagreed with OrderTimeoutWithoutCep.
    Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() {
      @Override
      public boolean filter(OrderEvent value) throws Exception {
        return "create".equals(value.getEventType());
      }
    })
      .followedBy("pay").where(new SimpleCondition<OrderEvent>() {
      @Override
      public boolean filter(OrderEvent value) throws Exception {
        return "pay".equals(value.getEventType());
      }
    })
      .within(Time.minutes(15));

    // 2. Side-output tag for timed-out partial matches.
    OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout") {
    };

    // 3. Apply the pattern per order id to get a pattern stream.
    PatternStream<OrderEvent> patternStream = CEP.pattern(orderEventDataStream.keyBy(OrderEvent::getOrderId), orderPayPattern);

    // 4. select() with a timeout function: full matches go to the main output,
    //    timed-out partial matches go to the side output.
    SingleOutputStreamOperator<OrderResult> resultStream = patternStream
      .select(orderTimeoutTag, new OrderTimeoutSelect(), new OrderPaySelect());

    resultStream.print("payed normally");
    resultStream.getSideOutput(orderTimeoutTag).print("timeout");

    env.execute("order timeout detect job");

  }

  /** Handles a timed-out partial match: only the "create" event is present. */
  public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent, OrderResult> {

    @Override
    public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
      Long timeoutOrderId = pattern.get("create").iterator().next().getOrderId();
      return new OrderResult(timeoutOrderId, "timeout " + timeoutTimestamp);
    }
  }

  /** Handles a complete create -> pay match. */
  public static class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult> {
    @Override
    public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
      Long payedOrderId = pattern.get("pay").iterator().next().getOrderId();
      return new OrderResult(payedOrderId, "payed");
    }
  }
}
public class OrderTimeoutWithoutCep {

  // Side-output tag for timed-out or otherwise unmatched orders.
  private final static OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout") {
  };

  /**
   * Detects order pay timeouts with a hand-written KeyedProcessFunction instead of CEP. The main
   * output carries orders paid in time; the side output carries timeouts and unmatched events.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Read the data and convert it into POJOs (timestamps in seconds, converted to milliseconds).
    URL resource = OrderPayTimeout.class.getResource("/OrderLog.csv");
    DataStream<OrderEvent> orderEventDataStream = env.readTextFile(resource.getPath())
      .map(line -> {
        String[] fields = line.split(",");
        return new OrderEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
      new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(OrderEvent element) {
          return element.getTimestamp() * 1000L;
        }
      }
    ));

    // Custom process function: main output = matched orders, side output = timeout alerts.
    SingleOutputStreamOperator<OrderResult> resultStream = orderEventDataStream.keyBy(OrderEvent::getOrderId)
      .process(new OrderPayMatchDetect());

    resultStream.print("pay normally");
    resultStream.getSideOutput(orderTimeoutTag).print("timeout");

    env.execute("order timeout detect without cep job");
  }

  /**
   * Per-order state machine that matches "create" and "pay" events in either arrival order.
   * A pay whose timestamp is not earlier than 15 minutes after its create is reported on the side
   * output, whichever of the two events arrives first.
   */
  public static class OrderPayMatchDetect extends KeyedProcessFunction<Long, OrderEvent, OrderResult> {
    // Allowed time between create and pay, in seconds (event timestamps are in seconds).
    private static final long PAY_TIMEOUT_SECONDS = 15 * 60;

    // Whether the pay / create event of the current order has already been seen (default false).
    ValueState<Boolean> isPayedState;
    ValueState<Boolean> isCreatedState;
    // Timestamp (ms) of the currently registered event-time timer.
    ValueState<Long> timerTsState;

    @Override
    public void open(Configuration parameters) throws Exception {
      isPayedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-payed", Boolean.class, false));
      isCreatedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-created", Boolean.class, false));
      timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
    }

    @Override
    public void processElement(OrderEvent value, Context ctx, Collector<OrderResult> out) throws Exception {
      // Snapshot the current state first.
      Boolean isPayed = isPayedState.value();
      Boolean isCreated = isCreatedState.value();
      Long timerTs = timerTsState.value();

      if ("create".equals(value.getEventType())) {
        // 1. A create event: check whether the pay event already arrived (out of order).
        if (isPayed) {
          // 1.1 Pay came first. Its timer was registered at the pay timestamp (ms), so timerTs
          //     holds the pay time. Apply the same 15-minute check as branch 2.1 instead of
          //     accepting the payment unconditionally.
          long payDeadline = (value.getTimestamp() + PAY_TIMEOUT_SECONDS) * 1000L;
          if (timerTs < payDeadline) {
            out.collect(new OrderResult(value.getOrderId(), "payed successfully"));
          } else {
            ctx.output(orderTimeoutTag, new OrderResult(value.getOrderId(), "payed but already timeout"));
          }
          clearState();
          ctx.timerService().deleteEventTimeTimer(timerTs);
        } else {
          // 1.2 Not paid yet: register a timer 15 minutes later and wait for the pay event.
          long ts = (value.getTimestamp() + PAY_TIMEOUT_SECONDS) * 1000L;
          ctx.timerService().registerEventTimeTimer(ts);
          timerTsState.update(ts);
          isCreatedState.update(true);
        }
      } else if ("pay".equals(value.getEventType())) {
        // 2. A pay event: check whether the create event already arrived.
        if (isCreated) {
          // 2.1 Create was seen; timerTs is create time + 15 minutes (ms).
          if (value.getTimestamp() * 1000L < timerTs) {
            // 2.1.1 Paid within 15 minutes: normal match.
            out.collect(new OrderResult(value.getOrderId(), "payed successfully"));
          } else {
            // 2.1.2 Paid too late: alert on the side output.
            ctx.output(orderTimeoutTag, new OrderResult(value.getOrderId(), "payed but already timeout"));
          }
          clearState();
          ctx.timerService().deleteEventTimeTimer(timerTs);
        } else {
          // 2.2 No create yet (out of order): wait until the watermark passes the pay time.
          long ts = value.getTimestamp() * 1000L;
          ctx.timerService().registerEventTimeTimer(ts);
          timerTsState.update(ts);
          isPayedState.update(true);
        }
      }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderResult> out) throws Exception {
      // The timer fired, so one of the two events never arrived.
      if (isPayedState.value()) {
        // Pay arrived but create did not.
        ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "payed but not found created log"));
      } else {
        // Create arrived but pay did not: payment timeout.
        ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "timeout"));
      }
      clearState();
    }

    // Resets all per-order state; timers are handled by the callers.
    private void clearState() {
      isCreatedState.clear();
      isPayedState.clear();
      timerTsState.clear();
    }
  }
}

3.实时对账

用户下单并支付之后,应查询到账信息,进行实时对账
如果有不匹配的支付信息或者到账信息,输出提示信息

ReceiptEvent

private String txId;
private String payChannel;
private Long timestamp;
public class TxPayMatch {

  // Side outputs for payments and receipts that found no partner before their timer fired.
  private final static OutputTag<OrderEvent> unmatchedPays = new OutputTag<OrderEvent>("unmatched-pays"){};
  private final static OutputTag<ReceiptEvent> unmatchedReceipts = new OutputTag<ReceiptEvent>("unmatched-receipts"){};

  /**
   * Reconciles pay events (OrderLog.csv) with receipt events (ReceiptLog.csv) by transaction id.
   * Matched pairs go to the main output; leftovers go to the two side outputs.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Both logs carry timestamps in seconds; these extractors convert them to milliseconds.
    BoundedOutOfOrdernessTimestampExtractor<OrderEvent> orderTimestamps =
      new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(OrderEvent element) {
          return element.getTimestamp() * 1000L;
        }
      };
    BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent> receiptTimestamps =
      new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(ReceiptEvent element) {
          return element.getTimestamp() * 1000L;
        }
      };

    // Order events; only those carrying a transaction id (the pay events) are kept.
    String orderPath = TxPayMatch.class.getResource("/OrderLog.csv").getPath();
    DataStream<OrderEvent> orderEventStream = env.readTextFile(orderPath)
      .map(line -> {
        String[] cols = line.split(",");
        return new OrderEvent(Long.valueOf(cols[0]), cols[1], cols[2], Long.valueOf(cols[3]));
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(orderTimestamps))
      .filter(order -> !"".equals(order.getTxId()));

    // Receipt (money arrived) events.
    String receiptPath = TxPayMatch.class.getResource("/ReceiptLog.csv").getPath();
    SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = env.readTextFile(receiptPath)
      .map(line -> {
        String[] cols = line.split(",");
        return new ReceiptEvent(cols[0], cols[1], Long.valueOf(cols[2]));
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(receiptTimestamps));

    // Connect both streams on the transaction id and match them in a CoProcessFunction.
    SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> resultStream = orderEventStream
      .keyBy(OrderEvent::getTxId)
      .connect(receiptEventStream.keyBy(ReceiptEvent::getTxId))
      .process(new TxPayMatchDetect());

    resultStream.print("matched-pays");
    resultStream.getSideOutput(unmatchedPays).print("unmatched-pays");
    resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts");

    env.execute("tx match detect job");
  }

  /**
   * Buffers whichever side of a transaction arrives first and emits the pair when the other side
   * shows up. If an event-time timer fires first, whatever is still buffered goes to a side output.
   */
  public static class TxPayMatchDetect extends CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {
    // The pay event / receipt event of the current transaction still waiting for its partner.
    ValueState<OrderEvent> payState;
    ValueState<ReceiptEvent> receiptState;

    @Override
    public void open(Configuration parameters) throws Exception {
      payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("pay", OrderEvent.class));
      receiptState = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt", ReceiptEvent.class));
    }

    @Override
    public void processElement1(OrderEvent pay, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
      ReceiptEvent waitingReceipt = receiptState.value();
      if (waitingReceipt == null) {
        // No receipt yet: buffer the payment and wait 5 seconds of event time (data-dependent).
        payState.update(pay);
        ctx.timerService().registerEventTimeTimer((pay.getTimestamp() + 5) * 1000L);
        return;
      }
      // The receipt was already buffered: emit the pair and reset.
      out.collect(new Tuple2<>(pay, waitingReceipt));
      payState.clear();
      receiptState.clear();
    }

    @Override
    public void processElement2(ReceiptEvent receipt, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
      OrderEvent waitingPay = payState.value();
      if (waitingPay == null) {
        // No payment yet: buffer the receipt and wait 3 seconds of event time (data-dependent).
        receiptState.update(receipt);
        ctx.timerService().registerEventTimeTimer((receipt.getTimestamp() + 3) * 1000L);
        return;
      }
      // The payment was already buffered: emit the pair and reset.
      out.collect(new Tuple2<>(waitingPay, receipt));
      payState.clear();
      receiptState.clear();
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
      // Anything still buffered never met its partner; if both are null, the pair was already emitted.
      OrderEvent leftoverPay = payState.value();
      ReceiptEvent leftoverReceipt = receiptState.value();
      if (leftoverPay != null) {
        ctx.output(unmatchedPays, leftoverPay);
      }
      if (leftoverReceipt != null) {
        ctx.output(unmatchedReceipts, leftoverReceipt);
      }
      payState.clear();
      receiptState.clear();
    }
  }
}

使用 interval join 只能获得正常匹配的结果,无法获得未匹配成功的记录;未匹配成功的记录需要到数据库中去查找并进行匹配。

public class TxPayMatchByJoin {

  /**
   * Reconciles pay events with receipt events using an interval join on the transaction id.
   * Only matched pairs are produced; unmatched events are silently dropped by the join.
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // Timestamps in both logs are seconds; convert to milliseconds for event time.
    BoundedOutOfOrdernessTimestampExtractor<OrderEvent> orderTimestamps =
      new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(OrderEvent element) {
          return element.getTimestamp() * 1000L;
        }
      };
    BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent> receiptTimestamps =
      new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
        @Override
        public long extractTimestamp(ReceiptEvent element) {
          return element.getTimestamp() * 1000L;
        }
      };

    // Pay events: order records that carry a transaction id.
    String orderPath = TxPayMatch.class.getResource("/OrderLog.csv").getPath();
    DataStream<OrderEvent> orderEventStream = env.readTextFile(orderPath)
      .map(line -> {
        String[] cols = line.split(",");
        return new OrderEvent(Long.valueOf(cols[0]), cols[1], cols[2], Long.valueOf(cols[3]));
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(orderTimestamps))
      .filter(order -> !"".equals(order.getTxId()));

    // Receipt events.
    String receiptPath = TxPayMatch.class.getResource("/ReceiptLog.csv").getPath();
    SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = env.readTextFile(receiptPath)
      .map(line -> {
        String[] cols = line.split(",");
        return new ReceiptEvent(cols[0], cols[1], Long.valueOf(cols[2]));
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(receiptTimestamps));

    // A receipt joins a pay when its time lies in [pay - 3s, pay + 5s].
    SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> matchedPairs = orderEventStream
      .keyBy(OrderEvent::getTxId)
      .intervalJoin(receiptEventStream.keyBy(ReceiptEvent::getTxId))
      .between(Time.seconds(-3), Time.seconds(5))
      .process(new TxPayMatchDetectByJoin());

    matchedPairs.print();

    env.execute("tx pay match by join job");
  }

  /** Emits each joined (pay, receipt) pair as a Tuple2. */
  public static class TxPayMatchDetectByJoin extends ProcessJoinFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {
    @Override
    public void processElement(OrderEvent left, ReceiptEvent right, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
      Tuple2<OrderEvent, ReceiptEvent> pair = Tuple2.of(left, right);
      out.collect(pair);
    }
  }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容