我是靠谱客的博主 开朗往事,这篇文章主要介绍基于Flink的JDBC插入Nested结构数据到Clickhouse,现在分享给大家,希望可以做个参考。

1.Clickhouse的Nested数据结构

Nested是一种嵌套表结构。一张数据表,可以定义任意多个嵌套类型字段,但每个字段的嵌套层级只支持一级,即嵌套表内不能继续使用嵌套类型。对于简单场景的层级关系或关联关系,使用嵌套类型也是一种不错的选择。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
create table test_nested( uid Int8 , name String , props Nested( pid Int8, pnames String , pvalues String ) ) engine = MergeTree ORDER BY uid ; desc test_nested; ┌─name──────────┬─type───────┬ │ uid │ Int8 │ │ name │ String │ │ props.pid │ Array(Int8) │ │ props.pnames │ Array(String) │ │ props.pvalues │ Array(String) │ └─────────────┴─────────────┴

嵌套类型本质是一种多维数组的结构。嵌套表中的每个字段都是一个数组,并且行与行之间数组的长度无须对齐。需要注意的是,在同一行数据内每个数组字段的长度必须相等。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
insert into test_nested values(1,'hadoop',[1,2,3],['p1','p2','p3'],['v1','v2','v3']); -- 行和行之间的属性的个数可以不一致 ,但是当前行的Nested类型中的数组个数必须一致 insert into test_nested values(2,'spark',[1,2],['p1','p2'],['v1','v2']); SELECT * FROM test_nested ┌─uid─┬─name───┬─props.pid─┬─props.pnames─────┬─props.pvalues────┐ │ 1 │ hadoop │ [1,2,3]['p1','p2','p3']['v1','v2','v3'] │ └─────┴────────┴───────────┴──────────────────┴──────────────────┘ ┌─uid─┬─name──┬─props.pid─┬─props.pnames─┬─props.pvalues─┐ │ 2 │ spark │ [1,2]['p1','p2']['v1','v2'] │ └─────┴───────┴───────────┴──────────────┴───────────────┘ SELECT uid, name, props.pid, props.pnames[1] FROM test_nested; ┌─uid─┬─name───┬─props.pid─┬─arrayElement(props.pnames, 1)─┐ │ 1 │ hadoop │ [1,2,3] │ p1 │ └─────┴────────┴───────────┴───────────────────────────────┘ ┌─uid─┬─name──┬─props.pid─┬─arrayElement(props.pnames, 1)─┐ │ 2 │ spark │ [1,2] │ p1 │ └─────┴───────┴───────────┴───────────────────────────────┘

2.使用JDBC插入Nested数据

通过查询表结构可以看到Clickhouse存储Nested数据,本质上是将Nested数据拆成了多列存储,列数等于元素属性的个数,每一列存储的是一个Array类型的数据
因此使用insert into table values(?,?...)时,计算占位符的个数应当等于拆完Nested后总的列数

复制代码
1
2
3
4
5
-- 使用以下语句作为PreparedStatement时, 计算占位符的个数应当等于拆完Nested后总的列数 insert into test_nested values(?,?,?,?,?) -- 也可以指定需要插入的列, 例如 insert into test_nested (uid, name, props.pid) values(?,?,?)

3.基于Flink的JDBCSink插入案例

① Pojo定义

  • Nested
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.zyx.flinkdemo.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import java.util.List; @Data @AllArgsConstructor @NoArgsConstructor @NonNull public class Nested { private String uid; private String name; private List<Props> props; }
  • Props
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.zyx.flinkdemo.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; @Data @NonNull @AllArgsConstructor @NoArgsConstructor public class Props { private String pid; private String pnames; private String pvalues; }

② Clickhouse工具类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.zyx.flinkdemo.stream.utils; import com.zyx.flinkdemo.pojo.Nested; import com.zyx.flinkdemo.pojo.Props; import com.zyx.flinkdemo.pojo.TransientSink; import com.zyx.flinkdemo.stream.cons.CommonConfig; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.lang.reflect.Field; import java.util.List; public class ClickHouseUtil { public static <T> SinkFunction<T> getNestedJdbcSink(String sql) { // obj 就是流中的一条数据对象 return JdbcSink.sink( //要执行的SQL语句 sql, // 执行写入操作 就是将当前流中的对象属性赋值给SQL的占位符 (JdbcStatementBuilder<T>) (ps, obj) -> { // 获取当前类中 所有的属性 Field[] fields = obj.getClass().getDeclaredFields(); int j = 1; for (Field field : fields) { // 设置私有属性可访问 field.setAccessible(true); if ("props".equals(field.getName())) { Nested nested = (Nested) obj; List<String[]> listStrArray = CommonUtils .listToStringArrayList(nested.getProps(), Props.class); for (String[] strArray : listStrArray) { ps.setArray(j++, ps.getConnection().createArrayOf("String", strArray)); } continue; } try { // 获取属性值 Object o = field.get(obj); ps.setObject(j++, o); } catch (IllegalAccessException e) { e.printStackTrace(); } } }, new JdbcExecutionOptions.Builder().withBatchSize(5).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(CommonConfig.CLICKHOUSE_URL) .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .build() ); } }

List转换成List<String[]>工具类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.zyx.flinkdemo.stream.utils; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; public class CommonUtils { public static <T> List<String[]> listToStringArrayList(List<T> list, Class<T> tClass) { // 取出list中的元素并添加到字符串数组中 Field[] fields = tClass.getDeclaredFields(); List<String[]> resList = new ArrayList<>(); if (list != null && list.size() > 0) { try { int listSize = list.size(); for (Field field : fields) { field.setAccessible(true); String[] strArray = new String[listSize]; for (int j = 0; j < listSize; j++) { Object obj = field.get(list.get(j)); strArray[j] = obj == null ? "" : obj.toString(); } resList.add(strArray); } } catch (IllegalAccessException e) { e.printStackTrace(); } return resList; } else { for (int i = 0; i < fields.length; i++) { String[] init = {}; resList.add(init); } return resList; } } }

④ Flink主程序代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.zyx.flinkdemo.stream.sink; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.zyx.flinkdemo.pojo.Nested; import com.zyx.flinkdemo.pojo.Props; import com.zyx.flinkdemo.stream.utils.ClickHouseUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ClickHouseNestedSinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Props props = new Props(); props.setPid("1002"); props.setPnames("p1"); JSONArray jsonArray = new JSONArray(); jsonArray.add(props); JSONObject jsonObj1 = new JSONObject(); jsonObj1.put("uid", "1001"); jsonObj1.put("name", "zhangsan"); jsonObj1.put("props", jsonArray); Nested nested1 = JSONObject.parseObject(jsonObj1.toJSONString(), Nested.class); JSONObject jsonObj2 = new JSONObject(); jsonObj2.put("uid", "1001"); Nested nested2 = JSONObject.parseObject(jsonObj2.toJSONString(), Nested.class); env.fromElements(nested1, nested2).addSink(ClickHouseUtil.getNestedJdbcSink("insert into test_nested values(?,?,?,?,?)")); env.execute(); } }

最后

以上就是开朗往事最近收集整理的关于基于Flink的JDBC插入Nested结构数据到Clickhouse的全部内容,更多相关基于Flink内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部