1.Clickhouse的Nested数据结构
Nested是一种嵌套表结构。一张数据表,可以定义任意多个嵌套类型字段,但每个字段的嵌套层级只支持一级,即嵌套表内不能继续使用嵌套类型。对于简单场景的层级关系或关联关系,使用嵌套类型也是一种不错的选择。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18create table test_nested( uid Int8 , name String , props Nested( pid Int8, pnames String , pvalues String ) ) engine = MergeTree ORDER BY uid ; desc test_nested; ┌─name──────────┬─type───────┬ │ uid │ Int8 │ │ name │ String │ │ props.pid │ Array(Int8) │ │ props.pnames │ Array(String) │ │ props.pvalues │ Array(String) │ └─────────────┴─────────────┴
嵌套类型本质是一种多维数组的结构。嵌套表中的每个字段都是一个数组,并且行与行之间数组的长度无须对齐。需要注意的是,在同一行数据内每个数组字段的长度必须相等。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25insert into test_nested values(1,'hadoop',[1,2,3],['p1','p2','p3'],['v1','v2','v3']); -- 行和行之间的属性的个数可以不一致 ,但是当前行的Nested类型中的数组个数必须一致 insert into test_nested values(2,'spark',[1,2],['p1','p2'],['v1','v2']); SELECT * FROM test_nested ┌─uid─┬─name───┬─props.pid─┬─props.pnames─────┬─props.pvalues────┐ │ 1 │ hadoop │ [1,2,3] │ ['p1','p2','p3'] │ ['v1','v2','v3'] │ └─────┴────────┴───────────┴──────────────────┴──────────────────┘ ┌─uid─┬─name──┬─props.pid─┬─props.pnames─┬─props.pvalues─┐ │ 2 │ spark │ [1,2] │ ['p1','p2'] │ ['v1','v2'] │ └─────┴───────┴───────────┴──────────────┴───────────────┘ SELECT uid, name, props.pid, props.pnames[1] FROM test_nested; ┌─uid─┬─name───┬─props.pid─┬─arrayElement(props.pnames, 1)─┐ │ 1 │ hadoop │ [1,2,3] │ p1 │ └─────┴────────┴───────────┴───────────────────────────────┘ ┌─uid─┬─name──┬─props.pid─┬─arrayElement(props.pnames, 1)─┐ │ 2 │ spark │ [1,2] │ p1 │ └─────┴───────┴───────────┴───────────────────────────────┘
2.使用JDBC插入Nested数据
通过查询表结构可以看到Clickhouse存储Nested数据,本质上是将Nested
数据拆成了多列存储,列数等于元素属性的个数,每一列存储的是一个Array
类型的数据
因此使用insert into table values(?,?...)
时,计算占位符的个数应当等于拆完Nested
后总的列数
复制代码
1
2
3
4
5-- 使用以下语句作为PreparedStatement时, 计算占位符的个数应当等于拆完Nested后总的列数 insert into test_nested values(?,?,?,?,?) -- 也可以指定需要插入的列, 例如 insert into test_nested (uid, name, props.pid) values(?,?,?)
3.基于Flink的JDBCSink插入案例
① Pojo定义
- Nested
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20package com.zyx.flinkdemo.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import java.util.List; @Data @AllArgsConstructor @NoArgsConstructor @NonNull public class Nested { private String uid; private String name; private List<Props> props; }
- Props
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package com.zyx.flinkdemo.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; @Data @NonNull @AllArgsConstructor @NoArgsConstructor public class Props { private String pid; private String pnames; private String pvalues; }
② Clickhouse工具类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57package com.zyx.flinkdemo.stream.utils; import com.zyx.flinkdemo.pojo.Nested; import com.zyx.flinkdemo.pojo.Props; import com.zyx.flinkdemo.pojo.TransientSink; import com.zyx.flinkdemo.stream.cons.CommonConfig; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.lang.reflect.Field; import java.util.List; public class ClickHouseUtil { public static <T> SinkFunction<T> getNestedJdbcSink(String sql) { // obj 就是流中的一条数据对象 return JdbcSink.sink( //要执行的SQL语句 sql, // 执行写入操作 就是将当前流中的对象属性赋值给SQL的占位符 (JdbcStatementBuilder<T>) (ps, obj) -> { // 获取当前类中 所有的属性 Field[] fields = obj.getClass().getDeclaredFields(); int j = 1; for (Field field : fields) { // 设置私有属性可访问 field.setAccessible(true); if ("props".equals(field.getName())) { Nested nested = (Nested) obj; List<String[]> listStrArray = CommonUtils .listToStringArrayList(nested.getProps(), Props.class); for (String[] strArray : listStrArray) { ps.setArray(j++, ps.getConnection().createArrayOf("String", strArray)); } continue; } try { // 获取属性值 Object o = field.get(obj); ps.setObject(j++, o); } catch (IllegalAccessException e) { e.printStackTrace(); } } }, new JdbcExecutionOptions.Builder().withBatchSize(5).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(CommonConfig.CLICKHOUSE_URL) .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .build() ); } }
③ List
转换成List<String[]>
工具类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.zyx.flinkdemo.stream.utils; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; public class CommonUtils { public static <T> List<String[]> listToStringArrayList(List<T> list, Class<T> tClass) { // 取出list中的元素并添加到字符串数组中 Field[] fields = tClass.getDeclaredFields(); List<String[]> resList = new ArrayList<>(); if (list != null && list.size() > 0) { try { int listSize = list.size(); for (Field field : fields) { field.setAccessible(true); String[] strArray = new String[listSize]; for (int j = 0; j < listSize; j++) { Object obj = field.get(list.get(j)); strArray[j] = obj == null ? "" : obj.toString(); } resList.add(strArray); } } catch (IllegalAccessException e) { e.printStackTrace(); } return resList; } else { for (int i = 0; i < fields.length; i++) { String[] init = {}; resList.add(init); } return resList; } } }
④ Flink主程序代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38package com.zyx.flinkdemo.stream.sink; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.zyx.flinkdemo.pojo.Nested; import com.zyx.flinkdemo.pojo.Props; import com.zyx.flinkdemo.stream.utils.ClickHouseUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ClickHouseNestedSinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Props props = new Props(); props.setPid("1002"); props.setPnames("p1"); JSONArray jsonArray = new JSONArray(); jsonArray.add(props); JSONObject jsonObj1 = new JSONObject(); jsonObj1.put("uid", "1001"); jsonObj1.put("name", "zhangsan"); jsonObj1.put("props", jsonArray); Nested nested1 = JSONObject.parseObject(jsonObj1.toJSONString(), Nested.class); JSONObject jsonObj2 = new JSONObject(); jsonObj2.put("uid", "1001"); Nested nested2 = JSONObject.parseObject(jsonObj2.toJSONString(), Nested.class); env.fromElements(nested1, nested2).addSink(ClickHouseUtil.getNestedJdbcSink("insert into test_nested values(?,?,?,?,?)")); env.execute(); } }
最后
以上就是开朗往事最近收集整理的关于基于Flink的JDBC插入Nested结构数据到Clickhouse的全部内容,更多相关基于Flink内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复