Flink DataStream: Writing to Elasticsearch with Fault Tolerance

Overview

This article is based on Flink 1.8 and Elasticsearch 6.2.4 and summarizes the main points to watch when writing to Elasticsearch from Flink.

When using ElasticsearchSink, keep the following in mind:

  1. Internally, ElasticsearchSink uses a RestHighLevelClient (ES >= 6.x) or a TransportClient (ES < 6.x) to communicate with the Elasticsearch cluster.

  2. Internally, ElasticsearchSink uses a BulkProcessor to send a batch of actions (ActionRequest) to the ES cluster in one request. The BulkProcessor first buffers actions and then flushes them; the flush can be triggered by three strategies: the number of buffered actions, the size of buffered actions, or a time interval. A single bulk request may mix different kinds of ActionRequest (e.g. IndexRequest, DeleteRequest, UpdateRequest).

  3. To configure authentication, implement the RestClientFactory interface, mainly its RestClientFactory#configureRestClientBuilder method. See the example below.

  4. With checkpointing enabled and flushOnCheckpoint turned on in ElasticsearchSink (the default), the sink ensures that all buffered requests have been successfully sent to the ES cluster before the checkpoint snapshot of its state is taken.

  5. An ActionRequest in ElasticsearchSink can fail for various reasons, such as ES rejection exceptions or occasional ES timeouts. By default, ElasticsearchSink uses the NoOpFailureHandler, which simply rethrows the failure; this makes it easy to lose data with ElasticsearchSink. Implement the ActionRequestFailureHandler interface to customize failure handling. See the example below.

  6. Important ElasticsearchSink parameters (a configuration sketch follows this list):

    • bulk.flush.max.actions: default 1000. Maximum number of actions buffered per bulk request.
    • bulk.flush.max.size.mb: default 5 MB. Maximum buffered size of actions per bulk request.
    • bulk.flush.interval.ms: unset by default, in milliseconds. Bulk flush interval. Regardless of how the action count or size is configured, the buffer is flushed and a bulk request is issued once this interval elapses.
    • Retry backoff policy: by default an exponential backoff policy is enabled, with an initial delay of 50 ms and 8 retries. To customize the backoff policy, use the following parameters:
      • bulk.flush.backoff.enable: whether retry backoff is enabled.
      • bulk.flush.backoff.type: backoff type, CONSTANT (fixed interval) or EXPONENTIAL (exponential interval).
      • bulk.flush.backoff.delay: backoff delay. For CONSTANT, the interval between retries; for EXPONENTIAL, the initial delay.
      • bulk.flush.backoff.retries: number of retries.
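
These settings correspond to setter methods on ElasticsearchSink.Builder. Below is a minimal sketch of wiring them up; the helper class name, generic parameter, and concrete values are illustrative and not part of the original code:

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

public class BulkFlushSettings {

    /** Applies illustrative bulk flush and backoff settings to an ElasticsearchSink.Builder. */
    public static <T> void apply(ElasticsearchSink.Builder<T> builder) {
        // bulk.flush.max.actions / bulk.flush.max.size.mb / bulk.flush.interval.ms
        builder.setBulkFlushMaxActions(1000);   // flush after 1000 buffered actions
        builder.setBulkFlushMaxSizeMb(5);       // ...or after 5 MB of buffered actions
        builder.setBulkFlushInterval(10_000L);  // ...or at the latest every 10 seconds

        // bulk.flush.backoff.*
        builder.setBulkFlushBackoff(true);
        builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffDelay(50L);  // initial delay in milliseconds
        builder.setBulkFlushBackoffRetries(8);  // maximum number of retries
    }
}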

Reading from Kafka 0.10.1 and Writing to Elasticsearch 6.2.4

Key dependencies

<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

<!-- Elasticsearch connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
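
The code below also uses fastjson (JSON parsing), Lombok (@Slf4j), and Apache commons-lang (ExceptionUtils.getFullStackTrace), which are not listed above. Assumed coordinates follow; the versions are illustrative and should be aligned with your project:

<!-- Assumed additional dependencies used by the example code (versions are illustrative) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>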

Implementation

package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Author: Wang Pei
 * Summary:
 *      Write data to Elasticsearch from a DataStream, with failure handling
 */
public class ReadKafkaWriteES {

    private static Logger logger = LoggerFactory.getLogger(ReadKafkaWriteES.class);

    public static void main(String[] args) throws Exception{

        /** Parse command-line arguments and load the properties file */
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

        // Checkpoint parameters
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

        // Kafka source parameters
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaAutoOffsetReset= parameterTool.getRequired("fromKafka.auto.offset.reset");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

        // Elasticsearch sink parameters
        String toESHost = parameterTool.getRequired("toES.host");
        String toESUsername = parameterTool.getRequired("toES.username");
        String toESPassword = parameterTool.getRequired("toES.password");


        /** Configure the execution environment */
        // Start a local web UI
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT,8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the state backend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));

        // Configure checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        /** Configure the Kafka source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
        kafkaProperties.put("group.id",fromKafkaGroupID);
        kafkaProperties.put("auto.offset.reset",fromKafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

        /** Simple transformation */
        // The ProcessFunction below is invoked once per input record
        // and may emit zero or more records
        SingleOutputStreamOperator<JSONObject> etl = source.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) {
                try {
                    JSONObject obj = JSON.parseObject(value);
                    out.collect(obj);
                } catch (Exception ex) {
                    logger.error("ExceptionData: {}",value,ex);
                }
            }
        });

        /** Configure the Elasticsearch sink */
        // Build the HttpHost list from a comma-separated "host:port" string
        List<HttpHost> httpHosts = Arrays.stream(toESHost.split(",")).map(value -> new HttpHost(value.split(":")[0], Integer.parseInt(value.split(":")[1]))).collect(Collectors.toList());

        // Build the ElasticsearchSink.Builder
        ElasticsearchSink.Builder<JSONObject> elasticsearchSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunctionImpl());

        // 1. Maximum number of actions per bulk request
        elasticsearchSinkBuilder.setBulkFlushMaxActions(100);

        // 2. Configure authentication: just set the factory; the sink itself calls
        //    RestClientFactory#configureRestClientBuilder on its internal RestClientBuilder
        RestClientFactoryImpl restClientFactory = new RestClientFactoryImpl(toESUsername, toESPassword);
        elasticsearchSinkBuilder.setRestClientFactory(restClientFactory);

        // 3. Custom failure handling
        elasticsearchSinkBuilder.setFailureHandler(new ActionRequestFailureHandlerImp());

        // 4. Build the ElasticsearchSink
        ElasticsearchSink<JSONObject> elasticsearchSink = elasticsearchSinkBuilder.build();

        etl.addSink(elasticsearchSink);

        env.execute();
    }

    /**
     * Creates one or more {@link ActionRequest ActionRequests} from each input element.
     * {@link IndexRequest}: index data
     * {@link DeleteRequest}: delete data
     * {@link UpdateRequest}: update data
     * Implements {@link ElasticsearchSinkFunction#process(Object, RuntimeContext, RequestIndexer)}.
     */
    static class ElasticsearchSinkFunctionImpl implements ElasticsearchSinkFunction<JSONObject>{
        private IndexRequest indexRequest(JSONObject element) {
            return Requests.indexRequest()
                    .index(element.getString("eventType"))
                    .type("_doc")
                    .source(element.toJSONString(), XContentType.JSON);
        }

        @Override
        public void process(JSONObject element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(indexRequest(element));
        }
    }

    /**
     * Configures the {@link org.elasticsearch.client.RestHighLevelClient},
     * e.g. authentication, timeouts, etc.
     * Implements {@link RestClientFactory#configureRestClientBuilder(RestClientBuilder)}.
     */
    static class RestClientFactoryImpl implements RestClientFactory {

        private String username;

        private String password;

        private RestClientFactoryImpl(String username, String password) {
            this.username = username;
            this.password = password;
        }

        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
            basicCredentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,password));

            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(basicCredentialsProvider));
        }
    }

    /**
     * Custom failure handling.
     * Implements {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)}.
     */
    @Slf4j
    static class ActionRequestFailureHandlerImp implements ActionRequestFailureHandler {

        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {

            // Case 1: the ES bulk queue is full (rejected execution), re-add the action so it is retried
            if(ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()){
                indexer.add(action);

            // Case 2: ES request timed out, re-add the action so it is retried
            }else if(ExceptionUtils.findThrowable(failure, SocketTimeoutException.class).isPresent()){
                indexer.add(action);
                
            // Case 3: ES parse exception (malformed document), drop the record and log it
            }else if(ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()){
                log.error("Sink to es exception ,exceptionData: {} ,exceptionStackTrace: {}",action.toString(),org.apache.commons.lang.exception.ExceptionUtils.getFullStackTrace(failure));
                
            // Case 4: any other failure, drop the record and log it
            }else{
                log.error("Sink to es exception ,exceptionData: {} ,exceptionStackTrace: {}",action.toString(),org.apache.commons.lang.exception.ExceptionUtils.getFullStackTrace(failure));
            }
        }
    }
}
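
For reference, below is a hypothetical applicationProperties file matching the keys read in main() above, passed to the job as --applicationProperties /path/to/application.properties. Hosts, paths, topic names, and credentials are placeholders:

# Checkpointing (placeholder path)
checkpointDirectory=hdfs:///flink/checkpoints/read-kafka-write-es
checkpointSecondInterval=60

# Kafka source (placeholder addresses)
fromKafka.bootstrap.servers=kafka01:9092,kafka02:9092
fromKafka.group.id=read-kafka-write-es
fromKafka.auto.offset.reset=latest
fromKafka.topic=eventTopic

# Elasticsearch sink (placeholder host and credentials)
toES.host=es01:9200,es02:9200
toES.username=elastic
toES.password=changeme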
