我是靠谱客的博主 还单身世界,最近开发中收集的这篇文章主要介绍Flink学习笔记(4)——source and sink前言从集合读取数据从文件读取数据从socket读取数据从kafka读取数据自定义Sourcesink到kafkasink到redissink到Elasticsearchjdbc自定义,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 前言
  • 从集合读取数据
  • 从文件读取数据
  • 从socket读取数据
  • 从kafka读取数据
  • 自定义Source
  • sink到kafka
  • sink到redis
  • sink到Elasticsearch
  • jdbc自定义

前言

flink中提供了很多种数据源,有基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
在这里插入图片描述
Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 Sink。
在这里插入图片描述
自定义sink需要继承RichSinkFunction 类,主要实现open、invoke、close三个方法。open主要是一些连接,invoke主要是插入逻辑,close是关闭。比如sink to mysql,open建立连接,invoke具体写执行逻辑,close关闭连接。

从集合读取数据

  1. fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。
  2. fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
  3. fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。
  4. fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。
  5. fromSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。
    package source;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Arrays;
    
    /**
     * Created with IntelliJ IDEA.
     *
     * @Author: yingtian
     * @Date: 2021/05/10/16:36
     * @Description: 从集合中读取数据
     */
    public class SourceTest1_Collection {
    
        public static void main(String[] args) throws Exception{
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            // Source: 从集合Collection中获取数据
            DataStream<SensorReading> dataStream = env.fromCollection(
                    Arrays.asList(
                            new SensorReading("sensor_1", 1547718199L, 35.8),
                            new SensorReading("sensor_6", 1547718201L, 15.4),
                            new SensorReading("sensor_7", 1547718202L, 6.7),
                            new SensorReading("sensor_10", 1547718205L, 38.1)
                    )
            );
    
            //元素类型必须相同
            DataStream<Integer> intStream = env.fromElements(1,2,3,4,5,6,7,8,9);
    
            //生成区间 输出10、11、12、13、14、15
            DataStreamSource<Long> longStream = env.fromSequence(10, 15);
    
            //打印
            dataStream.print("sensorReading");
            intStream.print("int");
            longStream.print("long");
    
            //执行程序
            env.execute();
        }
    }
    

从文件读取数据

  1. readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

  2. readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

  3. readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

    这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。
    实现:
    在具体实现上,Flink 把文件读取过程分为两个子任务,即目录监控和数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据 watchType 定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片分配给下游 reader。reader 负责读取数据。每个切分片只能由一个 reader 读取,但一个 reader 可以逐个读取多个切分片。
    重要注意:
    如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。这会打破“exactly-once”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
    如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,则 source 仅扫描路径一次然后退出,而不等待 reader 完成文件内容的读取。当然 reader 会继续阅读,直到读取所有的文件内容。关闭 source 后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。

    package source;
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import wc.WordCountSet;
    
    /**
     * Created with IntelliJ IDEA.
     *
     * @Author: yingtian
     * @Date: 2021/05/10/16:54
     * @Description: 从文件中获取数据
     */
    public class SourceTest2_File {
    
        public static void main(String[] args) throws Exception{
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            //从文件中读取数据
            String inputPath = WordCountSet.class.getClassLoader().getResource("sensor.txt").getFile();
            DataStreamSource<String> textStream = env.readTextFile(inputPath);
    
            textStream.print();
    
            //执行程序
            env.execute();
        }
    }
    

    readTextFile方法内部调用:在这里插入图片描述

从socket读取数据

  1. socketTextStream(host,port):从指定ip和端口获取数据
    底层调用socketTextStream(hostname, port, delimiter, 0)方法

    package wc;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * Created with IntelliJ IDEA.
     *
     * @Author: yingtian
     * @Date: 2021/04/29/15:09
     * @Description: 监听socket数据,测试流
     */
    public class WordCountSocketStream {
        public static void main(String[] args) throws Exception {
            //创建运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //读取配置
            ParameterTool config = ParameterTool.fromArgs(args);
            String host = config.get("host");
            int port = Integer.parseInt(config.get("port"));
    
            //创建socket流
            DataStreamSource<String> socketStream = env.socketTextStream(host, port);
    
            // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计
            SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = socketStream.flatMap(new MyFlatMapper()).setParallelism(2)
                    .keyBy(item -> item.f0) // 按照第一个位置的word分组
                    .sum(1);// 按照第二个位置上的数据求和
    
            //打印输出
            resultStream.print("socket");
            env.execute();
        }
    
        public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String,Integer>> {
    
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] arr = line.split(" ");
                for(String str : arr){
                    collector.collect(new Tuple2<>(str,1));
                }
            }
        }
    
    }
    
    

