1.恶意登录
用户在短时间内频繁登录失败,有程序恶意攻击的可能
同一用户(可以是不同IP)在2秒内连续两次登录失败,需要报警
LoginEvent
private Long userId;
private String ip;
private String loginState;
private Long timestamp;
LoginFailWarning
private Long userId;
private Long firstFailTime;
private Long lastFailTime;
private String warningMsg;
方案一:使用定时器实现。缺点:时效性不强,必须等到2秒后的定时器触发才能输出报警
/**
 * Malicious-login detection implemented with a {@link KeyedProcessFunction} and an event-time timer.
 *
 * <p>Failed logins of one user are buffered in keyed state and a timer is registered 2 seconds after
 * the first buffered failure. When the timer fires, a warning is emitted if enough failures were
 * collected. A non-"fail" event arriving before the timer cancels it and resets the state.
 */
public class LoginFail {
    /**
     * Builds and runs the detection job on {@code /LoginLog.csv} read from the classpath.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read the log file and parse every CSV line into a LoginEvent.
        //    Column order presumably is userId, ip, loginState, timestamp -- TODO confirm against LoginEvent.
        URL resource = LoginFail.class.getResource("/LoginLog.csv");
        SingleOutputStreamOperator<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated Long(String) constructor.
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.of(3, TimeUnit.SECONDS)) {
                            @Override
                            public long extractTimestamp(LoginEvent element) {
                                // The event timestamp is scaled by 1000 to obtain the millisecond value Flink expects.
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ));

        // 2. Detect consecutive login failures per user with a custom process function.
        //    The requirement is "two failures within 2 seconds", so the threshold is 2.
        SingleOutputStreamOperator<LoginFailWarning> warningStream = loginEventStream
                .keyBy(LoginEvent::getUserId)
                .process(new LoginFailDetectWarning(2));

        warningStream.print();
        env.execute("login fail detect job");
    }

    /**
     * Keyed process function that buffers failed logins and evaluates them when a 2-second
     * event-time timer fires.
     */
    public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning> {
        // Number of buffered failures required to emit a warning.
        private final Integer maxFailTimes;
        // State: all failed logins seen since the current timer was registered.
        ListState<LoginEvent> loginFailEventListState;
        // State: timestamp of the currently registered timer, or null if none is registered.
        ValueState<Long> timerTsState;

        /**
         * @param maxFailTimes number of failures within the timer span that triggers a warning
         */
        public LoginFailDetectWarning(Integer maxFailTimes) {
            this.maxFailTimes = maxFailTimes;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail-list", LoginEvent.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
        }

        /**
         * Called when the timer fires, i.e. no non-"fail" event reset the state in the meantime.
         * Emits a warning if the number of buffered failures reaches {@code maxFailTimes}.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<LoginFailWarning> out) throws Exception {
            ArrayList<LoginEvent> loginFailEvents = Lists.newArrayList(loginFailEventListState.get().iterator());
            int failTimes = loginFailEvents.size();
            if (failTimes >= maxFailTimes) {
                // Threshold reached: report the first and last buffered failure.
                out.collect(new LoginFailWarning(ctx.getCurrentKey(),
                        loginFailEvents.get(0).getTimestamp(),
                        loginFailEvents.get(failTimes - 1).getTimestamp(),
                        "login fail in 2s for " + failTimes + " times"));
            }
            // Start over for this key.
            loginFailEventListState.clear();
            timerTsState.clear();
        }

        /**
         * Buffers failed logins and registers the timer on the first failure; any other login state
         * cancels the pending timer and clears the buffered failures.
         */
        @Override
        public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
            if ("fail".equals(value.getLoginState())) {
                // 1. Failure: add it to the list state.
                loginFailEventListState.add(value);
                // Register a timer 2 seconds after this event, but only if none is pending.
                if (timerTsState.value() == null) {
                    long ts = (value.getTimestamp() + 2) * 1000L;
                    ctx.timerService().registerEventTimeTimer(ts);
                    timerTsState.update(ts);
                }
            } else {
                // 2. Not a failure: delete the pending timer, clear the state and start over.
                //    This branch must belong to the outer "fail" check; attaching it to the timer
                //    check would wipe the state on every second failure and ignore successes.
                Long pendingTimerTs = timerTsState.value();
                if (pendingTimerTs != null) {
                    ctx.timerService().deleteEventTimeTimer(pendingTimerTs);
                }
                loginFailEventListState.clear();
                timerTsState.clear();
            }
        }
    }
}
缺点:如果数据是乱序的,会打乱处理流程,影响检测结果
public class LoginFail {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-02-17
* Time: 10:09
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> source = env.readTextFile("C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\LoginLog.csv");
SingleOutputStreamOperator<LoginEvent> operator = source.map(new MapFunction<String, LoginEvent>() {
@Override
public LoginEvent map(String s) throws Exception {
String[] split = s.split(",");
return new LoginEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
}
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(2L)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
operator.keyBy(LoginEvent::getUserId)
.process(new KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>() {
ListState<LoginEvent> listState;
@Override
public void open(Configuration parameters) throws Exception {
listState = getRuntimeContext().getListState(new ListStateDescriptor<>("", LoginEvent.class));
}
@Override
public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
//方法一,延迟性高,采用来一个就比较
if ("fail".equals(value.getLoginState())) {
//由于状态中只有一条数据,next比较时间的间隔是否再2秒之内,报警
Iterator<LoginEvent> loginEventIterator = listState.get().iterator();
if (loginEventIterator.hasNext()) {
LoginEvent loginEvent = loginEventIterator.next();
//比较时间
if (value.getTimestamp() - loginEvent.getTimestamp() <= 2){
out.collect(new LoginFailWarning(value.getUserId(),loginEvent.getTimestamp()
,value.getTimestamp(),"fail"));
}
//把状态给清除了
listState.clear();
//把最新的数据添加进去
listState.add(value);
} else {
//没有的情形,添加第一条数据
listState.add(value);
}
} else {
//登录成功清除状态
listState.clear();
}
}
}).print();
CEP
为什么 select 方法的参数是 Map<String, List<LoginEvent>> pattern?因为在循环模式下,同一个模式名称可能匹配到多个事件,所以每个名称对应的值是一个 List
/**
 * Malicious-login detection with Flink CEP: two directly consecutive failed logins of the same
 * user within 2 seconds produce a warning.
 */
public class LoginFailWithCep {
    /**
     * Builds and runs the CEP job on {@code /LoginLog.csv} read from the classpath.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read the log file and parse every CSV line into a LoginEvent.
        URL resource = LoginFail.class.getResource("/LoginLog.csv");
        DataStream<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated Long(String) constructor.
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                            @Override
                            public long extractTimestamp(LoginEvent element) {
                                // The event timestamp is scaled by 1000 to obtain the millisecond value Flink expects.
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ));

        // 2. Define the pattern: firstFail immediately followed by secondFail, within 2s.
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern
                .<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getLoginState());
                    }
                })
                .next("secondFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getLoginState());
                    }
                })
                .within(Time.seconds(2));

        // 3. Apply the pattern to the stream keyed by user to obtain a pattern stream.
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        // 4. Convert every match into a warning.
        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());

        warningStream.print();
        env.execute("login fail detect with cep job");
    }

    /** Turns a matched firstFail/secondFail pair into a {@link LoginFailWarning}. */
    public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
        /**
         * @param pattern matched events by pattern name; each of the two single-event steps
         *                contributes a one-element list
         * @return warning carrying the user id and the timestamps of both failures
         */
        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
            LoginEvent firstFailEvent = pattern.get("firstFail").get(0);
            LoginEvent lastFailEvent = pattern.get("secondFail").get(0);
            return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp(), "login fail 2 times");
        }
    }
}
循环模式改进
/**
 * Malicious-login detection with a looping Flink CEP pattern: three consecutive failed logins of
 * the same user within 5 seconds produce a warning.
 */
