概述
官网上有关于http的例子:
DataStream<String> input = ...;
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
如果只是这样的话,写入elasticsearch时报错,将会导致flink不断的重启重试写入,原因是没有添加一旦错误写入elasticsearch的处理方式。
添加错误处理方式:
esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
logger.warn(failure.getMessage());
logger.warn("write message to es failed and drop the message ");
}
});
如果在http基础上添加了用户名和密码:
final CredentialsProvider credentialsProvider = new SerializableCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(esUsername, esPassword));
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(
httpAsyncClientBuilder -> {
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
);
})
那如果是https呢?
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "https"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "https"));
// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(
httpAsyncClientBuilder -> {
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder.setConnectionManager(SslUtil.getConnectionManager());
}
);
})
import java.io.Serializable;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
public class SslUtil implements Serializable {
public static PoolingNHttpClientConnectionManager getConnectionManager() throws Exception {
SSLContextBuilder builder = SSLContexts.custom();
builder.loadTrustMaterial(null, new TrustStrategy() {
@Override
public boolean isTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
return true;
}
});
SSLContext sslContext = builder.build();
SchemeIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(sslContext,
new HostnameVerifier() {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
});
Registry<SchemeIOSessionStrategy> sslioSessionRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("https", sslioSessionStrategy).build();
PoolingNHttpClientConnectionManager ncm =
new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(), sslioSessionRegistry);
return ncm;
}
}
最后
以上就是单身电灯胆为你收集整理的Flink elasticsearch-sink by http and https的全部内容,希望文章能够帮你解决Flink elasticsearch-sink by http and https所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复