Apache Flink: Read a Local File, Process the Data, and Write It to Elasticsearch
Overview
Requirements
- A data file exists on the local file system
- Use Flink to read this local data source
- Process the data and write it into Elasticsearch (ES)
- Submit the Flink job
Environment
- Flink: 1.8.2
- Elasticsearch: 6.2.3
- JDK: 1.8
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.vincent</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.8.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.8.5</hadoop.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Elasticsearch 6.x -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.vincent.Test</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Define a utility class: ElasticSearchSinkUtil.java
package com.vincent;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
public class ElasticSearchSinkUtil {
public static List<HttpHost> getEsAddresses(String hosts) {
String[] hostList = hosts.split(",");
List<HttpHost> addresses = new ArrayList<>();
for (String host : hostList) {
String[] ip_port = host.split(":");
String ip = ip_port[0];
String port = ip_port[1];
addresses.add(new HttpHost(ip, Integer.parseInt(port)));
}
return addresses;
}
public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
@Override
public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
String description = actionRequest.getDescription();
System.out.println("----------");
System.out.println(description);
System.out.println("===========");
if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
// Socket timeout while talking to ES: log only, the request is not retried here
System.out.println("socket timeout while writing to ES");
} else if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
// The ES bulk queue is full (rejected execution): re-queue the request so it is retried
System.out.println("ES bulk queue is full, re-queuing request");
requestIndexer.add(actionRequest);
} else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
// The document could not be parsed: log the request description for debugging
System.out.println("ES parse exception for request: " + description);
} else if (ExceptionUtils.findThrowable(throwable, ElasticsearchException.class).isPresent()) {
System.out.println("Elasticsearch exception: " + throwable.getMessage());
}
}
});
data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
}
}
Main method (Test.java)
package com.vincent;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.*;
import java.io.IOException;
import java.util.List;
public class Test {
public static void main(String[] args) throws Exception {
String propertiesPath = args[0];
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesPath);
List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get("es.hosts"));
int bulk_size = parameterTool.getInt("es.bulk.flushMaxAction");
int sinkParallelism = parameterTool.getInt("es.sink.parallelism");
String rawPath = parameterTool.get("rawPath");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.readTextFile(rawPath);
SingleOutputStreamOperator<Tuple7<String, String, String, String, String, String, String>> map = dataStreamSource.map(new MapFunction<String, Tuple7<String, String, String, String, String, String, String>>() {
@Override
public Tuple7<String, String, String, String, String, String, String> map(String s) throws Exception {
String[] splits = s.split("\t");
String field1 = splits[0];
String field2 = splits[1];
String field3 = splits[2];
String field4 = splits[3];
String field5 = splits[4];
String field6 = splits[5];
String field7 = splits[6];
return new Tuple7<>(field1, field2, field3, field4, field5, field6, field7);
}
});
ElasticSearchSinkUtil.addSink(esAddresses, bulk_size, sinkParallelism, map, new ElasticsearchSinkFunction<Tuple7<String, String, String, String, String, String, String>>() {
@Override
public void process(Tuple7<String, String, String, String, String, String, String> data, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
IndexRequest indexRequest = null;
try {
indexRequest = createIndexRequest(data);
} catch (IOException e) {
e.printStackTrace();
}
// Only hand the request to the indexer if it was built successfully
if (indexRequest != null) {
requestIndexer.add(indexRequest);
}
}
public IndexRequest createIndexRequest(Tuple7<String, String, String, String, String, String, String> data) throws IOException {
JSONObject jsonObject = new JSONObject();
jsonObject.put("field1", data.f0);
jsonObject.put("field2", data.f1);
jsonObject.put("field3", JSONObject.parseObject(data.f2));
jsonObject.put("field4", JSONObject.parseObject(data.f3));
jsonObject.put("field5", data.f4);
jsonObject.put("field6", data.f5);
jsonObject.put("field7", data.f6);
return Requests.indexRequest()
.index("my_index")
.type("type").source(jsonObject.toString(), XContentType.JSON);
}
});
// map.setParallelism(1).print();
env.execute("Test");
}
}
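The map function above splits every input line on tab characters into seven fields, and createIndexRequest parses fields 3 and 4 as JSON, so the raw file is assumed to contain lines of roughly the following shape. This is a made-up sample for illustration; <TAB> marks a tab character and all values are hypothetical:
u_0001<TAB>1571894400000<TAB>{"k":"v"}<TAB>{"id":"123"}<TAB>1<TAB>typeA<TAB>2019-10-24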
Define a configuration file
Keeping these settings in a properties file makes them easy to change without rebuilding the job:
es.hosts=swarm-manager:9200,swarm-worker1:9200,swarm-worker2:9200
es.bulk.flushMaxAction=200
es.sink.parallelism=1
# hdfs: hdfs://swarm-manager:9001/text/000000_0, windows: E:/test/hello.txt
# rawPath=hdfs://swarm-manager:9001/text/000000_0
rawPath=E:/test/000000_0
Package and deploy
Build the application with mvn package, then copy the generated hadoop-hdfs-1.0-SNAPSHOT-shaded.jar to the server (see the sketch below).
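A minimal build-and-copy sketch, run from the project root. The user name and destination (user@<server-ip>:~/) are placeholders; the flink-es.properties file is copied along with the jar because the run command below expects it in the working directory:
# Build the shaded (fat) jar; the output lands in target/
mvn clean package
# Copy the jar and the properties file to the server (user and destination are placeholders)
scp target/hadoop-hdfs-1.0-SNAPSHOT-shaded.jar flink-es.properties user@<server-ip>:~/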
Start the Flink cluster
On the server, start the cluster with the command ./flink-1.8.2/bin/start-cluster.sh (on Windows, start-cluster.bat).
Run the job
Submit the job with the command: flink run ./hadoop-hdfs-1.0-SNAPSHOT-shaded.jar ./flink-es.properties
Open http://<server-ip>:8081 in a browser to watch the job's status in the Flink web UI.
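The job can also be checked from the command line; flink list is part of the standard Flink CLI and prints scheduled and running jobs:
# List running and scheduled jobs on the cluster
./flink-1.8.2/bin/flink list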