我是靠谱客的博主 虚幻鼠标,最近开发中收集的这篇文章主要介绍flink-sql所有表格式format-1.151. 版本说明2. 所有格式3. CSV4. JSON5. AVRO6. Confluent Avro7. Debezium8. Canal9. Maxwell10. Ogg11. Parquet12. Orc13. Raw,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
1. 版本说明
本文档内容基于flink-1.15.x
,其他版本的整理,请查看本人博客的 flink 专栏其他文章。
2. 所有格式
Flink提供了一组可以与表连接器一起使用的表格式。表格式是一种存储格式,定义如何将二进制数据映射到表字段。
Flink支持以下格式:
格式 | 连接器 |
---|---|
CSV | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
JSON | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
Confluent Avro | Apache Kafka, Upsert Kafka |
Debezium CDC | Apache Kafka, Filesystem |
Canal CDC | Apache Kafka, Filesystem |
Maxwell CDC | Apache Kafka, Filesystem |
OGG CDC | Apache Kafka, Filesystem(从 flink-1.15.x 开始支持) |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
Raw | Apache Kafka, Upsert Kafka, Amazon Kinesis Data Streams, Filesystem |
3. CSV
3.1. 说明
支持:
- Format: Serialization Schema 序列化格式
- Format: Deserialization Schema 反序列化格式
CSV格式允许基于CSV schema读写CSV格式的数据。目前,CSV schema来源于表schema定义。
3.2. 依赖
为了使用CSV格式,以下依赖项对于使用自动化构建工具(如Maven或SBT)的项目和带有SQL JAR包的SQL Client都是必需的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.15.2</version>
</dependency>
注意自己使用的 flink 版本。
3.3. 使用CSV格式
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)
3.4. Format参数
选项 | 要求 | 可被转发 | 默认值 | 类型 | 描述 |
---|---|---|---|---|---|
format | 必选 | 否 | (none) | String | 指定使用哪种格式,这儿应该是 csv 。 |
csv.field-delimiter | 可选 | 是 | , | String | 字段值分隔符号(默认为英文逗号**,),必须是单个字符。 可以使用反斜杠来指定特殊字符,比如t**代表制表符。 也可以在纯SQL中使用unicode编码来指定,比如:csv.field-delimiter’ = U&' |