public class LoginFailWithCep {
    /**
     * Builds and runs the CEP job on {@code /LoginLog.csv} read from the classpath.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read the log file and parse every CSV line into a LoginEvent.
        URL resource = LoginFail.class.getResource("/LoginLog.csv");
        DataStream<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated Long(String) constructor.
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                            @Override
                            public long extractTimestamp(LoginEvent element) {
                                // The event timestamp is scaled by 1000 to obtain the millisecond value Flink expects.
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ));

        // 2. Define the pattern: three consecutive "fail" events, within 5s.
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern
                .<LoginEvent>begin("failEvents").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getLoginState());
                    }
                }).times(3).consecutive()
                .within(Time.seconds(5));

        // 3. Apply the pattern to the stream keyed by user to obtain a pattern stream.
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        // 4. Convert every match into a warning.
        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());

        warningStream.print();
        env.execute("login fail detect with cep job");
    }

    /** Turns the events matched by the looping "failEvents" step into a {@link LoginFailWarning}. */
    public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
        /**
         * @param pattern matched events by pattern name; "failEvents" holds all failures of the match
         * @return warning carrying the user id, the first and last failure timestamp and the count
         */
        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
            // Look the list up once instead of on every access.
            List<LoginEvent> failEvents = pattern.get("failEvents");
            LoginEvent firstFailEvent = failEvents.get(0);
            LoginEvent lastFailEvent = failEvents.get(failEvents.size() - 1);
            return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp(), "login fail " + failEvents.size() + " times");
        }
    }
}
2.订单超时
用户下单之后,应设置订单失效时间,以提高用户支付的意愿,并降低系统风险
用户下单后15分钟未支付,则输出监控信息
OrderEvent
private Long orderId;
private String eventType;
private String txId;
private Long timestamp;
OrderResult
private Long orderId;
private String resultState;
ReceiptEvent
private String txId;
private String payChannel;
private Long timestamp;
/**
 * Order-timeout detection with Flink CEP: an order whose "create" event is not followed by a "pay"
 * event within 15 minutes is reported on a side output; paid orders go to the main output.
 */
