说明:当窗口内数据条数达到 2 条,或者时间到达凌晨时,就触发执行。
针对下面的业务做的简单的案例:
实际含义是:正常情况下按数据条数正常触发;到达凌晨时直接触发,然后将缓存(状态)清零,重新开始累加。

package application;
import com.alibaba.fastjson.JSONObject;
import operator.DayProcessOperator;
import operator.DayTrigger;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
/**
 * Data-analysis demo: consumes JSON events from Kafka, keys them by {@code appKey},
 * and counts them in a short processing-time window whose custom {@code DayTrigger}
 * fires when enough elements have accumulated, and fires-and-purges at midnight so
 * per-day state starts over.
 *
 * Dashboard reference:
 * http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/pic/49939/cn_zh/1487929553566/man-screen-shot.jpg
 *
 * Metric dimensions: 1) active users 2) new users 3) logged-in members
 * 4) newly registered users 5) launch count.
 *
 * Approach 1: store the per-second active-user count to HBase via a 1 s window.
 * Approach 2: clear the state at midnight every day and restart the accumulation.
 */
public class Data_analysis_demo {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Kafka connection settings, hard-coded for the demo (overrides real CLI args).
        args = new String[]{"--input-topic", "topn_test", "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // BUG FIX: the job uses TumblingProcessingTimeWindows, so the time characteristic
        // must be ProcessingTime. The original set EventTime without installing any
        // timestamp assigner / watermark generator, so event time would never advance.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Properties pros = parameterTool.getProperties();
        // Source: Kafka topic, starting from the latest offsets.
        DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
                pros.getProperty("input-topic"),
                new SimpleStringSchema(),
                pros).setStartFromLatest()
        );
        // Parse each record into JSON. Records that fail to parse yield null and are
        // dropped by the filter below — previously a null element would have caused an
        // NPE in the downstream keyBy.
        DataStream<JSONObject> str2JsonDstream = kafkaDstream.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String input) throws Exception {
                try {
                    return JSONObject.parseObject(input);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null; // removed by the null filter below
                }
            }
        }).filter(value -> value != null);
        // Window per appKey; DayTrigger decides when to fire / fire-and-purge.
        str2JsonDstream.keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(2000)))
                .trigger(new DayTrigger())
                .process(new DayProcessOperator())
                .print();
        try {
            env.execute("开始执行....................");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package operator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; public class DayTrigger<JSONObject> extends Trigger<com.alibaba.fastjson.JSONObject, Window> { private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE); ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("total", Integer.class); //每个元素都会触发 @Override public TriggerResult onElement(com.alibaba.fastjson.JSONObject element, long timestamp, Window window, TriggerContext ctx) throws Exception { System.out.println("先打印timestamp = " + timestamp+",本地时间:"+System.currentTimeMillis()); ValueState<Integer> sumState = ctx.getPartitionedState(stateDescriptor); if (null == sumState.value()) { sumState.update(0); } sumState.update(1 + sumState.value()); if (sumState.value() >= 2) { //这里可以选择手动处理状态 // 默认的trigger发送是TriggerResult.FIRE 不会清除窗口数据 System.out.println("触发.....数据条数为:" + (1 + sumState.value())); return TriggerResult.FIRE; } Long todayZeroPointTimestamps = getTodayZeroPointTimestamps(); System.out.println("todayZeroPointTimestamps = " + todayZeroPointTimestamps); if (timestamp >= todayZeroPointTimestamps) { return TriggerResult.FIRE_AND_PURGE; } if (timestamp >= window.maxTimestamp()) { return TriggerResult.FIRE_AND_PURGE; } else { return TriggerResult.CONTINUE; } // ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); // countState.add(1L); // System.out.println("打印countState.get() = " + countState.get()); // if (countState.get() 
>= 3) { // System.out.println("触发了............. " ); // //这里是触发并且清除 // return TriggerResult.FIRE_AND_PURGE; // } // return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, Window window, TriggerContext triggerContext) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, Window window, TriggerContext triggerContext) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(Window window, TriggerContext ctx) throws Exception { // System.out.println("清理窗口状态 窗口内保存值为" + ctx.getPartitionedState(stateDescriptor).value()); ctx.getPartitionedState(stateDescriptor).clear(); } class Sum implements ReduceFunction<Long> { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } public static Long getTodayZeroPointTimestamps() { long now = System.currentTimeMillis(); long daySecond = 60 * 60 * 24 * 1000; long dayTime = now - (now + 8 * 3600 * 1000) % daySecond + 1 * daySecond; return dayTime; } public static void main(String[] args) { Long todayZeroPointTimestamps = getTodayZeroPointTimestamps(); System.out.println("todayZeroPointTimestamps = " + todayZeroPointTimestamps); } }
最后
以上就是结实胡萝卜最近收集整理的关于Flink Trigger代码实战的全部内容,更多相关Flink内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复