This article, shared as a reference by 靠谱客 blogger 辛勤朋友, introduces the Flink real-time warehouse DWD layer (traffic domain): an overview, the utility classes, the traffic-domain unique visitor real-time table, and the user-jump (bounce) transaction fact table.

Introduction

This post serves as a brief guide to developing the DWD layer.

Utility classes

Date/time utility class

public class DateFormatUtil {

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static Long toTs(String dtStr, boolean isFull) {
        LocalDateTime localDateTime = null;
        if (!isFull) {
            dtStr = dtStr + " 00:00:00";
        }
        localDateTime = LocalDateTime.parse(dtStr, dtfFull);
        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    public static Long toTs(String dtStr) {
        return toTs(dtStr, false);
    }

    public static String toDate(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf.format(localDateTime);
    }

    public static String toYmdHms(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtfFull.format(localDateTime);
    }

    public static void main(String[] args) {
        System.out.println(toYmdHms(System.currentTimeMillis()));
    }
}
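
A quick usage sketch of the utility. The demo class below is not part of the original post; it simply illustrates both directions of conversion and assumes it sits in the same package as DateFormatUtil.

public class DateFormatUtilDemo {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // milliseconds -> "yyyy-MM-dd" and "yyyy-MM-dd HH:mm:ss"
        String dt = DateFormatUtil.toDate(now);
        String dtm = DateFormatUtil.toYmdHms(now);
        // "yyyy-MM-dd" -> milliseconds (toTs appends " 00:00:00" internally)
        System.out.println(dt + " -> " + DateFormatUtil.toTs(dt));
        // full "yyyy-MM-dd HH:mm:ss" -> milliseconds
        System.out.println(dtm + " -> " + DateFormatUtil.toTs(dtm, true));
    }
}

Note that toTs always parses with the fixed +8 (Beijing) offset while toDate/toYmdHms format with the system default zone, so on a machine whose zone is not UTC+8 the round trip is not symmetric.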

Kafka utility class

public class KafkaUtil {

    static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
    static String DEFAULT_TOPIC = "default_topic";

    /**
     * Build a consumer for the given topic and consumer group.
     * @param topic
     * @param groupId
     * @return
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
                // The default string deserializer fails when the record value is null,
                // so a custom deserialization schema is used instead
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        }
                        return new String(record.value());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                }, prop);
        return consumer;
    }

    /**
     * Build a producer for the given topic.
     * @param topic
     * @return
     */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60 * 15 * 1000 + "");
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(DEFAULT_TOPIC,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
                        return new ProducerRecord<byte[], byte[]>(topic, jsonStr.getBytes());
                    }
                }, prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        return producer;
    }
}
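
A minimal sketch of how the two helpers are wired together (the topic and group names below are placeholders, not from the original post):

public class KafkaUtilDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read raw strings from a source topic with a dedicated consumer group
        DataStreamSource<String> source =
                env.addSource(KafkaUtil.getKafkaConsumer("source_topic", "demo_group"));
        // forward them unchanged to a target topic
        source.addSink(KafkaUtil.getKafkaProducer("target_topic"));
        env.execute();
    }
}

Because the producer is created with Semantic.EXACTLY_ONCE, its records are only visible to consumers reading with isolation.level=read_committed, and the 15-minute transaction timeout set above must not exceed the broker's transaction.max.timeout.ms.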

Traffic domain

Prerequisites

Keyed state

public class Status {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");

        // Convert to a keyed stream first; keyed state can only be used after keyBy
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapKeyData =
                initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String dataItem) throws Exception {
                        return Tuple2.of(dataItem, 1);
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> firstViewDtState = mapKeyData.keyBy(data -> data.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    ValueState<String> firstViewDtState;

                    @Override
                    public void open(Configuration param) throws Exception {
                        super.open(param);
                        firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
                                "firstViewDtState",
                                String.class
                        ));
                    }

                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx,
                                               Collector<Tuple2<String, Integer>> out) throws Exception {
                        String keyStatus = firstViewDtState.value();
                        if (keyStatus == null) {
                            // save the key into the state
                            firstViewDtState.update(value.f0);
                        } else {
                            System.out.println(value + " duplicate key seen");
                        }
                    }
                });

        firstViewDtState.print();

        env.execute();
    }
}

Result

(a,1) duplicate key seen

Side output streams

public class OutputTagTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c");

        // 1 Define the side-output tags
        OutputTag<String> a = new OutputTag<String>("a") {
        };
        OutputTag<String> b = new OutputTag<String>("b") {
        };

        SingleOutputStreamOperator<String> processData = initData.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                if (value.equals("a")) {
                    // route to side output a
                    ctx.output(a, value);
                } else if (value.equals("b")) {
                    // route to side output b
                    ctx.output(b, value);
                } else {
                    // everything else stays in the main stream
                    out.collect(value);
                }
            }
        });

        // read side output a
        processData.getSideOutput(a).print("a>>");
        // read side output b
        processData.getSideOutput(b).print("b>>");
        // main stream
        processData.print("main>>");

        env.execute();
    }
}

Result

b>>:4> b
a>>:3> a
a>>:2> a
main>>:5> c

Splitting the log data

First of all, why split the raw log stream? Because the different log types are analysed separately, and after splitting each downstream job only has to read the much smaller slice of data it actually needs.

Program example implementing the split

public class BaseLogApp {
    public static void main(String[] args) throws Exception {

        // TODO 1. Set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. Checkpointing and state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies
                .failureRateRestart(10, Time.of(3L, TimeUnit.DAYS), Time.of(1L, TimeUnit.MINUTES)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
        System.setProperty("HADOOP_USER_NAME", "bigdata");

        // TODO 3. Read the raw log stream from Kafka
        String topic = "topic_logg";
        String groupId = "base_log_consumer";
        DataStreamSource<String> source = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));

        // TODO 4. Clean the data and convert the structure
        // 4.1 Define the dirty-data side output
        OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
        };
        SingleOutputStreamOperator<String> cleanedStream = source.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String jsonStr, Context ctx, Collector<String> out) throws Exception {
                        try {
                            JSONObject jsonObj = JSON.parseObject(jsonStr);
                            out.collect(jsonStr);
                        } catch (Exception e) {
                            ctx.output(dirtyStreamTag, jsonStr);
                        }
                    }
                }
        );

        // 4.2 Write the dirty data to a dedicated Kafka topic
        DataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
        String dirtyTopic = "dirty_data";
        dirtyStream.addSink(KafkaUtil.getKafkaProducer(dirtyTopic));

        // 4.3 Convert the main stream: jsonStr -> jsonObj
        SingleOutputStreamOperator<JSONObject> mappedStream = cleanedStream.map(JSON::parseObject);

        // TODO 5. Repair the new/returning visitor flag
        // 5.1 Key the stream by mid
        KeyedStream<JSONObject, String> keyedStream =
                mappedStream.keyBy(r -> r.getJSONObject("common").getString("mid"));

        // 5.2 Fix the is_new flag
        SingleOutputStreamOperator<JSONObject> fixedStream = keyedStream.process(
                new KeyedProcessFunction<String, JSONObject, JSONObject>() {

                    ValueState<String> firstViewDtState;

                    @Override
                    public void open(Configuration param) throws Exception {
                        super.open(param);
                        firstViewDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>(
                                "lastLoginDt", String.class
                        ));
                    }

                    @Override
                    public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
                        String isNew = jsonObj.getJSONObject("common").getString("is_new");
                        String firstViewDt = firstViewDtState.value();
                        Long ts = jsonObj.getLong("ts");
                        String dt = DateFormatUtil.toDate(ts);

                        if ("1".equals(isNew)) {
                            if (firstViewDt == null) {
                                firstViewDtState.update(dt);
                            } else {
                                if (!firstViewDt.equals(dt)) {
                                    isNew = "0";
                                    jsonObj.getJSONObject("common").put("is_new", isNew);
                                }
                            }
                        } else {
                            if (firstViewDt == null) {
                                // set the first-visit date to yesterday
                                String yesterday = DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24);
                                firstViewDtState.update(yesterday);
                            }
                        }
                        out.collect(jsonObj);
                    }
                }
        );

        // TODO 6. Split the stream
        // 6.1 Define the start, display, action and error side outputs
        OutputTag<String> startTag = new OutputTag<String>("startTag") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("actionTag") {
        };
        OutputTag<String> errorTag = new OutputTag<String>("errorTag") {
        };

        // 6.2 Split
        SingleOutputStreamOperator<String> separatedStream = fixedStream.process(
                new ProcessFunction<JSONObject, String>() {
                    @Override
                    public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
                        // 6.2.1 Collect error data
                        JSONObject error = jsonObj.getJSONObject("err");
                        if (error != null) {
                            context.output(errorTag, jsonObj.toJSONString());
                        }
                        // remove the "err" field
                        jsonObj.remove("err");

                        // 6.2.2 Collect start data
                        JSONObject start = jsonObj.getJSONObject("start");
                        if (start != null) {
                            context.output(startTag, jsonObj.toJSONString());
                        } else {
                            // get the "page" field
                            JSONObject page = jsonObj.getJSONObject("page");
                            // get the "common" field
                            JSONObject common = jsonObj.getJSONObject("common");
                            // get "ts"
                            Long ts = jsonObj.getLong("ts");

                            // 6.2.3 Collect display data
                            JSONArray displays = jsonObj.getJSONArray("displays");
                            if (displays != null) {
                                for (int i = 0; i < displays.size(); i++) {
                                    JSONObject display = displays.getJSONObject(i);
                                    JSONObject displayObj = new JSONObject();
                                    displayObj.put("display", display);
                                    displayObj.put("common", common);
                                    displayObj.put("page", page);
                                    displayObj.put("ts", ts);
                                    context.output(displayTag, displayObj.toJSONString());
                                }
                            }

                            // 6.2.4 Collect action data
                            JSONArray actions = jsonObj.getJSONArray("actions");
                            if (actions != null) {
                                for (int i = 0; i < actions.size(); i++) {
                                    JSONObject action = actions.getJSONObject(i);
                                    JSONObject actionObj = new JSONObject();
                                    actionObj.put("action", action);
                                    actionObj.put("common", common);
                                    actionObj.put("page", page);
                                    actionObj.put("ts", ts);
                                    context.output(actionTag, actionObj.toJSONString());
                                }
                            }

                            // 6.2.5 Collect page data (the main stream)
                            jsonObj.remove("displays");
                            jsonObj.remove("actions");
                            out.collect(jsonObj.toJSONString());
                        }
                    }
                }
        );

        // Print the main stream and the side outputs to verify the split
        separatedStream.print("page>>>");
        separatedStream.getSideOutput(startTag).print("start!!!");
        separatedStream.getSideOutput(displayTag).print("display@@@");
        separatedStream.getSideOutput(actionTag).print("action###");
        separatedStream.getSideOutput(errorTag).print("error$$$");

        // TODO 7. Write the streams to their Kafka topics
//        // 7.1 Extract the side outputs
//        DataStream<String> startDS = separatedStream.getSideOutput(startTag);
//        DataStream<String> displayDS = separatedStream.getSideOutput(displayTag);
//        DataStream<String> actionDS = separatedStream.getSideOutput(actionTag);
//        DataStream<String> errorDS = separatedStream.getSideOutput(errorTag);
//
//        // 7.2 Target topic names for each log type
//        String page_topic = "dwd_traffic_page_log";
//        String start_topic = "dwd_traffic_start_log";
//        String display_topic = "dwd_traffic_display_log";
//        String action_topic = "dwd_traffic_action_log";
//        String error_topic = "dwd_traffic_error_log";
//
//        separatedStream.addSink(KafkaUtil.getKafkaProducer(page_topic));
//        startDS.addSink(KafkaUtil.getKafkaProducer(start_topic));
//        displayDS.addSink(KafkaUtil.getKafkaProducer(display_topic));
//        actionDS.addSink(KafkaUtil.getKafkaProducer(action_topic));
//        errorDS.addSink(KafkaUtil.getKafkaProducer(error_topic));

        env.execute();
    }
}

Original log data

{ "actions": [ { "action_id": "get_coupon", "item": "3", "item_type": "coupon_id", "ts": 1592134620882 } ], "common": { "ar": "110000", "ba": "Oneplus", "ch": "oppo", "is_new": "0", "md": "Oneplus 7", "mid": "mid_232163", "os": "Android 10.0", "uid": "898", "vc": "v2.1.134" }, "displays": [ { "display_type": "query", "item": "18", "item_type": "sku_id", "order": 1, "pos_id": 2 }, { "display_type": "promotion", "item": "13", "item_type": "sku_id", "order": 2, "pos_id": 4 }, { "display_type": "query", "item": "29", "item_type": "sku_id", "order": 3, "pos_id": 2 }, { "display_type": "query", "item": "7", "item_type": "sku_id", "order": 4, "pos_id": 4 }, { "display_type": "query", "item": "19", "item_type": "sku_id", "order": 5, "pos_id": 4 }, { "display_type": "promotion", "item": "22", "item_type": "sku_id", "order": 6, "pos_id": 4 }, { "display_type": "query", "item": "25", "item_type": "sku_id", "order": 7, "pos_id": 5 } ], "page": { "during_time": 11764, "item": "31", "item_type": "sku_id", "last_page_id": "good_list", "page_id": "good_detail", "source_type": "query" }, "ts": 1592134615000 }

Data after the split

Main stream (page log data)

{ "common": { "ar": "310000", "uid": "780", "os": "Android 11.0", "ch": "oppo", "is_new": "1", "md": "Huawei P30", "mid": "mid_619118", "vc": "v2.1.134", "ba": "Huawei" }, "page": { "page_id": "payment", "item": "33,25", "during_time": 5439, "item_type": "sku_ids", "last_page_id": "trade" }, "ts": 1592134614000 }

Display (exposure) log data

{ "common": { "ar": "110000", "uid": "914", "os": "iOS 13.3.1", "ch": "Appstore", "is_new": "0", "md": "iPhone Xs Max", "mid": "mid_319090", "vc": "v2.1.134", "ba": "iPhone" }, "display": { "display_type": "query", "item": "8", "item_type": "sku_id", "pos_id": 1, "order": 7 }, "page": { "page_id": "home", "during_time": 4328 }, "ts": 1592134610000 }

Action log data

{ "common": { "ar": "230000", "uid": "257", "os": "Android 11.0", "ch": "vivo", "is_new": "0", "md": "Xiaomi 9", "mid": "mid_227516", "vc": "v2.0.1", "ba": "Xiaomi" }, "action": { "item": "35", "action_id": "cart_minus_num", "item_type": "sku_id", "ts": 1592134612791 }, "page": { "page_id": "cart", "during_time": 3583, "last_page_id": "good_detail" }, "ts": 1592134611000 }

Error log data

{ "common": { "ar": "110000", "uid": "780", "os": "Android 11.0", "ch": "huawei", "is_new": "1", "md": "Xiaomi 9", "mid": "mid_503805", "vc": "v2.1.134", "ba": "Xiaomi" }, "err": { "msg": " Exception in thread \ java.net.SocketTimeoutException\n \tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)", "error_code": 1245 }, "page": { "page_id": "home", "during_time": 17642 }, "displays": [ { "display_type": "activity", "item": "1", "item_type": "activity_id", "pos_id": 2, "order": 1 }, { "display_type": "query", "item": "2", "item_type": "sku_id", "pos_id": 5, "order": 2 }, { "display_type": "query", "item": "6", "item_type": "sku_id", "pos_id": 2, "order": 3 }, { "display_type": "query", "item": "8", "item_type": "sku_id", "pos_id": 4, "order": 4 }, { "display_type": "query", "item": "6", "item_type": "sku_id", "pos_id": 3, "order": 5 }, { "display_type": "promotion", "item": "29", "item_type": "sku_id", "pos_id": 3, "order": 6 } ], "ts": 1592134611000 }

Final result: each of the five log types ends up in its own stream, and once the commented-out sinks are enabled, in its own Kafka topic.

Unique visitor real-time table

Prerequisites

State TTL

public class StateTTl {
    public static void main(String[] args) throws Exception {
        // Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Test data
        DataStreamSource<String> initData = env.fromElements("a", "a", "b", "c", "b");

        SingleOutputStreamOperator<Tuple2<String, Integer>> targetData =
                initData.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        if (value.equals("a")) {
                            Thread.sleep(2000);
                        }
                        return Tuple2.of(value, 1);
                    }
                });

        // Test state expiry
        SingleOutputStreamOperator<Tuple2<String, Integer>> testTTL = targetData.keyBy(data -> data.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    private ValueState<String> lastVisitDt;

                    @Override
                    public void open(Configuration paramenters) throws Exception {
                        super.open(paramenters);
                        ValueStateDescriptor<String> valueStateDescriptor =
                                new ValueStateDescriptor<>("testTTL", String.class);
                        valueStateDescriptor.enableTimeToLive(
                                StateTtlConfig
                                        .newBuilder(Time.seconds(1L))
                                        // refresh the TTL whenever the state is created or updated
                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                        .build()
                        );
                        // keyed state with a TTL
                        lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(Tuple2<String, Integer> value,
                                               KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx,
                                               Collector<Tuple2<String, Integer>> out) throws Exception {
                        String valueState = lastVisitDt.value();
                        if (valueState == null) {
                            System.out.println(value);
                            lastVisitDt.update(value.f0);
                        } else {
                            System.out.println(value.f0 + " already has state");
                        }
                    }
                });

        testTTL.print();
        env.execute();
    }
}

Output

(a,1)
(a,1)
(b,1)
(c,1)
b already has state

The output shows that the state written for the first a had already expired: the 2-second sleep in the map exceeds the 1-second TTL, so the second a is treated as new again. The second b arrives well within the TTL, so it still finds its state.
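
For reference, a slightly fuller TTL configuration sketch. This variant is not used in the post; OnReadAndWrite and NeverReturnExpired are standard options of Flink's StateTtlConfig.

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("lastVisitDt", String.class);
descriptor.enableTimeToLive(
        StateTtlConfig
                .newBuilder(Time.days(1L))
                // refresh the timer on reads as well as on writes
                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                // never hand back a value that has already expired
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build()
);

The unique-visitor job below sticks with OnCreateAndWrite, which is what makes the state behave like a per-day "already counted" flag.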

Implementation

public class DwdTrafficUniqueVisitorDetail {
    public static void main(String[] args) throws Exception {

        // TODO 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. Checkpointing and state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://master:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "bigdata");

        // TODO 3. Read the page log from the Kafka dwd_traffic_page_log topic
        String topic = "dwd_traffic_page_log";
        String groupId = "dwd_traffic_unique_visitor_detail";
        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);

        // TODO 4. Convert the structure
        SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);

        // TODO 5. Keep only records whose last_page_id is null (session entry pages)
        SingleOutputStreamOperator<JSONObject> firstPageStream = mappedStream.filter(
                jsonObj -> jsonObj
                        .getJSONObject("page")
                        .getString("last_page_id") == null
        );

        // TODO 6. Key by mid
        KeyedStream<JSONObject, String> keyedStream = firstPageStream
                .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

        // TODO 7. Filter unique visitors with Flink keyed state
        SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(
                new RichFilterFunction<JSONObject>() {

                    private ValueState<String> lastVisitDt;

                    @Override
                    public void open(Configuration paramenters) throws Exception {
                        super.open(paramenters);
                        ValueStateDescriptor<String> valueStateDescriptor =
                                new ValueStateDescriptor<>("last_visit_dt", String.class);
                        valueStateDescriptor.enableTimeToLive(
                                StateTtlConfig
                                        .newBuilder(Time.days(1L))
                                        // refresh the TTL whenever the state is created or updated
                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                        .build()
                        );
                        lastVisitDt = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {
                        String visitDt = DateFormatUtil.toDate(jsonObj.getLong("ts"));
                        String lastDt = lastVisitDt.value();
                        if (lastDt == null || !lastDt.equals(visitDt)) {
                            lastVisitDt.update(visitDt);
                            return true;
                        }
                        return false;
                    }
                }
        );

        // TODO 8. Write the unique-visitor records to the
        //         Kafka dwd_traffic_unique_visitor_detail topic
        String targetTopic = "dwd_traffic_unique_visitor_detail";
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
        filteredStream.map(JSONAware::toJSONString).addSink(kafkaProducer);

        // TODO 9. Execute
        env.execute();
    }
}

Result

{ "common": { "ar": "310000", "uid": "201", "os": "Android 11.0", "ch": "vivo", "is_new": "1", "md": "Xiaomi Mix2 ", "mid": "mid_994205", "vc": "v2.0.1", "ba": "Xiaomi" }, "page": { "page_id": "home", "during_time": 19868 }, "ts": 1592133292000 }

User jump (bounce) transaction fact table

Prerequisites

To handle out-of-order data, watermarks are assigned first based on event time; window operations (and CEP's within constraint) then use the watermark to decide when results can safely be emitted. A minimal watermark-assignment sketch follows.
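
A minimal sketch (the mappedStream variable and the ts field are assumptions matching the page-log structure used throughout this post); the CEP demo below does the same thing with a zero out-of-orderness allowance:

SingleOutputStreamOperator<JSONObject> withWatermark = mappedStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                // tolerate up to 3 seconds of out-of-order events
                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // use the "ts" field (epoch milliseconds) as the event time
                .withTimestampAssigner((jsonObj, recordTimestamp) -> jsonObj.getLong("ts")));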

Using Flink CEP

public class CEPTest {
    public static void main(String[] args) throws Exception {
        // TODO 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO Note: with parallelism > 1 the watermarks of idle subtasks do not advance,
        //      so the computation would never be triggered; keep parallelism at 1 for this test
        env.setParallelism(1);

        // TODO 2. Read from a network socket
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9998);

        // TODO 3. Input lines look like "a 1": the first token is the value, the second the event time in seconds
        SingleOutputStreamOperator<Tuple2<String, Long>> mappedStream =
                socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] s = value.split(" ");
                        return Tuple2.of(s[0], Integer.parseInt(s[1]) * 1000L);
                    }
                });

        // TODO 4. Assign watermarks so event-time order can be handled
        SingleOutputStreamOperator<Tuple2<String, Long>> withWatermarkStream =
                mappedStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                // allow 0 seconds of out-of-orderness
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> initData, long recordTimestamp) {
                                                // System.out.println(initData);
                                                return initData.f1;
                                            }
                                        }
                                )
                );

        // TODO 5. Key the stream so the pattern is evaluated per key
        KeyedStream<Tuple2<String, Long>, String> keyedStream = withWatermarkStream.keyBy(data -> {
            return data.f0;
        });
        // keyedStream.print("keyedStream");

        // TODO 6. Use CEP to match two consecutive "a" events within 10 seconds;
        //         if the second one does not arrive in time, the first one times out
        Pattern<Tuple2<String, Long>, Tuple2<String, Long>> pattern =
                Pattern.<Tuple2<String, Long>>begin("first").where(
                        new SimpleCondition<Tuple2<String, Long>>() {
                            @Override
                            public boolean filter(Tuple2<String, Long> firstData) throws Exception {
                                return firstData.f0.equals("a");
                            }
                        }
                ).next("second").where(
                        new SimpleCondition<Tuple2<String, Long>>() {
                            @Override
                            public boolean filter(Tuple2<String, Long> secondData) throws Exception {
                                return secondData.f0.equals("a");
                            }
                        }
                        // another Time class is already imported above, so the full class name is needed here
                ).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));

        PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(keyedStream, pattern);

        // Side output for timed-out partial matches
        OutputTag<Tuple2<String, Long>> timeoutTag = new OutputTag<Tuple2<String, Long>>("timeoutTag") {
        };

        SingleOutputStreamOperator<Tuple2<String, Long>> flatSelectStream = patternStream.flatSelect(
                timeoutTag,
                new PatternFlatTimeoutFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void timeout(Map<String, List<Tuple2<String, Long>>> pattern, long timeoutTimestamp,
                                        Collector<Tuple2<String, Long>> out) throws Exception {
                        // the timed-out event
                        Tuple2<String, Long> first = pattern.get("first").get(0);
                        out.collect(first);
                    }
                },
                new PatternFlatSelectFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void flatSelect(Map<String, List<Tuple2<String, Long>>> pattern,
                                           Collector<Tuple2<String, Long>> out) throws Exception {
                        // the fully matched event
                        Tuple2<String, Long> second = pattern.get("second").get(0);
                        out.collect(second);
                    }
                }
        );

        DataStream<Tuple2<String, Long>> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);

        // print the timed-out data
        timeOutDStream.print("timeOut");
        // print the matched data
        flatSelectStream.print("flatSelectStream");

        env.execute();
    }
}

Test data

[bigdata@master createdata]$ nc -lk 9998
a 2
a 3
a 15

Result

flatSelectStream> (a,3000)
timeOut> (a,3000)

Conclusion: when a 2 and a 3 are entered, the 10-second constraint has not yet expired, so nothing is emitted. Entering a 15 pushes the watermark forward: a 2 followed by a 3 is a complete match within 10 seconds, so the matched stream prints (a,3000); a 3 had also started a new partial match whose 10-second deadline has now passed, so it is emitted to the timeout side output as well.

Union

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> dataStream1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> dataStream2 = env.fromElements(4, 5, 6);

        dataStream1.union(dataStream2)
                .print();

        env.execute();
    }
}

Output

2> 5
3> 6
1> 4
11> 3
9> 1
10> 2

Implementation

public class DwdTrafficUserJumpDetail {
    public static void main(String[] args) throws Exception {

        // TODO 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // TODO 2. Checkpointing and state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://master:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "bigdata");

        // TODO 3. Read the page log from the Kafka dwd_traffic_page_log topic
        String topic = "dwd_traffic_page_log";
        String groupId = "dwd_traffic_user_jump_detail";
        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);

        // Test data
        /*DataStream<String> kafkaDS = env
                .fromElements(
                        "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"home\"},\"ts\":15000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"detail\"},\"ts\":30000} "
                );*/

        // TODO 4. Convert the structure
        SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.map(JSON::parseObject);

        // TODO 5. Assign watermarks for the user-jump detection
        SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
                                        return jsonObj.getLong("ts");
                                    }
                                }
                        )
        );

        // TODO 6. Key by mid
        KeyedStream<JSONObject, String> keyedStream =
                withWatermarkStream.keyBy(jsonOjb -> jsonOjb.getJSONObject("common").getString("mid"));

        // TODO 7. Define the CEP pattern: two consecutive session-entry pages (last_page_id == null) within 10 seconds
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {
                        String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                        return lastPageId == null;
                    }
                }
        ).next("second").where(
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {
                        String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                        return lastPageId == null;
                    }
                }
                // another Time class is already imported above, so the full class name is needed here
        ).within(org.apache.flink.streaming.api.windowing.time.Time.seconds(10L));

        // TODO 8. Apply the pattern to the keyed stream
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

        // TODO 9. Extract matched events and timed-out events
        OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
        };
        SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
                timeoutTag,
                new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp,
                                        Collector<JSONObject> out) throws Exception {
                        JSONObject element = pattern.get("first").get(0);
                        out.collect(element);
                    }
                },
                new PatternFlatSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<JSONObject> out) throws Exception {
                        JSONObject element = pattern.get("first").get(0);
                        out.collect(element);
                    }
                }
        );

        DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);

        // TODO 11. Union the two streams and write the result to Kafka
        DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
        String targetTopic = "dwd_traffic_user_jump_detail";
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(targetTopic);
        unionDStream
                .map(JSONAware::toJSONString)
                .addSink(kafkaProducer);

        env.execute();
    }
}

Finally

That is all of the DWD-layer (traffic domain) template code for the Flink real-time warehouse collected by 辛勤朋友: the utility classes, log splitting, the unique visitor real-time table and the user-jump transaction fact table. For more related content, see the other articles on 靠谱客.
