我是靠谱客的博主 爱笑柜子,最近开发中收集的这篇文章主要介绍java Flink(九)窗口模式之计数窗口 以及窗口模式中 触发器trigger 移除器evictor等用法介绍以及延迟处理,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
窗口模式的介绍见上文,本文简单记录计数窗口模式的使用以及触发器trigger 移除器evictor等用法
.trigger() 名为触发器,定义window什么时候关闭,触发计算并输出结果
.evictor()名为移除器,定义移除某些数据的逻辑
.allowed;ateness(),允许处理迟到的数据
.sideOutputLateDate,将迟到的数据放入侧输出流
.getSideOutput,获取侧输出流
计数窗口模式demo
package window;
import beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowTest1 {
public static void main(String[] args) throws Exception{
//获取当前执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//从文件读取数据 ***demo模拟的文本流处理,其实不该用文本流处理,因为读取文本根本不许要分桶,速度过快了
DataStream<String> inputStream = env.readTextFile("D:\idle\FlinkTest\src\main\resources");
//转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
public SensorReading map(String line) throws Exception {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
}
});
//开计数窗口测试
dataStream.keyBy("id")
.countWindow(10,2)//滑动模式,每次滑动两个
.aggregate(new MyAvgTemp()).print();//输出按两个每次输出
env.execute();
}
public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>{
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<Double, Integer>(0.0, 0);
}
public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> doubleIntegerTuple2) {
return new Tuple2<Double, Integer>();
}
public Double getResult(Tuple2<Double, Integer> doubleIntegerTuple2) {
return null;
}
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> doubleIntegerTuple2, Tuple2<Double, Integer> acc1) {
return null;
}
}
}
延迟处理
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
//开计数窗口测试
SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1)) //允许延迟1分钟,等到结束时间吗,先计算输出一次结果,然后不会关闭窗口。
//一分钟以内进入的数据还会进入结果继续计算
.sideOutputLateData(outputTag)//将超过1分钟的数据再进行处理,输出到测边流
.sum("");
sumStream.getSideOutput(outputTag).print("late"); //把侧边流中超时的计算结果进行运算输出
env.execute();
最后
以上就是爱笑柜子为你收集整理的java Flink(九)窗口模式之计数窗口 以及窗口模式中 触发器trigger 移除器evictor等用法介绍以及延迟处理的全部内容,希望文章能够帮你解决java Flink(九)窗口模式之计数窗口 以及窗口模式中 触发器trigger 移除器evictor等用法介绍以及延迟处理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复