概述
1、客户端包(kafka-clients)的版本必须与Kafka服务端版本对应,否则报错如下:
15:45:04.132 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1. Fetching API versions.
15:45:04.133 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node -1.
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.clients.NodeApiVersions]
java.lang.NullPointerException
at org.apache.kafka.clients.NodeApiVersions.apiVersionToText(NodeApiVersions.java:167)
at org.apache.kafka.clients.NodeApiVersions.toString(NodeApiVersions.java:134)
at org.apache.kafka.clients.NodeApiVersions.toString(NodeApiVersions.java:120)
at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:304)
at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:276)
at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:230)
at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:298)
at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:208)
at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:212)
at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:103)
at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:88)
at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48)
at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273)
at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260)
at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442)
at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:433)
at ch.qos.logback.classic.Logger.debug(Logger.java:511)
at org.apache.kafka.clients.NetworkClient.handleApiVersionsResponse(NetworkClient.java:558)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:538)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:359)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
at java.lang.Thread.run(Thread.java:748)
2、非本地调用:需要在server.properties文件中配置对外暴露的地址。
旧版本:
advertised.host.name=服务器IP地址
advertised.port=9092
listeners=PLAINTEXT://0.0.0.0:9092
新版本:
listeners=PLAINTEXT://服务器内网地址:9092
advertised.listeners=PLAINTEXT://服务器公网地址:9092
配置不对报错如下:
15:52:27.651 [kafka-producer-network-thread | producer-1] DEBUG o.a.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with /172.17.92.57 disconnected
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_131]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_131]
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:152) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:471) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:425) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-2.0.0.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
3、后台运行kafka 添加 -daemon参数
./kafka-server-start.sh -daemon ../config/server.properties
Java代码,消息消费端:
package com.cn.handle;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
公共类KafkaConsumer {
私人最终ConsumerConnector消费者;
private KafkaConsumer(){
Properties props = new Properties();
// zookeeper配置
props.put(“zookeeper.connect”,“localhost:2181”);
//消费者所在组
props.put(“group.id”,“test-consumer-group”);
// zk连接超时
props.put(“zookeeper.session.timeout.ms”,“4000”);
props.put(“zookeeper.sync.time.ms”,“200”);
props.put(“auto.commit.interval.ms”,“1000”);
props.put(“auto.offset.reset”,“最小”);
//序列化类
props.put(“serializer.class”,“kafka.serializer.StringEncoder”);
ConsumerConfig config = new ConsumerConfig(props);
消费者= kafka.consumer.Consumer。 createJavaConsumerConnector(配置);
}
void consume(){
Map <String,Integer> topicCountMap = new HashMap <String,Integer>();
topicCountMap.put(“zzbtest”,new Integer(1));
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map <String,List <KafkaStream <String,String >>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream <String,String> stream = consumerMap.get(“zzbtest”)。get(0);
ConsumerIterator <String,String> it = stream.iterator();
int messageCount = 0;
while(it.hasNext()){
的System.out.println(it.next()消息());
messageCount ++;
if(messageCount == 100){
System.out.println(“Consumer端一共消费了”+ messageCount +“条消息!”);
}
}
的System.out.println( “结束”);
}
公共静态无效的主要(字串[] args){
新KafkaConsumer()消耗();
}
}
消息发送端:
package com.cn.handle;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Sample Kafka message sender using the legacy producer API
 * ({@code kafka.javaapi.producer.Producer}).
 */
public class SendDatatToKafka {

    /** Number of messages sent by one call to {@link #send(String, String, String)}. */
    private static final int MESSAGE_COUNT = 1000;

    /** Pause between two consecutive messages, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 100L;

    /**
     * Entry point: sends the sample messages to the {@code zzbtest} topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SendDatatToKafka s = new SendDatatToKafka();
        try {
            s.send("zzbtest", "jack", "rose");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@value #MESSAGE_COUNT} keyed messages to {@code topic}, pausing
     * {@value #SEND_INTERVAL_MS} ms before each one. The payload of message
     * {@code i} is {@code data + i}.
     *
     * <p>The producer is always closed before this method returns. If the calling
     * thread is interrupted while waiting, the interrupt flag is restored and
     * sending stops early.
     *
     * @param topic destination topic
     * @param key   message key used for every message
     * @param data  payload prefix; the message index is appended
     * @throws IOException declared for callers; nothing in this body throws it
     */
    public void send(String topic, String key, String data) throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                try {
                    Thread.sleep(SEND_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop sending.
                    Thread.currentThread().interrupt();
                    break;
                }
                producer.send(new KeyedMessage<String, String>(topic, key, data + i));
            }
        } finally {
            // Close the producer even when a send fails.
            producer.close();
        }
    }
}
最后
以上就是懦弱睫毛为你收集整理的kafka java客户端调用问题的全部内容,希望文章能够帮你解决kafka java客户端调用问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复