public class OrderPayTimeout {
    /**
     * Builds and runs the CEP job on {@code /OrderLog.csv} read from the classpath.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the log file and parse every CSV line into an OrderEvent.
        URL resource = OrderPayTimeout.class.getResource("/OrderLog.csv");
        DataStream<OrderEvent> orderEventDataStream = env.readTextFile(resource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated Long(String) constructor.
                    return new OrderEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                            @Override
                            public long extractTimestamp(OrderEvent element) {
                                // The event timestamp is scaled by 1000 to obtain the millisecond value Flink expects.
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ));

        // 1. Define a time-limited pattern: create, eventually followed by pay.
        Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern
                .<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "create".equals(value.getEventType());
                    }
                })
                .followedBy("pay").where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "pay".equals(value.getEventType());
                    }
                })
                // The stated requirement is a 15-minute payment window.
                .within(Time.minutes(15));

        // 2. Side-output tag for timed-out orders.
        OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout") {
        };

        // 3. Apply the pattern to the stream keyed by order id.
        PatternStream<OrderEvent> patternStream = CEP.pattern(orderEventDataStream.keyBy(OrderEvent::getOrderId), orderPayPattern);

        // 4. Extract both complete matches and timed-out partial matches.
        SingleOutputStreamOperator<OrderResult> resultStream = patternStream
                .select(orderTimeoutTag, new OrderTimeoutSelect(), new OrderPaySelect());

        resultStream.print("payed normally");
        resultStream.getSideOutput(orderTimeoutTag).print("timeout");
        env.execute("order timeout detect job");
    }

    /** Handles partial matches that timed out, i.e. a "create" without a "pay" inside the window. */
    public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent, OrderResult> {
        /**
         * @param pattern          events matched so far; only "create" is read here
         * @param timeoutTimestamp timestamp passed in by CEP for the timeout, appended to the result text
         * @return result describing the timed-out order
         */
        @Override
        public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
            Long timeoutOrderId = pattern.get("create").get(0).getOrderId();
            return new OrderResult(timeoutOrderId, "timeout " + timeoutTimestamp);
        }
    }

    /** Handles complete matches, i.e. orders that were paid inside the window. */
    public static class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult> {
        /**
         * @param pattern matched events by pattern name; only "pay" is read here
         * @return result describing the paid order
         */
        @Override
        public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
            Long payedOrderId = pattern.get("pay").get(0).getOrderId();
            return new OrderResult(payedOrderId, "payed");
        }
    }
}
/**
 * Order-timeout detection without CEP: a keyed process function tracks whether "create" and "pay"
 * have been seen for an order and uses an event-time timer to detect the missing counterpart.
 */
