Big Data: Flink Data Consistency

Overview

I. Writing Flink analysis results to Redis

1. Download the flink-hadoop integration package and place it on all nodes (the state backend below writes checkpoints to HDFS)

2. KafkaToRedisWordCount

package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Can this program tolerate failures, i.e. guarantee data consistency?
 * If so, is the guarantee ExactlyOnce or AtLeastOnce? It is AtLeastOnce.
 * KafkaSource: records the offsets and stores them in state (OperatorState)
 * sum after keyBy: sum is stateful (ValueState)
 * RedisSink: writes with HSET, so replayed results simply overwrite the old value (idempotent)
 */
public class KafkaToRedisWordCount {

    // Example program arguments:
    // --topic doit2021 --groupId g02 --redisHost node-3.51doit.cn
    // --redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
    public static void main(String[] args) throws Exception{

        //System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing so that in-memory state is periodically persisted to the StateBackend
        env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));
        // Configure where the checkpointed state is stored
        env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));

        // Keep the checkpoint data if the job is cancelled manually, so the job can later be restarted from it
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Kafka connection and consumer settings
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        // Offset reset strategy: with no committed offset, read from the beginning; otherwise continue from the recorded offset
        properties.setProperty("auto.offset.reset", "earliest");
        // Consumer group ID
        properties.setProperty("group.id", parameterTool.get("groupId"));
        // Checkpointing is enabled, so do not let the Kafka client auto-commit offsets; Flink tracks them in its checkpoints
        properties.setProperty("enable.auto.commit", "false");
        // Create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("topic"), // topic to read from
                new SimpleStringSchema(), // deserialization schema for the records
                properties // Kafka properties
        );

        // Optional: do not commit the offsets back to Kafka's internal topic on checkpoints
        //kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // Add the Kafka source
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // Group by the word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        // Aggregate the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        // Write the aggregated results to Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(parameterTool.getRequired("redisHost"))
                .setPassword(parameterTool.getRequired("redisPwd"))
                .setDatabase(9).build();

        summed.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));

        env.execute();

    }

    // Maps each (word, count) tuple to an HSET on the Redis hash WORD_COUNT: field = word, value = count
    private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }

}

Note: if Redis goes down while the job keeps trying to write, the sink fails and the job restarts; once Redis is back, the results that were missed during the outage still end up in Redis, because the source replays from the checkpointed offsets and the HSET writes are idempotent (they simply overwrite the same hash fields).
If you cancel the Flink job without deleting its checkpoint data (RETAIN_ON_CANCELLATION above), you can resubmit it from the last checkpoint and it continues computing where it left off; that is the approach used in this example. Alternatively, trigger a savepoint manually when cancelling.
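To make the idempotence argument concrete, here is a minimal sketch that writes the same hash field twice with the plain Jedis client (the host, password and database number mirror the example arguments above; the class name is just for illustration). A replayed result after a restart overwrites the field instead of adding to it, which is why the AtLeastOnce pipeline still ends up with correct counts in Redis.

import redis.clients.jedis.Jedis;

public class HsetIdempotenceCheck {
    public static void main(String[] args) {
        // connection details mirror the job arguments above (placeholders)
        try (Jedis jedis = new Jedis("node-3.51doit.cn", 6379)) {
            jedis.auth("123456");
            jedis.select(9);

            // the count written before a failure
            jedis.hset("WORD_COUNT", "flink", "3");
            // the same count replayed after restoring from the checkpoint: the field is simply overwritten
            jedis.hset("WORD_COUNT", "flink", "3");

            // still one field with the correct value
            System.out.println(jedis.hget("WORD_COUNT", "flink")); // prints 3
        }
    }
}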


II. Reading data from Kafka, processing it, and writing it back to Kafka

1. KafkaToKafka

package cn._51doit.flink.day09;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Read data from Kafka, process it, and write the result back to Kafka.
 * Requirement: guarantee data consistency.
 * ExactlyOnce: the source can record offsets and replay from them (on failure the offsets are not advanced),
 * and the sink must support transactions.
 * Checkpointing is enabled: the source offsets are stored in state (OperatorState), and the operators' data is stored in state as well.
 */
public class KafkaToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing; EXACTLY_ONCE is the checkpointing mode (and also the default)
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        // Kafka connection and consumer settings
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        // Offset reset strategy: with no committed offset, read from the beginning; otherwise continue from the recorded offset
        properties.setProperty("auto.offset.reset", "earliest");
        // Consumer group ID
        properties.setProperty("group.id", "g1");
        // Checkpointing is enabled, so do not let the Kafka client auto-commit offsets; Flink tracks them in its checkpoints
        properties.setProperty("enable.auto.commit", "false");
        // Create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "doit2021", // topic to read from
                new SimpleStringSchema(), // deserialization schema for the records
                properties // Kafka properties
        );
        // Do not commit the offsets back to Kafka's internal topic on checkpoints (optional)
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // Add the Kafka source
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));

        // A producer created like this would only give AtLeastOnce guarantees:
//        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
//                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
//        );


        // Output topic
        String topic = "out2021";
        // The transaction timeout must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        // Create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic, // target topic
                new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
                properties, // Kafka properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE // write to Kafka with EXACTLY_ONCE (transactional) semantics
        );

        filtered.addSink(kafkaProducer);

        env.execute();


    }
}
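With EXACTLY_ONCE semantics the producer writes transactionally: the filtered records only become visible in out2021 once the corresponding checkpoint completes and the transaction is committed. Downstream consumers therefore need isolation.level=read_committed, otherwise they may also read data from open or aborted transactions. A minimal sketch of such a consumer with the plain Kafka client (the group id is a made-up placeholder):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReadCommittedConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        props.setProperty("group.id", "check-out2021"); // placeholder group id
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // only return records from committed transactions
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("out2021"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}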

2. Defining KafkaStringSerializationSchema

package cn._51doit.flink.day09;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.Charset;

/**
 * Custom Kafka serialization schema for String records
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;
    private final String charset;

    // Constructors take the target topic and the character set (UTF-8 by default)
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }
    // Called for every record to turn it into a ProducerRecord
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // Encode the string using the configured character set
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // Return a ProducerRecord without an explicit key
        return new ProducerRecord<>(topic, bytes);
    }
}
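The schema above returns a ProducerRecord without a key, so the producer's default partitioner decides which partition of the output topic each record lands in. If related records should stay in order within one partition, a keyed variant can be used; the following sketch is a hypothetical class, and the rule of using the first whitespace-separated token as the key is purely illustrative.

package cn._51doit.flink.day09;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.Charset;

/**
 * Hypothetical keyed variant of KafkaStringSerializationSchema:
 * the first whitespace-separated token of each line is used as the record key.
 */
public class KeyedKafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;
    private final String charset;

    public KeyedKafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }

    public KeyedKafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        byte[] key = element.split("\\s+", 2)[0].getBytes(Charset.forName(charset));
        byte[] value = element.getBytes(Charset.forName(charset));
        // records with the same key always go to the same partition of the output topic
        return new ProducerRecord<>(topic, key, value);
    }
}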
