概述
不谈需求的铺代码==耍流氓。。。
需求(概略):从kafka读数据=》业务处理=》写入es=》写入kafka(通过kafka通知其他业务:某条数据的id,以及所在es的索引)
第一版代码(摘要):
//写入es
mapDs.addSink(new ElasticsearchSink<JSONObject>(esProps, transportAddresses, (ElasticsearchSinkFunction<JSONObject>) (jsonObject, runtimeContext, requestIndexer) -> {
long createTime = jsonObject.getLong("create_time");
String indexName = esPrefix.concat(Utils.longDate2String(createTime, "yyyy.MM.dd"));
requestIndexer.add(Requests.indexRequest().index(indexName)
.type(esType)
.source(jsonObject));
}, new RetryRejectedExecutionFailureHandler())).name("sop-controller-es-sink");
//写入kafka
mapDs.filter(jsonObject -> "初始化".equals(jsonObject.getString("
最后
以上就是朴素银耳汤为你收集整理的Flink es sink的一次历险的全部内容,希望文章能够帮你解决Flink es sink的一次历险所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复