Apache Flink: Read a Local File, Process the Data, and Import It into Elasticsearch

Overview

Requirements

  • A data file sits on the local filesystem
  • Use Flink to read this local data source
  • Process the records and write them into Elasticsearch
  • Submit the job to a Flink cluster

Environment

  • Flink: 1.8.2
  • Elasticsearch: 6.2.3
  • JDK: 1.8

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.vincent</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.8.5</hadoop.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Elasticsearch 6.x -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.vincent.Test</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- Its adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>

Custom utility class: ElasticSearchSinkUtil.java

package com.vincent;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;

public class ElasticSearchSinkUtil {
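    // Parses a comma-separated "host1:port1,host2:port2" string into HttpHost objects.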
    public static List<HttpHost> getEsAddresses(String hosts) {
        String[] hostList = hosts.split(",");
        List<HttpHost> addresses = new ArrayList<>();
        for (String host : hostList) {
            String[] ip_port = host.split(":");
            String ip = ip_port[0];
            String port = ip_port[1];
            addresses.add(new HttpHost(ip, Integer.parseInt(port)));
        }
        return addresses;
    }

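    // Builds an Elasticsearch sink with the given bulk-flush size and a basic
    // failure handler, then attaches it to the stream with the requested parallelism.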
    public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
                                   SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
                String description = actionRequest.getDescription();
                System.out.println("----------");
                System.out.println(description);
                System.out.println("===========");
                if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
                    // Transient network timeout: just log it here.
                    System.out.println("socket timeout");
                } else if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
                    // The ES bulk queue is full (rejected execution): re-queue the request so it is retried.
                    System.out.println("ES bulk queue is full");
                    requestIndexer.add(actionRequest);
                } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
                    // Malformed document: log and drop it.
                    System.out.println("parse error: " + description);
                } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchException.class).isPresent()) {
                    System.out.println("Elasticsearch error");
                }
            }
        });
        data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
    }
}
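
A minimal usage sketch for the address parser (the host names below are hypothetical, but follow the same host:port,host:port format as the es.hosts key in the configuration file further down):

import org.apache.http.HttpHost;
import java.util.List;

public class EsAddressDemo {
    public static void main(String[] args) {
        // "es-node1"/"es-node2" are placeholder hosts, not from the original setup.
        List<HttpHost> hosts = ElasticSearchSinkUtil.getEsAddresses("es-node1:9200,es-node2:9200");
        System.out.println(hosts); // prints [http://es-node1:9200, http://es-node2:9200]
    }
}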

Main method

package com.vincent;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.List;

public class Test {
    public static void main(String[] args) throws Exception {
        String propertiesPath = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesPath);
        List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get("es.hosts"));
        int bulk_size = parameterTool.getInt("es.bulk.flushMaxAction");
        int sinkParallelism = parameterTool.getInt("es.sink.parallelism");
        String rawPath = parameterTool.get("rawPath");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.readTextFile(rawPath);
        SingleOutputStreamOperator<Tuple7<String, String, String, String, String, String, String>> map = dataStreamSource.map(new MapFunction<String, Tuple7<String, String, String, String, String, String, String>>() {
            @Override
            public Tuple7<String, String, String, String, String, String, String> map(String s) throws Exception {
                String[] splits = s.split("\t"); // input fields are tab-separated
                String field1 = splits[0];
                String field2 = splits[1];
                String field3 = splits[2];
                String field4 = splits[3];
                String field5 = splits[4];
                String field6 = splits[5];
                String field7 = splits[6];
                return new Tuple7<>(field1, field2, field3, field4, field5, field6, field7);
            }
        });

        ElasticSearchSinkUtil.addSink(esAddresses, bulk_size, sinkParallelism, map, new ElasticsearchSinkFunction<Tuple7<String, String, String, String, String, String, String>>() {
            @Override
            public void process(Tuple7<String, String, String, String, String, String, String> data, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                IndexRequest indexRequest = null;
                try {
                    indexRequest = createIndexRequest(data);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (indexRequest != null) { // skip records whose request could not be built
                    requestIndexer.add(indexRequest);
                }
            }

            public IndexRequest createIndexRequest(Tuple7<String, String, String, String, String, String, String> data) throws IOException {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("field1", data.f0);
                jsonObject.put("field2", data.f1);
                jsonObject.put("field3", JSONObject.parseObject(data.f2));
                jsonObject.put("field4", JSONObject.parseObject(data.f3));
                jsonObject.put("field5", data.f4);
                jsonObject.put("field6", data.f5);
                jsonObject.put("field7", data.f6);
                return Requests.indexRequest()
                        .index("my_index")
                        .type("type").source(jsonObject.toString(), XContentType.JSON);
            }
        });

        // map.setParallelism(1).print();
        env.execute("Test");
    }
}
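
For reference, here is a hypothetical input line (seven tab-separated fields, with <TAB> standing in for a literal tab character; field3 and field4 must be valid JSON because createIndexRequest runs them through JSONObject.parseObject), followed by a document equivalent to what it would produce in my_index (key order may differ):

u001<TAB>1571900000<TAB>{"k":"v"}<TAB>{"id":"x"}<TAB>3<TAB>login<TAB>2019-10-24

{"field1":"u001","field2":"1571900000","field3":{"k":"v"},"field4":{"id":"x"},"field5":"3","field6":"login","field7":"2019-10-24"}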

Custom configuration file

Keeping these settings in a properties file makes them easy to change without rebuilding the job:

es.hosts=swarm-manager:9200,swarm-worker1:9200,swarm-worker2:9200
es.bulk.flushMaxAction=200
es.sink.parallelism=1
# hdfs: hdfs://swarm-manager:9001/text/000000_0, windows: E:/test/hello.txt
# rawPath=hdfs://swarm-manager:9001/text/000000_0
rawPath=E:/test/000000_0
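
As an optional hardening tweak, ParameterTool also accepts default values, so a missing key falls back to a sensible setting instead of throwing; the defaults below are illustrative assumptions, not values from the original job:

int bulkSize = parameterTool.getInt("es.bulk.flushMaxAction", 100); // fall back to 100 if the key is missing
int sinkParallelism = parameterTool.getInt("es.sink.parallelism", 1); // fall back to 1 if the key is missing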

Packaging and deployment

Build the application with mvn package, then copy the generated hadoop-hdfs-1.0-SNAPSHOT-shaded.jar to the server.
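
A typical build-and-copy sequence might look like this (the user name and server address are placeholders):

mvn clean package
scp target/hadoop-hdfs-1.0-SNAPSHOT-shaded.jar user@<server-ip>:~/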

Starting the Flink cluster

Start the cluster on the server with ./flink-1.8.2/bin/start-cluster.sh (start-cluster.bat is the Windows equivalent).

Running the job

Submit the job with flink run ./hadoop-hdfs-1.0-SNAPSHOT-shaded.jar ./flink-es.properties; the path to the properties file is passed through as the program's first argument (args[0] in Test.main).
Open http://<server-ip>:8081 in a browser to monitor the job in the Flink web UI.
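
To spot-check that documents actually arrived, you can query the document count of the target index (index name as in createIndexRequest; host and port are placeholders for one of the es.hosts nodes):

curl -XGET 'http://<server-ip>:9200/my_index/_count'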
