我是靠谱客的博主 幽默大侠,最近开发中收集的这篇文章主要介绍Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

  • flink 自定义数据源的开发过程中遇到这样一个错误。
  • 错误原因:在于KafkaConsumer的初始化放在了定义处。
  • 解决: 应该放在open方法内初始化。
  •  DataStreamSource<String> dataStreamSource = env.addSource(new KafkaSourceFunction());
            //获取数据 kafka消费数据获取
            DataStreamSource<String> data = env.addSource(new RichSourceFunction<String>() {
                KafkaConsumer<String, String> kafkaConsumer;
    //            = new KafkaConsumer<>(prop);
                //new StringDeserializer(), new StringDeserializer());
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    String topic = "xxx";
                    Properties prop = new Properties();
                    prop.setProperty("bootstrap.servers","xxx:0000");
                    prop.setProperty("group.id","con1");
                    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    kafkaConsumer = new KafkaConsumer<>(prop);
                    kafkaConsumer.subscribe(Arrays.asList(topic));
    
                }
    
                @Override
                public void run(SourceContext<String> sourceContext) throws Exception {
                    while (true) {
                        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
    
                        while (iterator.hasNext()){
                            ConsumerRecord<String, String> record = iterator.next();
                            String key = record.key();
                            String value = record.value();
                            sourceContext.collect("key:"+key+", value: "+value);
                        }
                    }
    
                }
    
                @Override
                public void cancel() {
                    System.out.println("cancel");
                }
            });

     

最后

以上就是幽默大侠为你收集整理的Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig的全部内容,希望文章能够帮你解决Caused by: java.io.NotSerializableException: org.apache.kafka.common.metrics.MetricConfig所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部