public class OrderTimeoutWithoutCep {
    // Side-output tag for timeout / anomaly results.
    private final static OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout") {
    };

    /**
     * Builds and runs the job on {@code /OrderLog.csv} read from the classpath.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the log file and parse every CSV line into an OrderEvent.
        URL resource = OrderPayTimeout.class.getResource("/OrderLog.csv");
        DataStream<OrderEvent> orderEventDataStream = env.readTextFile(resource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated Long(String) constructor.
                    return new OrderEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                            @Override
                            public long extractTimestamp(OrderEvent element) {
                                // The event timestamp is scaled by 1000 to obtain the millisecond value Flink expects.
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ));

        // Main output: normally paid orders. Side output: timeouts and anomalies.
        SingleOutputStreamOperator<OrderResult> resultStream = orderEventDataStream
                .keyBy(OrderEvent::getOrderId)
                .process(new OrderPayMatchDetect());

        resultStream.print("pay normally");
        resultStream.getSideOutput(orderTimeoutTag).print("timeout");
        env.execute("order timeout detect without cep job");
    }

    /** Matches "create" and "pay" events of one order and reports timeouts via the side output. */
    public static class OrderPayMatchDetect extends KeyedProcessFunction<Long, OrderEvent, OrderResult> {
        // Payment window after the create event, in the same unit as the event timestamp.
        private static final long PAY_TIMEOUT = 15 * 60;

        // State: whether a pay / create event has already been seen for this order (null means "no").
        ValueState<Boolean> isPayedState;
        ValueState<Boolean> isCreatedState;
        // State: timestamp of the registered timer.
        ValueState<Long> timerTsState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The descriptor constructor taking a default value is deprecated; an unset state is
            // treated as false where the state is read.
            isPayedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-payed", Boolean.class));
            isCreatedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-created", Boolean.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
        }

        /**
         * Handles a "create" or "pay" event; events of any other type are ignored.
         */
        @Override
        public void processElement(OrderEvent value, Context ctx, Collector<OrderResult> out) throws Exception {
            // Snapshot the current state first.
            boolean isPayed = Boolean.TRUE.equals(isPayedState.value());
            boolean isCreated = Boolean.TRUE.equals(isCreatedState.value());
            Long timerTs = timerTsState.value();

            if ("create".equals(value.getEventType())) {
                // 1. create arrived: check whether the payment was already seen.
                if (isPayed) {
                    // 1.1 pay came first (out of order): emit the normal match.
                    out.collect(new OrderResult(value.getOrderId(), "payed successfully"));
                    // Clear state and delete the timer registered by the pay event.
                    isCreatedState.clear();
                    isPayedState.clear();
                    timerTsState.clear();
                    ctx.timerService().deleteEventTimeTimer(timerTs);
                } else {
                    // 1.2 not paid yet: register the 15-minute timer and wait for the pay event.
                    long ts = (value.getTimestamp() + PAY_TIMEOUT) * 1000L;
                    ctx.timerService().registerEventTimeTimer(ts);
                    timerTsState.update(ts);
                    isCreatedState.update(true);
                }
            } else if ("pay".equals(value.getEventType())) {
                // 2. pay arrived: check whether the create event was already seen.
                if (isCreated) {
                    // 2.1 create was seen: compare the pay time with the timeout deadline.
                    if (value.getTimestamp() * 1000L < timerTs) {
                        // 2.1.1 inside the window: normal match.
                        out.collect(new OrderResult(value.getOrderId(), "payed successfully"));
                    } else {
                        // 2.1.2 past the deadline: report on the side output.
                        ctx.output(orderTimeoutTag, new OrderResult(value.getOrderId(), "payed but already timeout"));
                    }
                    // Either way this order is done.
                    isCreatedState.clear();
                    isPayedState.clear();
                    timerTsState.clear();
                    ctx.timerService().deleteEventTimeTimer(timerTs);
                } else {
                    // 2.2 no create yet (out of order): register a timer at the pay time and wait
                    //     for the create event to arrive before the watermark passes it.
                    ctx.timerService().registerEventTimeTimer(value.getTimestamp() * 1000L);
                    timerTsState.update(value.getTimestamp() * 1000L);
                    isPayedState.update(true);
                }
            }
        }

        /**
         * A firing timer means that one of the two events never arrived; the result is written to
         * the side output and the state is cleared.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderResult> out) throws Exception {
            if (Boolean.TRUE.equals(isPayedState.value())) {
                // pay was seen, so create is the missing one.
                ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "payed but not found created log"));
            } else {
                // pay never arrived: payment timed out.
                ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "timeout"));
            }
            isCreatedState.clear();
            isPayedState.clear();
            timerTsState.clear();
        }
    }
}
3.实时对账
用户下单并支付之后,应查询到账信息,进行实时对帐
如果有不匹配的支付信息或者到账信息,输出提示信息
ReceiptEvent
private String txId;
private String payChannel;
private Long timestamp;
public class TxPayMatch {
// 定义侧输出流标签
private final static OutputTag<OrderEvent> unmatchedPays = new OutputTag<OrderEvent>("unmatched-pays"){};
private final static OutputTag<ReceiptEvent> unmatchedReceipts = new OutputTag<ReceiptEvent>("unmatched-receipts"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据并转换成POJO类型
// 读取订单支付事件数据
URL orderResource = TxPayMatch.class.getResource("/OrderLog.csv");
DataStream<OrderEvent> orderEventStream = env.readTextFile(orderResource.getPath())
.map(line -> {
String[] fields = line.split(",");
return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(OrderEvent element) {
return element.getTimestamp() * 1000L;
}
}
))
// 交易id不为空,必须是pay事件
.filter(data -> !"".equals(data.getTxId()));
// 读取到账事件数据
URL receiptResource = TxPayMatch.class.getResource("/ReceiptLog.csv");
SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = env.readTextFile(receiptResource.getPath())
.map(line -> {
String[] fields = line.split(",");
return new ReceiptEvent(fields[0], fields[1], new Long(fields[2]));
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(ReceiptEvent element) {
return element.getTimestamp() * 1000L;
}
}
));
// 将两条流进行连接合并,进行匹配处理,不匹配的事件输出到侧输出流
SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> resultStream = orderEventStream.keyBy(OrderEvent::getTxId)
.connect(receiptEventStream.keyBy(ReceiptEvent::getTxId))
.process(new TxPayMatchDetect());
resultStream.print("matched-pays");
resultStream.getSideOutput(unmatchedPays).print("unmatched-pays");
resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts");
env.execute("tx match detect job");
}
// 实现自定义CoProcessFunction
public static class TxPayMatchDetect extends CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {
// 定义状态,保存当前已经到来的订单支付事件和到账时间
ValueState<OrderEvent> payState;
ValueState<ReceiptEvent> receiptState;
@Override
public void open(Configuration parameters) throws Exception {
payState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("pay", OrderEvent.class));
receiptState = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt", ReceiptEvent.class));
}
@Override
public void processElement1(OrderEvent pay, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
// 订单支付事件来了,判断是否已经有对应的到账事件
ReceiptEvent receipt = receiptState.value();
if( receipt != null ){
// 如果receipt不为空,说明到账事件已经来过,输出匹配事件,清空状态
out.collect( new Tuple2<>(pay, receipt) );
payState.clear();
receiptState.clear();
} else {
// 如果receipt没来,注册一个定时器,开始等待
ctx.timerService().registerEventTimeTimer( (pay.getTimestamp() + 5) * 1000L ); // 等待5秒钟,具体要看数据
// 更新状态
payState.update(pay);
}
}
@Override
public void processElement2(ReceiptEvent receipt, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
// 到账事件来了,判断是否已经有对应的支付事件
OrderEvent pay = payState.value();
if( pay != null ){
// 如果pay不为空,说明支付事件已经来过,输出匹配事件,清空状态
out.collect( new Tuple2<>(pay, receipt) );
payState.clear();
receiptState.clear();
} else {
// 如果pay没来,注册一个定时器,开始等待
ctx.timerService().registerEventTimeTimer( (receipt.getTimestamp() + 3) * 1000L ); // 等待3秒钟,具体要看数据
// 更新状态
receiptState.update(receipt);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
// 定时器触发,有可能是有一个事件没来,不匹配,也有可能是都来过了,已经输出并清空状态
// 判断哪个不为空,那么另一个就没来
if( payState.value() != null ){
ctx.output(unmatchedPays, payState.value());
}
if( receiptState.value() != null ){
ctx.output(unmatchedReceipts, receiptState.value());
}
// 清空状态
payState.clear();
receiptState.clear();
}
}
}
使用 intervalJoin 只能获得正常匹配的结果,不能获得未匹配成功的记录;未匹配成功的记录需要到数据库中去查找,再进行匹配
/**
 * Real-time reconciliation with an interval join: pairs every pay event with receipts of the same
 * transaction id whose timestamp lies between 3 seconds before and 5 seconds after it.
 * Only matched pairs are emitted.
 */