从kafka读取数据

pom文件中添加flink-kafka的jar包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
package source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: yingtian
 * @Date: 2021/05/10/17:35
 * @Description: 从kafka获取数据
 */
public class SourceTest3_Kafka {

    public static void main(String[] args) throws Exception{
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度1
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //获取kafka的consumer 默认从当前位置消费
        FlinkKafkaConsumer<String> sensor = new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties);

        //从头开始消费
//        sensor.setStartFromEarliest();
        //从最新位置开始消费
//        sensor.setStartFromLatest();

        // flink添加外部数据源
        DataStream<String> dataStream = env.addSource(sensor);

        // 打印输出
        dataStream.print();

        env.execute();
    }
}

自定义Source

flink支持自定义source,只需要实现SourceFunction接口。

package source;

import bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Random;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: yingtian
 * @Date: 2021/05/10/18:19
 * @Description: 自定义source
 */
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());

        //打印输出
        dataStream.print();

        env.execute();
    }

    // 实现自定义的SourceFunction
    public static class MySensorSource implements SourceFunction<SensorReading> {

        // 标示位,控制数据产生
        private volatile boolean running = true;


        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            //定义一个随机数发生器
            Random random = new Random();

            // 设置10个传感器的初始温度
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; ++i) {
                sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
            }

            while (running) {
                for (String sensorId : sensorTempMap.keySet()) {
                    // 在当前温度基础上随机波动
                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId, newTemp);
                    ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
                // 控制输出评率
                Thread.sleep(2000L);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }
    }
}

sink到kafka

// 将数据写入Kafka
dataStream.addSink( new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

sink到redis

pom.xml

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
// 定义jedis连接配置(我这里连接的是docker的redis)
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
         .setHost("localhost")
         .setPort(6379)
         .setPassword("123456")
         .setDatabase(0)
         .build();

 dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

自定义RedisMapper

public static class MyRedisMapper implements RedisMapper<SensorReading> {
    // 定义保存数据到redis的命令,存成Hash表,hset sensor_temp id temperature
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
    }

    @Override
    public String getKeyFromData(SensorReading data) {
        return data.getId();
    }

    @Override
    public String getValueFromData(SensorReading data) {
        return data.getTemperature().toString();
    }
}

sink到Elasticsearch

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.12.1</version>
</dependency>
// 定义es的连接配置
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));

dataStream.addSink( new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());

实现自定义的ES写入操作

public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
    @Override
    public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
        // 定义写入的数据source
        HashMap<String, String> dataSource = new HashMap<>();
        dataSource.put("id", element.getId());
        dataSource.put("temp", element.getTemperature().toString());
        dataSource.put("ts", element.getTimestamp().toString());

        // 创建请求,作为向es发起的写入命令(ES7统一type就是_doc,不再允许指定type)
        IndexRequest indexRequest = Requests.indexRequest()
                .index("sensor")
                .source(dataSource);

        // 用index发送请求
        indexer.add(indexRequest);
    }
}

jdbc自定义

pom.xml

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>
// 实现自定义的SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // 声明连接和预编译语句
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false", "root", "example");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // 每来一条数据,调用连接,执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // 直接执行更新语句,如果没有更新那么就插入
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }

ps:以上内容整理于SGG教程。

最后

以上就是还单身世界为你收集整理的Flink学习笔记(4)——source and sink前言从集合读取数据从文件读取数据从socket读取数据从kafka读取数据自定义Sourcesink到kafkasink到redissink到Elasticsearchjdbc自定义的全部内容,希望文章能够帮你解决Flink学习笔记(4)——source and sink前言从集合读取数据从文件读取数据从socket读取数据从kafka读取数据自定义Sourcesink到kafkasink到redissink到Elasticsearchjdbc自定义所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部