flink任务遇坑之IllegalArgumentException异常

异常详情

flink任务启动后,报IllegalArgumentException

java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at org.apache.flink.streaming.api.operators.InternalTimerHeap.globalKeyGroupToLocalIndex(InternalTimerHeap.java:343)
at org.apache.flink.streaming.api.operators.InternalTimerHeap.getDedupMapForKeyGroup(InternalTimerHeap.java:320)
at org.apache.flink.streaming.api.operators.InternalTimerHeap.getDedupMapForTimer(InternalTimerHeap.java:339)
at org.apache.flink.streaming.api.operators.InternalTimerHeap.addInternal(InternalTimerHeap.java:229)
at org.apache.flink.streaming.api.operators.InternalTimerHeap.scheduleTimer(InternalTimerHeap.java:140)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.registerEventTimeTimer(HeapInternalTimerService.java:205)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:885)
at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:899)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:211)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:720)
at java.lang.Thread.run(Thread.java:748)

错误栈没有任何涉及到具体业务代码的地方,让人摸不到头脑。幸好我是在原有代码基础上迭代,所以定位到了异常原因。

异常原因

在执行打散操作时,所重写的getKey方法有问题。

@AllArgsConstructor
public class RebalanceKeySelector implements KeySelector<User, String> {
    
    pricate Integer parallelism;
    
    @Override
    public String getKey(User user) threws Exception {
        return String.valueOf((int) Math.floor(Math.random() * parallelism));
    }
}

改为如下就没有报错了

@AllArgsConstructor
public class RebalanceKeySelector implements KeySelector<User, String> {
    
    pricate Integer parallelism;
    
    @Override
    public String getKey(User user) threws Exception {
        return String.valueOf(user.hashCode() % parallelism);
    }
}

总结就是做打散分流的话,生成key要用到流中的对象。具体原因以后有空研究。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容