我是靠谱客的博主 爱笑柜子,最近开发中收集的这篇文章主要介绍java Flink(九)窗口模式之计数窗口 以及窗口模式中 触发器trigger 移除器evictor等用法介绍以及延迟处理,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

窗口模式的介绍见上文,本文简单记录计数窗口模式的使用以及触发器trigger 移除器evictor等用法

.trigger() 名为触发器,定义window什么时候关闭,触发计算并输出结果

.evictor()名为移除器,定义移除某些数据的逻辑

.allowed;ateness(),允许处理迟到的数据

.sideOutputLateDate,将迟到的数据放入侧输出流

.getSideOutput,获取侧输出流

 

计数窗口模式demo

package window;

import beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1 {
    public static void main(String[] args) throws Exception{
        //获取当前执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //从文件读取数据 ***demo模拟的文本流处理,其实不该用文本流处理,因为读取文本根本不许要分桶,速度过快了
        DataStream<String> inputStream = env.readTextFile("D:\idle\FlinkTest\src\main\resources");

        //转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            public SensorReading map(String line) throws Exception {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        //开计数窗口测试
        dataStream.keyBy("id")
                .countWindow(10,2)//滑动模式,每次滑动两个
                .aggregate(new MyAvgTemp()).print();//输出按两个每次输出

        env.execute();
    }

    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>{
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<Double, Integer>(0.0, 0);
        }

        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> doubleIntegerTuple2) {
            return new Tuple2<Double, Integer>();
        }

        public Double getResult(Tuple2<Double, Integer> doubleIntegerTuple2) {
            return null;
        }

        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> doubleIntegerTuple2, Tuple2<Double, Integer> acc1) {
            return null;
        }
    }
}

延迟处理 

OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };
        //开计数窗口测试
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .allowedLateness(Time.minutes(1)) //允许延迟1分钟,等到结束时间吗,先计算输出一次结果,然后不会关闭窗口。
                //一分钟以内进入的数据还会进入结果继续计算
                .sideOutputLateData(outputTag)//将超过1分钟的数据再进行处理,输出到测边流
                .sum("");
        sumStream.getSideOutput(outputTag).print("late"); //把侧边流中超时的计算结果进行运算输出
        env.execute();

 

最后

以上就是爱笑柜子为你收集整理的java Flink(九)窗口模式之计数窗口 以及窗口模式中 触发器trigger 移除器evictor等用法介绍以及延迟处理的全部内容,希望文章能够帮你解决java Flink(九)窗口模式之计数窗口 以及窗口模式中 触发器trigger 移除器evictor等用法介绍以及延迟处理所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部