Flink Real-Time Data Warehouse - DWD Layer (Traffic Domain) Template Code
Overview
Introduction
These notes serve as a brief reference for developing the DWD layer.
Utility Classes
Date/time utility class
public class DateFormatUtil {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static Long toTs(String dtStr, boolean isFull) {
LocalDateTime localDateTime = null;
if (!isFull) {
dtStr = dtStr + " 00:00:00";
}
localDateTime = LocalDateTime.parse(dtStr, dtfFull);
return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static Long toTs(String dtStr) {
return toTs(dtStr, false);
}
public static String toDate(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtf.format(localDateTime);
}
public static String toYmdHms(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtfFull.format(localDateTime);
}
public static void main(String[] args) {
System.out.println(toYmdHms(System.currentTimeMillis()));
}
}
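A quick usage sketch of the utility (the class name DateFormatUtilDemo and the sample timestamp are illustrative only; the commented results assume the JVM default time zone is UTC+8, matching the +8 offset used in toTs):
public class DateFormatUtilDemo {
public static void main(String[] args) {
// "2020-06-14" is padded to "2020-06-14 00:00:00" and converted to epoch milliseconds at UTC+8
Long ts = DateFormatUtil.toTs("2020-06-14");
// Epoch milliseconds back to date / datetime strings (uses the system default time zone)
String dt = DateFormatUtil.toDate(1592134615000L); // "2020-06-14" when running in UTC+8
String dtm = DateFormatUtil.toYmdHms(1592134615000L); // "2020-06-14 19:36:55" when running in UTC+8
System.out.println(ts + " " + dt + " " + dtm);
}
}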
Kafka utility class
public class KafkaUtil {
static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
static String DEFAULT_TOPIC = "default_topic";
/**
* Build a Kafka consumer for the given topic and consumer group.
* @param topic
* @param groupId
* @return
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
//The default string deserializer throws an error when the record value is null, so a custom deserialization schema is defined here
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if(record == null || record.value() == null) {
return "";
}
return new String(record.value());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}, prop);
return consumer;
}
/**
* Build a Kafka producer for the given topic.
* @param topic
* @return
*/
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
}
}, prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
return producer;
}
}
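Because getKafkaProducer uses EXACTLY_ONCE semantics (Kafka transactions), downstream consumers only see consistent results if they read committed records. A possible addition inside getKafkaConsumer, not part of the original code, is to set the isolation level explicitly:
// Only read records from committed transactions, so data written by an EXACTLY_ONCE
// FlinkKafkaProducer is not visible before the corresponding checkpoint commits.
prop.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");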
Traffic Domain
Prerequisites
Keyed state
public class Status {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
//Map to (key, count) tuples first; keyed state can only be used after keyBy
SingleOutputStreamOperator<Tuple2<String, Integer>> mapKeyData = initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String dataItem) throws Exception {
return Tuple2.of(dataItem, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> firstViewDtState = mapKeyData.keyBy(data -> data.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
ValueState<String> firstViewDtState;
@Override
public void open(Configuration param) throws Exception {
super.open(param);
firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
"firstViewDtState", String.class
));
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String keyStatus = firstViewDtState.value();
if (keyStatus == null) {
//Store the key in state
firstViewDtState.update(value.f0);
} else {
System.out.println(value + " 重复来了");
}
}
});
firstViewDtState.print();
env.execute();
}
}
Result
(a,1) is a duplicate
Side output streams
public class OutputTagTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");
// 1. Define side output tags
OutputTag<String> a = new OutputTag<String>("a") {
};
OutputTag<String> b = new OutputTag<String>("b") {
};
SingleOutputStreamOperator<String> processData = initData.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (value.equals("a")) {
//Write to side output a
ctx.output(a, value);
} else if (value.equals("b")) {
//Write to side output b
ctx.output(b, value);
} else {
//Emit to the main stream
out.collect(value);
}
}
});
//Get side output a
processData.getSideOutput(a).print("a>>");
//Get side output b
processData.getSideOutput(b).print("b>>");
//Print the main stream
processData.print("main>>");
env.execute();
}
}
Result
b>>:4> b
a>>:3> a
a>>:2> a
main>>:5> c
Splitting the log stream
Why split the log data in the first place? Because different log types can then be analyzed separately, and each analysis only has to process the data it actually needs.
Example program implementing the split
public class BaseLogApp {
public static void main(String[] args) throws Exception {
// TODO 1. Initialize the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Enable checkpointing and the state backend
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies
.failureRateRestart(10,
Time.of(3L, TimeUnit.DAYS),
Time.of(1L, TimeUnit.MINUTES)));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read the main stream from Kafka
String topic = "topic_logg";
String groupId = "base_log_consumer";
DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
// TODO 4. Clean the data and convert its structure
// 4.1 Define the dirty-data side output
OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
};
SingleOutputStreamOperator<String> cleanedStream = source.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String jsonStr, Context ctx, Collector<String> out) throws Exception {
try {
JSONObject jsonObj = JSON.parseObject(jsonStr);
out.collect(jsonStr);
} catch (Exception e) {
ctx.output(dirtyStreamTag, jsonStr);
}
}
}
);
// 4.2 Write dirty data to a dedicated Kafka topic
DataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
String dirtyTopic = "dirty_data";
dirtyStream.addSink(KafkaUtil.getKafkaProducer(dirtyTopic));
// 4.3 Convert the main stream structure: jsonStr -> jsonObj
SingleOutputStreamOperator<JSONObject> mappedStream = cleanedStream.map(JSON::parseObject);
// TODO 5. Repair the new/returning visitor flag (is_new)
// 5.1 Key the stream by mid
KeyedStream<JSONObject, String> keyedStream = mappedStream.keyBy(r -> r.getJSONObject("common").getString("mid"));
// 5.2 Repair the is_new flag with keyed state
SingleOutputStreamOperator<JSONObject> fixedStream = keyedStream.process(
new KeyedProcessFunction<String, JSONObject, JSONObject>() {
ValueState<String> firstViewDtState;
@Override
public void open(Configuration param) throws Exception {
super.open(param);
firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
"lastLoginDt", String.class
));
}
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
String isNew = jsonObj.getJSONObject("common").getString("is_new");
String firstViewDt = firstViewDtState.value();
Long ts = jsonObj.getLong("ts");
String dt = DateFormatUtil.toDate(ts);
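// Repair rules:
// - is_new = "1" and no state: genuine first visit today, record today's date
// - is_new = "1" but the stored first-visit date is not today: the device has visited before, fix is_new to "0"
// - is_new = "0" and no state: returning visitor without recorded state, backfill the state with yesterday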
if ("1".equals(isNew)) {
if (firstViewDt == null) {
firstViewDtState.update(dt);
} else {
if (!firstViewDt.equals(dt)) {
isNew = "0";
jsonObj.getJSONObject("common").put("is_new", isNew);
}
}
} else {
if (firstViewDt == null) {
// Set the first-visit date to yesterday
String yesterday = DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24);
firstViewDtState.update(yesterday);
}
}
out.collect(jsonObj);
}
}
);
// TODO 6. Split the stream
// 6.1 Define side outputs for start, display, action and error logs
OutputTag<String> startTag = new OutputTag<String>("startTag") {
};
OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
};
OutputTag<String> actionTag = new OutputTag<String>("actionTag") {
};
OutputTag<String> errorTag = new OutputTag<String>("errorTag") {
};
// 6.2 Split
SingleOutputStreamOperator<String> separatedStream = fixedStream.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
// 6.2.1 Collect error logs
JSONObject error = jsonObj.getJSONObject("err");
if (error != null) {
context.output(errorTag, jsonObj.toJSONString());
}
// Remove the "err" field
jsonObj.remove("err");
// 6.2.2 Collect start logs
JSONObject start = jsonObj.getJSONObject("start");
if (start != null) {
context.output(startTag, jsonObj.toJSONString());
} else {
// 获取 "page" 字段
JSONObject page = jsonObj.getJSONObject("page");
// 获取 "common" 字段
JSONObject common = jsonObj.getJSONObject("common");
// 获取 "ts"
Long ts = jsonObj.getLong("ts");
// 6.2.3 收集曝光数据
JSONArray displays = jsonObj.getJSONArray("displays");
if (displays != null) {
for (int i = 0; i < displays.size(); i++) {
JSONObject display = displays.getJSONObject(i);
JSONObject displayObj = new JSONObject();
displayObj.put("display", display);
displayObj.put("common", common);
displayObj.put("page", page);
displayObj.put("ts", ts);
context.output(displayTag, displayObj.toJSONString());
}
}
// 6.2.4 Collect action logs
JSONArray actions = jsonObj.getJSONArray("actions");
if (actions != null) {
for (int i = 0; i < actions.size(); i++) {
JSONObject action = actions.getJSONObject(i);
JSONObject actionObj = new JSONObject();
actionObj.put("action", action);
actionObj.put("common", common);
actionObj.put("page", page);
actionObj.put("ts", ts);
context.output(actionTag, actionObj.toJSONString());
}
}
// 6.2.5 Collect page logs (main stream)
jsonObj.remove("displays");
jsonObj.remove("actions");
out.collect(jsonObj.toJSONString());
}
}
}
);
// Print the main stream and the side outputs to verify the split
separatedStream.print("page>>>");
separatedStream.getSideOutput(startTag).print("start!!!");
separatedStream.getSideOutput(displayTag).print("display@@@");
separatedStream.getSideOutput(actionTag).print("action###");
separatedStream.getSideOutput(errorTag).print("error$$$");
// TODO 7. Write the streams to separate Kafka topics
// // 7.1 Extract the side outputs
// DataStream<String> startDS = separatedStream.getSideOutput(startTag);
// DataStream<String> displayDS = separatedStream.getSideOutput(displayTag);
// DataStream<String> actionDS = separatedStream.getSideOutput(actionTag);
// DataStream<String> errorDS = separatedStream.getSideOutput(errorTag);
//
// // 7.2 Topic names for each log type
// String page_topic = "dwd_traffic_page_log";
// String start_topic = "dwd_traffic_start_log";
// String display_topic = "dwd_traffic_display_log";
// String action_topic = "dwd_traffic_action_log";
// String error_topic = "dwd_traffic_error_log";
//
// separatedStream.addSink(KafkaUtil.getKafkaProducer(page_topic));
// startDS.addSink(KafkaUtil.getKafkaProducer(start_topic));
// displayDS.addSink(KafkaUtil.getKafkaProducer(display_topic));
// actionDS.addSink(KafkaUtil.getKafkaProducer(action_topic));
// errorDS.addSink(KafkaUtil.getKafkaProducer(error_topic));
env.execute();
}
}
Raw log data
{
"actions": [
{
"action_id": "get_coupon",
"item": "3",
"item_type": "coupon_id",
"ts": 1592134620882
}
],
"common": {
"ar": "110000",
"ba": "Oneplus",
"ch": "oppo",
"is_new": "0",
"md": "Oneplus 7",
"mid": "mid_232163",
"os": "Android 10.0",
"uid": "898",
"vc": "v2.1.134"
},
"displays": [
{
"display_type": "query",
"item": "18",
"item_type": "sku_id",
"order": 1,
"pos_id": 2
},
{
"display_type": "promotion",
"item": "13",
"item_type": "sku_id",
"order": 2,
"pos_id": 4
},
{
"display_type": "query",
"item": "29",
"item_type": "sku_id",
"order": 3,
"pos_id": 2
},
{
"display_type": "query",
"item": "7",
"item_type": "sku_id",
"order": 4,
"pos_id": 4
},
{
"display_type": "query",
"item": "19",
"item_type": "sku_id",
"order": 5,
"pos_id": 4
},
{
"display_type": "promotion",
"item": "22",
"item_type": "sku_id",
"order": 6,
"pos_id": 4
},
{
"display_type": "query",
"item": "25",
"item_type": "sku_id",
"order": 7,
"pos_id": 5
}
],
"page": {
"during_time": 11764,
"item": "31",
"item_type": "sku_id",
"last_page_id": "good_list",
"page_id": "good_detail",
"source_type": "query"
},
"ts": 1592134615000
}
Data after splitting
Main stream (page log data)
{
"common": {
"ar": "310000",
"uid": "780",
"os": "Android 11.0",
"ch": "oppo",
"is_new": "1",
"md": "Huawei P30",
"mid": "mid_619118",
"vc": "v2.1.134",
"ba": "Huawei"
},
"page": {
"page_id": "payment",
"item": "33,25",
"during_time": 5439,
"item_type": "sku_ids",
"last_page_id": "trade"
},
"ts": 1592134614000
}
Display log data
{
"common": {
"ar": "110000",
"uid": "914",
"os": "iOS 13.3.1",
"ch": "Appstore",
"is_new": "0",
"md": "iPhone Xs Max",
"mid": "mid_319090",
"vc": "v2.1.134",
"ba": "iPhone"
},
"display": {
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 1,
"order": 7
},
"page": {
"page_id": "home",
"during_time": 4328
},
"ts": 1592134610000
}
Action log data
{
"common": {
"ar": "230000",
"uid": "257",
"os": "Android 11.0",
"ch": "vivo",
"is_new": "0",
"md": "Xiaomi 9",
"mid": "mid_227516",
"vc": "v2.0.1",
"ba": "Xiaomi"
},
"action": {
"item": "35",
"action_id": "cart_minus_num",
"item_type": "sku_id",
"ts": 1592134612791
},
"page": {
"page_id": "cart",
"during_time": 3583,
"last_page_id": "good_detail"
},
"ts": 1592134611000
}
Error log data
{
"common": {
"ar": "110000",
"uid": "780",
"os": "Android 11.0",
"ch": "huawei",
"is_new": "1",
"md": "Xiaomi 9",
"mid": "mid_503805",
"vc": "v2.1.134",
"ba": "Xiaomi"
},
"err": {
"msg": " Exception in thread \ java.net.SocketTimeoutException\n \tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)",
"error_code": 1245
},
"page": {
"page_id": "home",
"during_time": 17642
},
"displays": [
{
"display_type": "activity",
"item": "1",
"item_type": "activity_id",
"pos_id": 2,
"order": 1
},
{
"display_type": "query",
"item": "2",
"item_type": "sku_id",
"pos_id": 5,
"order": 2
},
{
"display_type": "query",
"item": "6",
"item_type": "sku_id",
"pos_id": 2,
"order": 3
},
{
"display_type": "query",
"item": "8",
"item_type": "sku_id",
"pos_id": 4,
"order": 4
},
{
"display_type": "query",
"item": "6",
"item_type": "sku_id",
"pos_id": 3,
"order": 5
},
{
"display_type": "promotion",
"item": "29",
"item_type": "sku_id",
"pos_id": 3,
"order": 6
}
],
"ts": 1592134611000
}
Unique Visitor Real-Time Table
Prerequisites
State TTL
public class StateTTl {
public static void main(String[] args) throws Exception {
// Prepare the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//Initialize test data
DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c", "b");
SingleOutputStreamOperator<Tuple2<String,Integer>> targetData = initData.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String,Integer> map(String value) throws Exception {
if (value.equals("a")) {
Thread.sleep(2000);
}
return Tuple2.of(value,1);
}
});
//Test state expiration
SingleOutputStreamOperator<Tuple2<String, Integer>> testTTL = targetData.keyBy(data -> data.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private ValueState<String> lastVisitDt;
@Override
public void open(Configuration paramenters) throws Exception {
super.open(paramenters);
ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("testTTL", String.class);
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.seconds(1L))
// Refresh the TTL when the state is created or updated
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build()
);
//Create the keyed state from the descriptor with the TTL applied
lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String valueState = lastVisitDt.value();
if (valueState == null) {
System.out.println(value);
lastVisitDt.update(value.f0);
} else {
System.out.println(value.f0 + " 有对应的状态了");
}
}
});
testTTL.print();
env.execute();
}
}
Output
(a,1)
(a,1)
(b,1)
(c,1)
b already has state
The output shows that the state for the first a had already expired when the second a arrived (the map function sleeps 2 seconds on every a, longer than the 1-second TTL), so the second a was treated as unseen; only b, which arrived again within the TTL, still had state.
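For comparison, StateTtlConfig can also refresh the TTL on read access and hide values that have expired but not yet been cleaned up; a minimal variant of the descriptor configuration above (standard builder options, shown only as a sketch):
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.seconds(1L))
// Refresh the timer on both read and write access instead of write-only
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
// Never return a value that has already expired, even if it has not been cleaned up yet
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
);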
Implementation
public class DwdTrafficUniqueVisitorDetail {
public static void main(String[] args) throws Exception {
// TODO 1. Prepare the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Checkpointing and state backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read page log data from the Kafka dwd_traffic_page_log topic as a stream
String topic = "dwd_traffic_page_log";
String groupId = "dwd_traffic_unique_visitor_detail";
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
// TODO 4. Convert the structure
SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
// TODO 5. Keep only records whose last_page_id is null (the first page of a visit)
SingleOutputStreamOperator<JSONObject> firstPageStream = mappedStream.filter(
jsonObj -> jsonObj
.getJSONObject("page")
.getString("last_page_id") == null
);
// TODO 6. Key by mid
KeyedStream<JSONObject, String> keyedStream = firstPageStream
.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
// TODO 7. Filter unique-visitor records with Flink keyed state
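// Keep a record only when its date differs from the last visit date stored in keyed state; the state
// has a 1-day TTL refreshed on write, so each mid contributes at most one record per day.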
SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(
new RichFilterFunction<JSONObject>() {
private ValueState<String> lastVisitDt;
@Override
public void open(Configuration paramenters) throws Exception {
super.open(paramenters);
ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("last_visit_dt", String.class);
valueStateDescriptor.enableTimeToLive(
StateTtlConfig
.newBuilder(Time.days(1L))
// Refresh the TTL when the state is created or updated
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build()
);
lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String visitDt = DateFormatUtil.toDate(jsonObj.getLong("ts"));
String lastDt = lastVisitDt.value();
if (lastDt == null || !lastDt.equals(visitDt)) {
lastVisitDt.update(visitDt);
return true;
}
return false;
}
}
);
// TODO 8. Write the unique-visitor data to the Kafka dwd_traffic_unique_visitor_detail topic
String targetTopic = "dwd_traffic_unique_visitor_detail";
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
filteredStream.map(JSONAware::toJSONString).addSink(kafkaProducer);
// TODO 9. Execute the job
env.execute();
}
}
Result
{
"common": {
"ar": "310000",
"uid": "201",
"os": "Android 11.0",
"ch": "vivo",
"is_new": "1",
"md": "Xiaomi Mix2 ",
"mid": "mid_994205",
"vc": "v2.0.1",
"ba": "Xiaomi"
},
"page": {
"page_id": "home",
"during_time": 19868
},
"ts": 1592133292000
}
Bounce Transaction Fact Table
Prerequisites
To reason about whether events happened within a time span, watermarks are assigned first; windowed operations and CEP's within constraint then evaluate in event time, which effectively provides the ordering guarantee needed here.
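The essential pattern is sketched below, assuming stream is a DataStream<Tuple2<String, Long>> of (value, event-time millis) records; the full CEP example that follows assigns watermarks the same way:
// Extract the event-time field and emit watermarks with zero allowed out-of-orderness,
// so that CEP's within() constraint can be evaluated in event time.
SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarks = stream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((record, previousTimestamp) -> record.f1)
);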
Using Flink CEP
public class CEPTest {
public static void main(String[] args) throws Exception {
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO Note: with parallelism > 1, subtasks that receive no data hold back the overall watermark, so the computation would never be triggered; hence parallelism 1 here
env.setParallelism(1);
//TODO 2. Read from a socket source
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9998);
//TODO 3. Input lines look like "a 1": the first token is the value, the second the event time in seconds
SingleOutputStreamOperator<Tuple2<String, Long>> mappedStream = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
return Tuple2.of(s[0], Integer.parseInt(s[1])*1000L);
}
});
//TODO 4. Assign watermarks so that event-time ordering can be relied on
SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
WatermarkStrategy
//Allow 0 seconds of out-of-orderness
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> initData, long recordTimestamp) {
// System.out.println(initData);
return initData.f1;
}
}
)
);
//TODO 5. Key the stream; the pattern is then applied per key
KeyedStream<Tuple2<String, Long>, String> keyedStream = withWatermarkStream.keyBy(data -> {
return data.f0;
});
// keyedStream.print("keyedStream");
//TODO 6. Use CEP to match the events we want: two consecutive a events within 10 seconds; otherwise the partial match times out
Pattern<Tuple2<String, Long>, Tuple2<String, Long>> pattern = Pattern.<Tuple2<String, Long>>begin("first").where(
new SimpleCondition<Tuple2<String, Long>>() {
@Override
public boolean filter(Tuple2<String, Long> firstData) throws Exception {
return firstData.f0.equals("a");
}
}
).next("second").where(
new SimpleCondition<Tuple2<String, Long>>() {
@Override
public boolean filter(Tuple2<String, Long> secondData) throws Exception {
return secondData.f0.equals("a");
}
}
// A different Time class is already imported above, so the fully qualified name is used here
).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(keyedStream, pattern);
//Side output for timed-out partial matches
OutputTag<Tuple2<String, Long>> timeoutTag = new OutputTag<Tuple2<String, Long>>("timeoutTag") {
};
SingleOutputStreamOperator<Tuple2<String, Long>> flatSelectStream = patternStream.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void timeout(Map<String, List<Tuple2<String, Long>>> pattern, long timeoutTimestamp, Collector<Tuple2<String, Long>> out) throws Exception {
//Emit the timed-out element (the start of the incomplete match)
Tuple2<String, Long> first = pattern.get("first").get(0);
out.collect(first);
}
},
new PatternFlatSelectFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void flatSelect(Map<String, List<Tuple2<String, Long>>> pattern, Collector<Tuple2<String, Long>> out) throws Exception {
//Emit the matched element
Tuple2<String, Long> second = pattern.get("second").get(0);
out.collect(second);
}
}
);
DataStream<Tuple2<String, Long>> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
//Print the timed-out data
timeOutDStream.print("timeOut");
//Print the matched data
flatSelectStream.print("flatSelectStream");
env.execute();
}
}
Test data
[bigdata@master createdata]$ nc -lk 9998
a 2
a 3
a 15
Result
flatSelectStream> (a,3000)
timeOut> (a,3000)
Conclusion: when a 2 and a 3 arrive, the 10-second constraint has not yet been evaluated. When a 15 arrives the watermark advances: (a 2, a 3) lie within 10 seconds of each other, so the match is emitted as (a,3000); the partial match that then started at a 3 can no longer be completed within 10 seconds, so (a,3000) also appears on the timeout side output.
Union
public class UnionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> dataStream1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> dataStream2 = env.fromElements(4, 5, 6);
dataStream1.union(dataStream2)
.print();
env.execute();
}
}
Output
2> 5
3> 6
1> 4
11> 3
9> 1
10> 2
Implementation
public class DwdTrafficUserJumpDetail {
public static void main(String[] args) throws Exception {
// TODO 1. Prepare the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2. Checkpointing and state backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/ck"
);
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. Read page log data from the Kafka dwd_traffic_page_log topic as a stream
String topic = "dwd_traffic_page_log";
String groupId = "dwd_traffic_user_jump_detail";
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
// Test data
/*DataStream<String> kafkaDS = env
.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"home\"},\"ts\":15000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"detail\"},\"ts\":30000} "
);*/
// TODO 4. Convert the structure
SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);
// TODO 5. Assign watermarks, used for the bounce (jump-out) analysis
SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
return jsonObj.getLong("ts");
}
}
)
);
// TODO 6. Key by mid
KeyedStream<JSONObject, String> keyedStream = withWatermarkStream.keyBy(jsonOjb -> jsonOjb.getJSONObject("common").getString("mid"));
// TODO 7. Define the CEP pattern
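// A bounce is a visit whose first page (last_page_id == null) is followed either by another
// session-start page within 10 seconds (matched branch) or by no further event at all (timeout branch);
// in both cases the "first" event is the bounce record that gets emitted.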
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
return lastPageId == null;
}
}
).next("second").where(
new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
return lastPageId == null;
}
}
// A different Time class is already imported above, so the fully qualified name is used here
).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));
// TODO 8. Apply the pattern to the stream
PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);
// TODO 9. Extract matched events and timed-out events
OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
};
SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
timeoutTag,
new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
@Override
public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<JSONObject> out) throws Exception {
JSONObject element = pattern.get("first").get(0);
out.collect(element);
}
},
new PatternFlatSelectFunction<JSONObject, JSONObject>() {
@Override
public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<JSONObject> out) throws Exception {
JSONObject element = pattern.get("first").get(0);
out.collect(element);
}
}
);
DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
// TODO 10. Union the two streams and write the data to Kafka
DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
String targetTopic = "dwd_traffic_user_jump_detail";
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
unionDStream.map(JSONAware::toJSONString)
.addSink(kafkaProducer);
env.execute();
}
}