我是靠谱客的博主 幽默大侠,最近开发中收集的这篇文章主要介绍Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
- flink 自定义数据源的开发过程中遇到这样一个错误。
- 错误原因:在于KafkaConsumer的初始化放在了定义处。
- 解决: 应该放在open方法内初始化。
-
DataStreamSource<String> dataStreamSource = env.addSource(new KafkaSourceFunction()); //获取数据 kafka消费数据获取 DataStreamSource<String> data = env.addSource(new RichSourceFunction<String>() { KafkaConsumer<String, String> kafkaConsumer; // = new KafkaConsumer<>(prop); //new StringDeserializer(), new StringDeserializer()); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); String topic = "xxx"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers","xxx:0000"); prop.setProperty("group.id","con1"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kafkaConsumer = new KafkaConsumer<>(prop); kafkaConsumer.subscribe(Arrays.asList(topic)); } @Override public void run(SourceContext<String> sourceContext) throws Exception { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500)); Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); while (iterator.hasNext()){ ConsumerRecord<String, String> record = iterator.next(); String key = record.key(); String value = record.value(); sourceContext.collect("key:"+key+", value: "+value); } } } @Override public void cancel() { System.out.println("cancel"); } });
最后
以上就是幽默大侠为你收集整理的Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig的全部内容,希望文章能够帮你解决Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复