Overview
Flink: From Getting Started to Giving Up
- Introduction
- Quick Start
  - Preparation
    - pom.xml
    - log4j.properties
    - Configure IDEA to include the "provided" scope when running
  - Batch WordCount
  - Streaming WordCount
    - Bounded
    - Unbounded
  - Execution mode
- Deployment
  - Development mode
  - local-cluster mode
    - Run the unbounded WordCount
    - Stop the cluster
  - Standalone mode
    - Run the unbounded WordCount
    - Standalone high availability (HA)
      - Configuration
      - Startup
  - YARN mode
    - Session-Cluster
    - Per-Job-Cluster
    - Application Mode
    - YARN-mode high availability
- Scala REPL
- Troubleshooting
  - Error 1
  - Error 2
Introduction
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.
https://flink.apache.org/zh/#
Quick Start
Preparation
pom.xml
<properties>
    <flink.version>1.12.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Configure IDEA to include the "provided" scope when running
Application ->
Configuration ->
Include dependencies with "Provided" scope
Batch WordCount
package com.yire.quickstart.workcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Batch WordCount
 */
public class BatchHandle {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = environment.readTextFile("input/testWordCount.txt");
        dataSource
                .flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (line, out) -> {
                    String[] s = line.split(" ");
                    for (String s1 : s) {
                        out.collect(new Tuple2<>(s1, 1L));
                    }
                })
                // Type erasure: in many cases a lambda does not give Flink enough
                // information to extract the output type automatically, so specify it.
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .groupBy(0)
                .sum(1)
                .print();
    }
}
Streaming WordCount
Bounded
package com.yire.quickstart.workcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Bounded stream WordCount
 */
public class BoundStreamWordCount {
    public static void main(String[] args) throws Exception {
        // Use the streaming API instead of the batch ExecutionEnvironment:
        // ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataSource = environment.readTextFile("input/testWordCount.txt");

        // Version with anonymous inner classes
        dataSource
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        Arrays
                                .asList(line.split(" "))
                                .stream()
                                .forEach(x -> {
                                    out.collect(x);
                                });
                    }
                })
                .returns(Types.STRING)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value, 1L);
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1)
                .print();

        // Lambda version
        dataSource
                .flatMap((FlatMapFunction<String, String>) (line, out) -> Arrays
                        .stream(line.split(" "))
                        .forEach(out::collect))
                .returns(Types.STRING)
                .map(value -> Tuple2.of(value, 1L))
                // Required when using a lambda, because of type erasure
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(k -> k.f0)
                .sum(1)
                .print();

        // Trigger execution
        environment.execute();
    }
}
Unbounded
package com.yire.quickstart.workcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Unbounded stream WordCount
 */
public class UnboundStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socket = environment.socketTextStream("hadoop162", 8089);
        socket
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    Arrays.stream(line.split(" "))
                            .map(s -> Tuple2.of(s, 1L))
                            .forEach(out::collect);
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(k -> k.f0)
                .sum(1)
                .print();
        environment.execute();
    }
}
To test, run on the virtual machine:
nc -lk hadoop162 8089
Execution mode
You can switch to batch execution mode for reading bounded data:
environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
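As a minimal sketch, this reuses the bounded input file from the examples above (file path and class name are only illustrative); the pipeline is written against the DataStream API but executed with batch semantics, so each key is emitted once with its final count instead of a stream of incremental updates.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class BatchModeWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Execute the DataStream program with batch semantics over the bounded source
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.readTextFile("input/testWordCount.txt")
                .flatMap((String line, Collector<Tuple2<String, Long>> out) ->
                        Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1L))))
                // type hint needed because the lambda's output type is erased
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print(); // one final count per key in BATCH mode
        env.execute();
    }
}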
Deployment
Development mode
The quick start above is the development mode.
local-cluster mode
A local cluster with zero configuration, used for testing and learning.
- Extract the tar archive
- Copy the extracted directory to flink-local
Run the unbounded WordCount
- Build the jar in IDEA (without dependencies) and upload it
- Start the local cluster: bin/start-cluster.sh
- Feed test input:
  nc -lk hadoop162 8089
- Submit the Flink application
  - From the command line:
    bin/flink run -m hadoop162:8081 -c com.yire.quickstart.workcount.UnboundStreamWordCount ./flink-1.0-SNAPSHOT.jar
    General form: bin/flink run -m <host>:<port> -c <main-class> <jar>, i.e. bin/flink run [OPTIONS] <jar-file> <arguments>. The -m option does not support high-availability clusters.
  - Or submit the application from the Web UI: Submit New Job
- Visit hadoop162:8081 (8081 is the default Web UI port) to check the job status
- The execution output can be found under log/
Stop the cluster
bin/stop-cluster.sh
Standalone mode
A standalone (independent) cluster.
- Copy the installation directory to flink-standalone
- Edit flink-conf.yaml:
  jobmanager.rpc.address: hadoop162
- Edit workers (one host per line; hadoop162 is not included):
  hadoop164
  hadoop165
Run the unbounded WordCount
Exactly the same as in local-cluster mode.
Standalone high availability (HA)
- Requires ZooKeeper
- The HA metadata is stored in HDFS, so HDFS must be running as well
- The JobManagers run as StandaloneSessionClusterEntrypoint processes
- The task executors run as TaskManagerRunner processes
Configuration
- Edit flink-conf.yaml:
  high-availability: zookeeper
  high-availability.storageDir: hdfs://hadoop162:8020/flink/standalone/ha
  high-availability.zookeeper.quorum: hadoop162:2181,hadoop164:2181,hadoop165:2181
  high-availability.zookeeper.path.root: /flink-standalone
  high-availability.cluster-id: /cluster_yire
- Edit masters (one entry per line):
  hadoop162:8081
  hadoop163:8081
- Add the environment variable in /etc/profile.d/my.sh:
  export HADOOP_CLASSPATH=`hadoop classpath`
- Distribute the configuration to all nodes
Startup
- Start ZooKeeper
- Start HDFS
- Start Flink
- You can inspect the corresponding ZooKeeper node to see which JobManager is the leader
YARN mode
- Copy the installation directory to flink-yarn
- Configure HADOOP_CLASSPATH:
  export HADOOP_CLASSPATH=`hadoop classpath`
Session-Cluster
- Starts the Flink JobManager on YARN and allocates resources dynamically: resources are requested when a job runs and released when it finishes
- Start the Flink session; -d means detached mode, i.e. the client exits once the session has started:
  bin/yarn-session.sh -d
- Run jobs on the session
  - Let the client find the session automatically:
    bin/flink run -d -c com.yire.quickstart.workcount.UnboundStreamWordCount ./flink-1.0-SNAPSHOT.jar
  - Specify the JobManager address (the port is generated randomly and can be seen on the Submit page of the Web UI):
    bin/flink run -d -m hadoop162:34382 -c com.yire.quickstart.workcount.UnboundStreamWordCount ./flink-1.0-SNAPSHOT.jar
  - Specify the YARN application id:
    bin/flink run -d -t yarn-session -Dyarn.application.id=application_1611137588501_0005 -c com.yire.quickstart.workcount.UnboundStreamWordCount ./flink-1.0-SNAPSHOT.jar
Per-Job-Cluster
- Start the Hadoop cluster
- Run the jar:
  bin/flink run -t yarn-per-job -c com.yire.quickstart.workcount.UnboundStreamWordCount ./flink-1.0-SNAPSHOT.jar
- Each submission launches a dedicated Flink cluster
Application Mode
The difference from Per-Job-Cluster is where the main() method runs: in Application Mode it runs inside a YARN container (on the Flink cluster), whereas in Per-Job-Cluster it runs locally on the client.
bin/flink run-application -d -t yarn-application -c com.yire.quickstart.workcount.UnboundStreamWordCount ./flink-1.0-SNAPSHOT.jar
YARN-mode high availability
The mechanism differs from standalone mode: standalone starts several JobManagers up front, whereas in YARN mode the JobManager is restarted after it fails.
- Configure in yarn-site.xml:
  <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
      <description>The maximum number of application master execution attempts.</description>
  </property>
- Distribute the file and restart YARN
- Configure in flink-conf.yaml:
  # this value should be less than or equal to the YARN setting above
  yarn.application-attempts: 3
  high-availability: zookeeper
  high-availability.storageDir: hdfs://hadoop162:8020/flink/yarn/ha
  high-availability.zookeeper.quorum: hadoop162:2181,hadoop164:2181,hadoop165:2181
  high-availability.zookeeper.path.root: /flink-yarn
- Start a yarn-session
- Kill the JobManager and watch it come back up
- The attempt count applies within a fixed interval; if the JobManager starts successfully within that interval, the counter is reset
Scala REPL
Similar to spark-shell.
- Local mode:
  bin/start-scala-shell.sh local
- yarn-session mode:
  bin/start-scala-shell.sh yarn
Troubleshooting
Error 1
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment
Solution: in the IDEA run configuration
Application ->
Configuration ->
Include dependencies with "Provided" scope
Error 2
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(UnboundStreamWordCount.java:19)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:484)
at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:190)
at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:115)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:290)
at com.yire.quickstart.workcount.UnboundStreamWordCount.main(UnboundStreamWordCount.java:24)
Cause: type erasure with the lambda. Add returns(...) to specify the output type.
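A minimal, self-contained sketch of the failing pattern and the fix (class name and sample data are illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TypeHintSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "a")
                .map(word -> Tuple2.of(word, 1L))
                // Without this hint the later keyBy() throws InvalidTypesException,
                // because the lambda's Tuple2<String, Long> output type is erased.
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}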