我是靠谱客的博主 朴素银耳汤,最近开发中收集的这篇文章主要介绍Flink es sink的一次历险,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

不谈需求的铺代码==耍流氓。。。

需求(概略):从kafka读数据=》业务处理=》写入es=》写入kafka(通过kafka通知其他业务:某条数据的id,以及所在es的索引)

第一版代码(摘要):

//写入es        
mapDs.addSink(new ElasticsearchSink<JSONObject>(esProps, transportAddresses, (ElasticsearchSinkFunction<JSONObject>) (jsonObject, runtimeContext, requestIndexer) -> {
            long createTime = jsonObject.getLong("create_time");
            String indexName = esPrefix.concat(Utils.longDate2String(createTime, "yyyy.MM.dd"));
            requestIndexer.add(Requests.indexRequest().index(indexName)
                    .type(esType)
                    .source(jsonObject));
        }, new RetryRejectedExecutionFailureHandler())).name("sop-controller-es-sink");

//写入kafka
mapDs.filter(jsonObject -> "初始化".equals(jsonObject.getString("

最后

以上就是朴素银耳汤为你收集整理的Flink es sink的一次历险的全部内容,希望文章能够帮你解决Flink es sink的一次历险所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部