Integrating Flink with Kafka & Debezium (Part 1): Project Startup
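Overview
Setting up Debezium
See the earlier blog post and use Docker to start a Debezium stack.
Note that Kafka must be started with ADVERTISED_HOST_NAME set to an address that is reachable from outside Docker. This address is registered in ZooKeeper. If it is not set, the Docker-internal address is registered instead, which external clients cannot reach. Flink does not report an error in that case; it simply never receives any messages.
The following command can be used: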
docker run -d --name kafka -p 9092:9092 -e ADVERTISED_HOST_NAME=$(ip a|grep 192.168|awk '{print $2}'|awk -F / '{print $1}') --link zookeeper:zookeeper debezium/kafka:0.10
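A quick sanity check for the pitfall above is to test whether the address Kafka ends up advertising lies inside Docker's default bridge network. The sketch below assumes the default bridge subnet 172.17.0.0/16 (user-defined networks use other ranges), and the class and method names are hypothetical, not part of Kafka or Debezium.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper: flags addresses inside Docker's default bridge subnet (172.17.0.0/16). */
class AdvertisedAddressCheck {

    /** Returns true if the IPv4 literal lies in 172.17.0.0/16 (assumed default docker0 range). */
    static boolean isDefaultDockerBridgeAddress(String ipv4Literal) {
        try {
            // For a literal IP address, getByName only parses the text; no DNS lookup happens.
            byte[] octets = InetAddress.getByName(ipv4Literal).getAddress();
            return octets.length == 4
                    && (octets[0] & 0xFF) == 172
                    && (octets[1] & 0xFF) == 17;
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not an IP address: " + ipv4Literal, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isDefaultDockerBridgeAddress("172.17.0.8"));    // true
        System.out.println(isDefaultDockerBridgeAddress("192.168.1.191")); // false
    }
}
```

If the check returns true for the advertised address, clients outside Docker will most likely be unable to reach the broker.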
Generating data
Creating the table
Create a table with columns of various types:
create table table_name
(
    id_c varchar(64)
        constraint table_name_pk
            primary key,
    json_c json,
    uuid_c uuid,
    xml_c xml,
    int_c int,
    money_c money,
    jsonb_c jsonb,
    numeric_c numeric,
    boolean_c boolean,
    date_c date,
    timestamp_c timestamp,
    timestamptz_c timestamptz,
    time_c time,
    text_c text,
    char_c char(11),
    varchar_c varchar(12)
);
Null data
Run the following in the database:
INSERT INTO public.table_name (id_c) VALUES ('null_value')
Watching Kafka messages
docker run -it --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.10 watch-topic -a -k -m minBytes postgres.public.table_name
This prints the following (note that the last entry is the event produced by the SQL statement above):
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.5:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.8:9092
Using KAFKA_BROKER=172.17.0.6:9092
Contents of topic postgres.public.table_name:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id_c"}],"optional":false,"name":"postgres.public.table_name.Key"},"payload":{"id_c":"null_value"}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"string","optional":true,"name":"io.debezium.data.Uuid","version":1,"field":"uuid_c"},{"type":"string","optional":true,"name":"io.debezium.data.Xml","version":1,"field":"xml_c"},{"type":"int32","optional":true,"field":"int_c"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2"},"field":"money_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"jsonb_c"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":true,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable scaled 
decimal","field":"numeric_c"},{"type":"boolean","optional":true,"field":"boolean_c"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"date_c"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp_c"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"timestamptz_c"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"time_c"},{"type":"string","optional":true,"field":"text_c"},{"type":"string","optional":true,"field":"char_c"},{"type":"string","optional":true,"field":"varchar_c"}],"optional":true,"name":"postgres.public.table_name.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"string","optional":true,"name":"io.debezium.data.Uuid","version":1,"field":"uuid_c"},{"type":"string","optional":true,"name":"io.debezium.data.Xml","version":1,"field":"xml_c"},{"type":"int32","optional":true,"field":"int_c"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2"},"field":"money_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"jsonb_c"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":true,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable scaled 
decimal","field":"numeric_c"},{"type":"boolean","optional":true,"field":"boolean_c"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"date_c"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp_c"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"timestamptz_c"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"time_c"},{"type":"string","optional":true,"field":"text_c"},{"type":"string","optional":true,"field":"char_c"},{"type":"string","optional":true,"field":"varchar_c"}],"optional":true,"name":"postgres.public.table_name.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"postgres.public.table_name.Envelope"},"payload":{"before":null,"after":{"id_c":"null_value","json_c":null,"uuid_c":null,"xml_c":null,"int_c":null,"money_c":null,"jsonb_c":null,"numeric_c":null,"boolean_c":null,"date_c":null,"timestamp_c":null,"timestamptz_c":null,"time_c":null,"text_c":null,"char_c":null,"varchar_c":null},"source":{"version":"0.10.0.Final","connector":"postgre
sql","name":"postgres","ts_ms":1577691115382,"snapshot":"false","db":"postgres","schema":"public","table":"table_name","txId":576,"lsn":24630784,"xmin":null},"op":"c","ts_ms":1577691115691}}
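The raw output above contains the message key and the message value as two JSON objects printed back to back. A minimal sketch for separating them programmatically is to track brace depth while ignoring braces inside string literals. The class name is hypothetical, and the code assumes well-formed JSON objects as input.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits back-to-back JSON objects such as "{...} {...}". */
class JsonObjectSplitter {

    static List<String> split(String text) {
        List<String> objects = new ArrayList<>();
        int depth = 0;            // current brace nesting level
        int start = -1;           // index where the current top-level object began
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string

        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inString) {
                // Inside a string literal, braces are data rather than structure.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                if (depth == 0) {
                    start = i;
                }
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0 && start >= 0) {
                    // A top-level object just closed; emit it.
                    objects.add(text.substring(start, i + 1));
                    start = -1;
                }
            }
        }
        return objects;
    }

    public static void main(String[] args) {
        String raw = "{\"payload\":{\"id_c\":\"null_value\"}} {\"payload\":{\"op\":\"c\"}}";
        for (String json : split(raw)) {
            System.out.println(json);
        }
    }
}
```

Each returned string can then be passed to any JSON library for pretty-printing.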
Splitting the output at "} {" gives two JSON documents (the key and the value), shown formatted below:
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id_c"
}],
"optional": false,
"name": "postgres.public.table_name.Key"
},
"payload": {
"id_c": "null_value"
}
}
{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "json_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Uuid",
"version": 1,
"field": "uuid_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Xml",
"version": 1,
"field": "xml_c"
}, {
"type": "int32",
"optional": true,
"field": "int_c"
}, {
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2"
},
"field": "money_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "jsonb_c"
}, {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "scale"
}, {
"type": "bytes",
"optional": false,
"field": "value"
}],
"optional": true,
"name": "io.debezium.data.VariableScaleDecimal",
"version": 1,
"doc": "Variable scaled decimal",
"field": "numeric_c"
}, {
"type": "boolean",
"optional": true,
"field": "boolean_c"
}, {
"type": "int32",
"optional": true,
"name": "io.debezium.time.Date",
"version": 1,
"field": "date_c"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "timestamp_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.time.ZonedTimestamp",
"version": 1,
"field": "timestamptz_c"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTime",
"version": 1,
"field": "time_c"
}, {
"type": "string",
"optional": true,
"field": "text_c"
}, {
"type": "string",
"optional": true,
"field": "char_c"
}, {
"type": "string",
"optional": true,
"field": "varchar_c"
}],
"optional": true,
"name": "postgres.public.table_name.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "json_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Uuid",
"version": 1,
"field": "uuid_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Xml",
"version": 1,
"field": "xml_c"
}, {
"type": "int32",
"optional": true,
"field": "int_c"
}, {
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2"
},
"field": "money_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "jsonb_c"
}, {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "scale"
}, {
"type": "bytes",
"optional": false,
"field": "value"
}],
"optional": true,
"name": "io.debezium.data.VariableScaleDecimal",
"version": 1,
"doc": "Variable scaled decimal",
"field": "numeric_c"
}, {
"type": "boolean",
"optional": true,
"field": "boolean_c"
}, {
"type": "int32",
"optional": true,
"name": "io.debezium.time.Date",
"version": 1,
"field": "date_c"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "timestamp_c"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.time.ZonedTimestamp",
"version": 1,
"field": "timestamptz_c"
}, {
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTime",
"version": 1,
"field": "time_c"
}, {
"type": "string",
"optional": true,
"field": "text_c"
}, {
"type": "string",
"optional": true,
"field": "char_c"
}, {
"type": "string",
"optional": true,
"field": "varchar_c"
}],
"optional": true,
"name": "postgres.public.table_name.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "schema"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "int64",
"optional": true,
"field": "txId"
}, {
"type": "int64",
"optional": true,
"field": "lsn"
}, {
"type": "int64",
"optional": true,
"field": "xmin"
}],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}],
"optional": false,
"name": "postgres.public.table_name.Envelope"
},
"payload": {
"before": null,
"after": {
"id_c": "null_value",
"json_c": null,
"uuid_c": null,
"xml_c": null,
"int_c": null,
"money_c": null,
"jsonb_c": null,
"numeric_c": null,
"boolean_c": null,
"date_c": null,
"timestamp_c": null,
"timestamptz_c": null,
"time_c": null,
"text_c": null,
"char_c": null,
"varchar_c": null
},
"source": {
"version": "0.10.0.Final",
"connector": "postgresql",
"name": "postgres",
"ts_ms": 1577691115382,
"snapshot": "false",
"db": "postgres",
"schema": "public",
"table": "table_name",
"txId": 576,
"lsn": 24630784,
"xmin": null
},
"op": "c",
"ts_ms": 1577691115691
}
}
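In the event above every column is null, but once the columns hold values, several of them arrive in an encoded form that the schema section names: io.debezium.time.Date is a day count since 1970-01-01, io.debezium.time.MicroTimestamp is microseconds since the epoch, io.debezium.time.MicroTime is microseconds past midnight, and org.apache.kafka.connect.data.Decimal is a Base64 string (when the JSON converter is used) holding the big-endian unscaled value, with the scale given in the schema parameters. The sketch below decodes these with the JDK only. The class and method names are hypothetical, and the sample values in main are made up for illustration.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Base64;

/** Hypothetical decoder for a few Debezium/Kafka Connect logical types. */
class DebeziumValueDecoder {

    /** io.debezium.time.Date: number of days since 1970-01-01. */
    static LocalDate date(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    /** io.debezium.time.MicroTimestamp: microseconds since the epoch, without time zone. */
    static LocalDateTime microTimestamp(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos), ZoneOffset.UTC);
    }

    /** io.debezium.time.MicroTime: microseconds past midnight. */
    static LocalTime microTime(long micros) {
        return LocalTime.ofNanoOfDay(micros * 1_000L);
    }

    /** Connect Decimal: Base64 of the big-endian unscaled value; scale comes from the schema. */
    static BigDecimal decimal(String base64, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        System.out.println(date(18262));                        // 2020-01-01
        System.out.println(microTimestamp(1_577_691_115_382_000L));
        System.out.println(microTime(3_600_000_000L));
        System.out.println(decimal("MDk=", 2));                 // 123.45
    }
}
```

The same decimal method should also work for the value field of io.debezium.data.VariableScaleDecimal, using the scale field that travels alongside it in the payload.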
Setting up Flink
This follows the official Docker approach.
Create a directory
mkdir flink
cd flink
vi docker-compose.yml
Paste in the following configuration:
version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Start the cluster
docker-compose up
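Open http://localhost:8081 in a browser to reach the Flink dashboard.
Developing the Flink project
Initialization
Use the following command to generate a Flink project with Maven: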
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=qbit.flink -DartifactId=flink -Dversion=1.0.0 -Dpackage=qbit.flink -DinteractiveMode=false
Connecting to Kafka
Maven dependency
Uncomment the corresponding block in pom.xml to add the Kafka dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
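Writing the Java code
package qbit.flink;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka connection settings: use the address Kafka advertises to external clients
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.191:9092");
        properties.setProperty("group.id", "flink-test");
        // read each Debezium change event from the topic as a raw JSON string
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("postgres.public.table_name", new SimpleStringSchema(), properties);
        // consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        // execute program
        env.execute("debezium-test");
    }
}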
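Run mvn package to build the jar.
Upload
Open the Submit New Job page, click Add New, and upload the jar you just built. After you submit it, a job named debezium-test (the name passed to env.execute) appears with status RUNNING.
Running
If Kafka is misconfigured, the log contains the following: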
INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (ea5b645e70cf101f4fe5da1fd5a63b80) switched from RUNNING to FAILED.
taskmanager_1 | org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Note that the IP address printed in the line below must be the address of the Kafka host machine, not the address of the container:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 192.168.1.191:9092 (id: 2147483646 rack: null) for group flink-test.
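To narrow down a "Timeout expired while fetching topic metadata" failure, it can help to test plain TCP connectivity to the address shown in the "Discovered coordinator" line. The sketch below is only a rough diagnostic with a hypothetical class name: it should be run from the same network location as the Flink taskmanager, and a successful connection to the bootstrap address alone does not prove that the advertised address is correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic: checks whether a TCP connection to host:port can be opened. */
class BrokerReachability {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws on refusal, timeout, or an unresolvable host name.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from the "Discovered coordinator" log line in this article.
        System.out.println(canConnect("192.168.1.191", 9092, 2000));
    }
}
```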
Then change some data in the database, and Flink prints the change events in the CLI (the terminal running docker-compose).
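Each printed event carries an "op" field in its payload ("c" in the insert example earlier). As a small aid for reading the output, the sketch below maps the operation codes Debezium uses to a description: c for create, u for update, d for delete, and r for a snapshot read. The class name is hypothetical.

```java
/** Hypothetical helper: describes the "op" code found in a Debezium event payload. */
class DebeziumOp {

    static String describe(String op) {
        switch (op) {
            case "c":
                return "create (row inserted)";
            case "u":
                return "update (row modified)";
            case "d":
                return "delete (row removed)";
            case "r":
                return "read (row captured during a snapshot)";
            default:
                return "unknown operation: " + op;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("c")); // create (row inserted)
    }
}
```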