概述
刚接触Flink不久,照着之前的一个老项目重构。好不容易写完代码并解决完各种报错,结果发现即使不报错也无法运出自己想要的效果。调试发现能正常消费到Kafka的消息,但是却无法触发窗口计算。在网上翻到一篇博客问题定位:Flink水位线不触发问题 ,上面说是watermark有问题,通过Flink的管理控制台发现watermark没生成:
注意,截图的时候我的问题已经解决了,没有watermark的时候显示的是no watermark,而不是上图中的0
既然定位到是watermark的问题,就知道解决问题的方向了。通过对比重构前可以运行的旧代码,发现了端倪。
旧代码大致如下:
我重构后的错误代码:
assignTimestampsAndWatermarks()方法返回的是一个全新的对象SingleOutputStreamOperator,而不再是原来那个DataStream,之所以第一种写法可以,是因为SingleOutputStreamOperator是DataStream的子类,看起来引用类型没发生变化,但实际上返回的对象已经发生了改变。后面要用assignTimestampsAndWatermarks()方法返回的对象来创建一个临时视图而不是addSource()返回的那个:
DataStream<CallInfo> callInfoStream = environment
.addSource(new FlinkKafkaConsumer<>(
config.getInputTopic(),
new CallInfoSchema(),
kafkaConf))
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<CallInfo>() {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp = 0;
@Override
public long extractTimestamp(CallInfo element, long previousElementTimestamp) {
long timestamp = element.getEndTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
}
}).setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
tableEnv.createTemporaryView(
"CallInfo",
callInfoStream,
"traceType, traceId, elapsedTime, deviceId, callResult, streamId, streamType, "
+ "userId, groupId, childNum, recordType, startTime, endTime, dateStr, "
+ "successNum, failNum, rowtime.rowtime");
搞了半天,原来是我代码写错了。这个问题坑就坑在它不会报错,也没有任何提示信息。
最后
以上就是欣慰乌龟为你收集整理的Flink不触发窗口计算又不报错的问题定位的全部内容,希望文章能够帮你解决Flink不触发窗口计算又不报错的问题定位所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复