概述
说明:当窗口内累计的数据条数达到 2 条,或者处理时间到达凌晨(零点)时,就触发窗口计算。
针对下面的业务做的简单的案例:
实际含义是:正常情况下按"数据条数达到阈值"的条件触发;当时间到达凌晨时则立即触发,并将窗口缓存的状态清零,从零开始重新累加。
package application;

import com.alibaba.fastjson.JSONObject;
import operator.DayProcessOperator;
import operator.DayTrigger;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Objects;
import java.util.Properties;

/**
 * Demo analytics job: consumes JSON events from a Kafka topic, keys them by
 * {@code "appKey"}, and processes them in 2-second tumbling processing-time
 * windows driven by the custom {@link DayTrigger} (fires when the in-window
 * count reaches a threshold, and fires-and-purges at the day boundary).
 *
 * <p>Original design notes (translated): metrics of interest are active users,
 * new users, logged-in members, new registrations, and launch counts; counts
 * are accumulated per day and reset at midnight.
 */
public class Data_analysis_demo {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Demo-only: hard-coded Kafka connection arguments override the CLI args.
        args = new String[]{"--input-topic", "topn_test",
                "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181",
                "--group.id", "cc2"};
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // FIX: the pipeline uses TumblingProcessingTimeWindows and assigns no
        // timestamps/watermarks, so the time characteristic must be
        // ProcessingTime (the original declared EventTime, which is inconsistent).
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties pros = parameterTool.getProperties();

        // Source: Kafka topic, starting from the latest offsets.
        DataStream<String> kafkaDstream = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        pros.getProperty("input-topic"),
                        new SimpleStringSchema(),
                        pros).setStartFromLatest());

        // Parse each record to JSON. Parse failures are logged and yield null;
        // nulls are filtered out below (the original forwarded nulls, which
        // would NPE inside the keyBy selector).
        DataStream<JSONObject> str2JsonDstream = kafkaDstream
                .map(new MapFunction<String, JSONObject>() {
                    @Override
                    public JSONObject map(String input) throws Exception {
                        try {
                            return JSONObject.parseObject(input);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                })
                .filter(Objects::nonNull);

        // 2-second tumbling windows per appKey, fired by the custom DayTrigger.
        str2JsonDstream.keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(2000)))
                .trigger(new DayTrigger())
                .process(new DayProcessOperator())
                .print();

        try {
            env.execute("开始执行....................");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package operator;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.time.LocalDate;
import java.time.ZoneId;

/**
 * Custom window trigger:
 * <ul>
 *   <li>FIREs (emits without purging window contents) each time the per-window
 *       element count reaches {@link #FIRE_COUNT}, then resets the counter;</li>
 *   <li>FIRE_AND_PURGEs when the element timestamp crosses the next midnight
 *       (Beijing time) — the daily reset — or the window's max timestamp.</li>
 * </ul>
 *
 * <p>Fixes vs. the original: the spurious {@code <JSONObject>} type parameter
 * (which shadowed the fastjson class name) is removed — callers already used
 * the raw {@code new DayTrigger()}; the fired-count log no longer over-counts
 * by one; the counter is reset after firing so another {@code FIRE_COUNT}
 * elements are required before the next fire (previously every element after
 * the second fired); dead commented-out code and the unused reducing-state
 * machinery are deleted.
 */
public class DayTrigger extends Trigger<com.alibaba.fastjson.JSONObject, Window> {

    /** Number of elements that triggers a (non-purging) fire. */
    private static final int FIRE_COUNT = 2;

    /** Asia/Shanghai is fixed UTC+8, matching the original hand-rolled offset. */
    private static final ZoneId BEIJING = ZoneId.of("Asia/Shanghai");

    /** Per-key, per-window element counter. */
    private final ValueStateDescriptor<Integer> stateDescriptor =
            new ValueStateDescriptor<>("total", Integer.class);

    @Override
    public TriggerResult onElement(com.alibaba.fastjson.JSONObject element, long timestamp,
                                   Window window, TriggerContext ctx) throws Exception {
        ValueState<Integer> sumState = ctx.getPartitionedState(stateDescriptor);
        int count = (sumState.value() == null ? 0 : sumState.value()) + 1;
        sumState.update(count);

        if (count >= FIRE_COUNT) {
            // FIRE keeps the window contents. Reset the counter so the next
            // fire again requires FIRE_COUNT elements.
            System.out.println("触发.....数据条数为:" + count);
            sumState.update(0);
            return TriggerResult.FIRE;
        }

        if (timestamp >= getTodayZeroPointTimestamps()) {
            // Crossed the day boundary: emit and clear the window (daily reset).
            return TriggerResult.FIRE_AND_PURGE;
        }
        if (timestamp >= window.maxTimestamp()) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, Window window, TriggerContext triggerContext)
            throws Exception {
        // No timer-based firing; all decisions are made in onElement.
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, Window window, TriggerContext triggerContext)
            throws Exception {
        // No event-time firing; all decisions are made in onElement.
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(Window window, TriggerContext ctx) throws Exception {
        // Drop the per-window counter when the window is disposed.
        ctx.getPartitionedState(stateDescriptor).clear();
    }

    /**
     * Epoch millis of the NEXT midnight in Beijing time. Despite the legacy
     * name ("today zero point"), the original arithmetic
     * {@code now - (now + 8h) % day + day} also yielded the upcoming day
     * boundary; the name is kept for caller compatibility.
     *
     * @return epoch milliseconds of the upcoming 00:00 Asia/Shanghai
     */
    public static Long getTodayZeroPointTimestamps() {
        return LocalDate.now(BEIJING)
                .plusDays(1)
                .atStartOfDay(BEIJING)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        Long todayZeroPointTimestamps = getTodayZeroPointTimestamps();
        System.out.println("todayZeroPointTimestamps = " + todayZeroPointTimestamps);
    }
}
最后
以上就是结实胡萝卜为你收集整理的Flink Trigger代码实战的全部内容,希望文章能够帮你解决Flink Trigger代码实战所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复