Overview
I. Writing Flink analysis results to Redis
1. Download the flink-hadoop integration package and place it on every node
2. KafkaToRedisWordCount
package cn._51doit.flink.day08;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
 * Can this program tolerate failures (i.e. guarantee data consistency)?
 * If so, is it exactly-once or at-least-once? It is at-least-once.
 * KafkaSource: records offsets and saves them in state (OperatorState)
 * sum after keyBy: sum is stateful (ValueState)
 * RedisSink: writing with HSET overwrites existing values (idempotent)
 */
public class KafkaToRedisWordCount {
//--topic doit2021 --groupId g02 --redisHost node-3.51doit.cn
//--redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
public static void main(String[] args) throws Exception{
//System.setProperty("HADOOP_USER_NAME", "root");
ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Periodically persist the in-memory state to the StateBackend
env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));
//Set the backend used to store state
env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));
//Do not delete the job's checkpoint data after the job is cancelled manually
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//Set the Kafka-related parameters
Properties properties = new Properties();//Kafka broker addresses and ports
properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
//Offset reset strategy: start from the earliest offset if none has been recorded, otherwise resume from the recorded offset
properties.setProperty("auto.offset.reset", "earliest");
//Set the consumer group ID
properties.setProperty("group.id", parameterTool.get("groupId"));
//Checkpointing is enabled, so do not let the Kafka consumer (the source subtasks) auto-commit offsets
properties.setProperty("enable.auto.commit", "false");
//Create a FlinkKafkaConsumer and pass in the parameters
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
parameterTool.getRequired("topic"), //the name of the topic to read from
new SimpleStringSchema(), //the deserialization schema for the records
properties //the Kafka properties
);
//Optionally disable writing offsets to Kafka's internal offsets topic on checkpoint
//kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
//Add the kafkaConsumer with addSource
DataStreamSource<String> lines = env.addSource(kafkaConsumer);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
//new Tuple2<String, Integer>(word, 1)
collector.collect(Tuple2.of(word, 1));
}
}
});
//Group by word
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);
//Aggregate (sum the counts)
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
//Write the aggregated results to Redis
//by adding a sink
//summed.addSink()
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost(parameterTool.getRequired("redisHost"))
.setPassword(parameterTool.getRequired("redisPwd"))
.setDatabase(9).build();
summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
env.execute();
}
private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
return data.f1.toString();
}
}
}
Note: if Redis goes down while the Flink job keeps running, the data produced during the outage is not lost; once Redis recovers, the missed data still ends up in Redis, because the job restores the Kafka offsets and the sum state from the last checkpoint and replays the records, and the HSET writes simply overwrite existing fields (at-least-once with idempotent writes).
If you cancel the Flink job without deleting its checkpoint data, you can restart it from the last checkpoint and continue the computation; the example above uses this approach. Alternatively, take a savepoint manually when cancelling and restore from it.
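The idempotency claim above is easy to check directly against Redis. Below is a minimal sketch, not part of the original material: it assumes the jedis client is on the classpath and reuses the host, password, and database from the example job arguments, and it shows that replaying the same (word, count) pair with HSET leaves the final value unchanged.
import redis.clients.jedis.Jedis;

public class HsetIdempotencyDemo {
    public static void main(String[] args) {
        //Connect to the same Redis instance as the example job (host/password taken from the sample args; adjust for your setup)
        try (Jedis jedis = new Jedis("node-3.51doit.cn", 6379)) {
            jedis.auth("123456");
            jedis.select(9); //same database index as the RedisSink config above
            jedis.hset("WORD_COUNT", "flink", "5");  //first write
            jedis.hset("WORD_COUNT", "flink", "5");  //the same write replayed after a failure
            System.out.println(jedis.hget("WORD_COUNT", "flink")); //still prints 5
        }
    }
}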
II. Reading data from Kafka, processing it, and writing it back to Kafka
1. KafkaToKafka
package cn._51doit.flink.day09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
 * Read data from Kafka, process it, and write it back to Kafka
 * Requirement: guarantee data consistency
 * Exactly-once: the Source can record offsets (for replay) and does not advance them when an exception occurs; the Sink must support transactions
 * Enable checkpointing: the Source's offsets are saved in state (OperatorState), and the in-flight processed data is saved in state as well
 */
public class KafkaToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Enable checkpointing
env.enableCheckpointing(30000);
env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));
//Set the Kafka-related parameters
Properties properties = new Properties();//Kafka broker addresses and ports
properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
//Offset reset strategy: start from the earliest offset if none has been recorded, otherwise resume from the recorded offset
properties.setProperty("auto.offset.reset", "earliest");
//Set the consumer group ID
properties.setProperty("group.id", "g1");
//Checkpointing is enabled, so do not let the Kafka consumer auto-commit offsets periodically
properties.setProperty("enable.auto.commit", "false");
//Create a FlinkKafkaConsumer and pass in the parameters
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"doit2021", //the name of the topic to read from
new SimpleStringSchema(), //the deserialization schema for the records
properties //the Kafka properties
);
kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //on checkpoint, do not write offsets back to Kafka's internal offsets topic
//Add the kafkaConsumer with addSource
DataStreamSource<String> lines = env.addSource(kafkaConsumer);
SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));
//The simple producer below would only give at-least-once semantics
// FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
// "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
// );
//The Kafka topic to write to
String topic = "out2021";
//Set the producer's transaction timeout
properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");
//Create the FlinkKafkaProducer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
topic, //the target topic
new KafkaStringSerializationSchema(topic), //the serialization schema used when writing to Kafka
properties, //the Kafka producer properties
FlinkKafkaProducer.Semantic.EXACTLY_ONCE //write to Kafka with EXACTLY_ONCE semantics
);
filtered.addSink(kafkaProducer);
env.execute();
}
}
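With an EXACTLY_ONCE sink, records only become visible in out2021 when a checkpoint completes and the Kafka transaction commits, so a downstream reader has to skip uncommitted transactional messages. The sketch below illustrates this under that assumption (the class name and group ID are made up for illustration, not part of the original code): it sets isolation.level=read_committed on a plain FlinkKafkaConsumer that reads the output topic.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class ReadCommittedConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        props.setProperty("group.id", "g-verify"); //hypothetical group ID, only for this sketch
        //Only read messages from committed Kafka transactions, so aborted or pending writes stay invisible
        props.setProperty("isolation.level", "read_committed");
        env.addSource(new FlinkKafkaConsumer<>("out2021", new SimpleStringSchema(), props))
                .print();
        env.execute();
    }
}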
2. Defining KafkaStringSerializationSchema
package cn._51doit.flink.day09;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.Charset;
/**
 * A custom Kafka serialization schema for String data
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
private String topic;
private String charset;
//The constructors take the target topic and the charset; UTF-8 is used by default
public KafkaStringSerializationSchema(String topic) {
this(topic, "UTF-8");
}
public KafkaStringSerializationSchema(String topic, String charset) {
this.topic = topic;
this.charset = charset;
}
//Called to serialize each record
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
//Convert the string into a byte array using the configured charset
byte[] bytes = element.getBytes(Charset.forName(charset));
//Return a ProducerRecord for the target topic
return new ProducerRecord<>(topic, bytes);
}
}
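A quick way to sanity-check the schema outside of a Flink job is to call serialize() directly. This is a small usage sketch (the demo class name is hypothetical; it assumes the schema above is on the classpath in the same package), verifying the target topic and the UTF-8 encoded value:
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

public class KafkaStringSerializationSchemaDemo {
    public static void main(String[] args) {
        KafkaStringSerializationSchema schema = new KafkaStringSerializationSchema("out2021");
        //The timestamp argument may be null; the schema does not use it
        ProducerRecord<byte[], byte[]> record = schema.serialize("hello flink", null);
        System.out.println(record.topic()); //out2021
        System.out.println(new String(record.value(), StandardCharsets.UTF_8)); //hello flink
    }
}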