public class TxPayMatchByJoin {
    /**
     * Builds and runs the job on {@code /OrderLog.csv} and {@code /ReceiptLog.csv} read from the classpath.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the order events and parse every CSV line into an OrderEvent.
        URL orderResource = TxPayMatch.class.getResource("/OrderLog.csv");
        DataStream<OrderEvent> orderEventStream = env.readTextFile(orderResource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated Long(String) constructor.
                    return new OrderEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                            @Override
                            public long extractTimestamp(OrderEvent element) {
                                // The event timestamp is scaled by 1000 to obtain the millisecond value Flink expects.
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ))
                // Keep only events with a non-empty transaction id; these are expected to be pay events.
                .filter(data -> !"".equals(data.getTxId()));

        // Read the receipt events and parse every CSV line into a ReceiptEvent.
        URL receiptResource = TxPayMatch.class.getResource("/ReceiptLog.csv");
        SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = env.readTextFile(receiptResource.getPath())
                .map(line -> {
                    String[] fields = line.split(",");
                    return new ReceiptEvent(fields[0], fields[1], Long.valueOf(fields[2]));
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                        new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                            @Override
                            public long extractTimestamp(ReceiptEvent element) {
                                return element.getTimestamp() * 1000L;
                            }
                        }
                ));

        // Interval-join both streams on the transaction id to obtain the matched pairs.
        SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> resultStream = orderEventStream
                .keyBy(OrderEvent::getTxId)
                .intervalJoin(receiptEventStream.keyBy(ReceiptEvent::getTxId))
                .between(Time.seconds(-3), Time.seconds(5)) // receipt time in [pay - 3s, pay + 5s]
                .process(new TxPayMatchDetectByJoin());

        resultStream.print();
        env.execute("tx pay match by join job");
    }

    /** Emits every joined pay/receipt pair unchanged as a tuple. */
    public static class TxPayMatchDetectByJoin extends ProcessJoinFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {
        /**
         * @param left  the pay event
         * @param right the receipt event joined to it
         * @param ctx   join context (unused)
         * @param out   collector receiving the {@code (left, right)} pair
         */
        @Override
        public void processElement(OrderEvent left, ReceiptEvent right, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            out.collect(new Tuple2<>(left, right));
        }
    }
}
