我是靠谱客的博主 单身电灯胆,最近开发中收集的这篇文章主要介绍Flink elasticsearch-sink by http and https,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

官网上有关于http的例子:

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
        
            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }
        
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

     如果只是这样的话,写入elasticsearch时报错,将会导致flink不断的重启重试写入,原因是没有添加一旦错误写入elasticsearch的处理方式。 

     添加错误处理方式:

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
                logger.warn(failure.getMessage());
                logger.warn("write message to es failed and drop the message ");
            }
        });

  如果在http基础上添加了用户名和密码:

final CredentialsProvider credentialsProvider = new SerializableCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(esUsername, esPassword));


esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(
                        httpAsyncClientBuilder -> {                        
                            return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                               
                        }
                    );

                })

那如果是https呢?

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "https"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "https"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
        
            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }
        
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);


esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(
                        httpAsyncClientBuilder -> {                        
                            httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                              return  httpAsyncClientBuilder.setConnectionManager(SslUtil.getConnectionManager());
                        }
                    );

                })

import java.io.Serializable;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;

import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;

import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;


public class SslUtil implements Serializable {

    public static PoolingNHttpClientConnectionManager getConnectionManager() throws Exception {
        SSLContextBuilder builder = SSLContexts.custom();
        builder.loadTrustMaterial(null, new TrustStrategy() {
            @Override
            public boolean isTrusted(X509Certificate[] chain, String authType)
                    throws CertificateException {
                return true;
            }
        });
        SSLContext sslContext = builder.build();
        SchemeIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(sslContext,
                new HostnameVerifier() {
                    @Override
                    public boolean verify(String hostname, SSLSession session) {
                        return true;
                    }
                });
        Registry<SchemeIOSessionStrategy> sslioSessionRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("https", sslioSessionStrategy).build();
        PoolingNHttpClientConnectionManager ncm =
                new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(), sslioSessionRegistry);
        return ncm;
    }
}

 

最后

以上就是单身电灯胆为你收集整理的Flink elasticsearch-sink by http and https的全部内容,希望文章能够帮你解决Flink elasticsearch-sink by http and https所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(59)

评论列表共有 0 条评论

立即
投稿
返回
顶部