我是靠谱客的博主 完美蜜蜂,最近开发中收集的这篇文章主要介绍kafka学习资料,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述


问题导读
1.Kafka独特设计在什么地方?

2.Kafka如何搭建及创建topic、发送消息、消费消息?
3.如何书写Kafka程序?
4.数据传输的事务定义有哪三种?
5.Kafka判断一个节点是否活着有哪两个条件?
6.producer是否直接将数据发送到broker的leader(主节点)?
7.Kafa consumer是否可以消费指定分区消息?
8.Kafka消息是采用Pull模式,还是Push模式?
9.Procuder API有哪两种?
10.Kafka存储在硬盘上的消息格式是什么?
一、基本概念

介绍

这个独特的设计是什么样的呢?
Topics 和Logs
分布式
Producers
Consumers
相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。
二、环境搭建
Step 1: 下载Kafka

  1. > tar -xzf kafka_2.9.2-0.8.1.1.tgz
  2. > cd kafka_2.9.2-0.8.1.1
复制代码

Step 2: 启动服务
  1. > bin/zookeeper-server-start.sh config/zookeeper.properties &
  2. [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  3. ...
复制代码

  1. > bin/kafka-server-start.sh config/server.properties
  2. [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
  3. [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  4. ...
复制代码

Step 3: 创建 topic
  1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
复制代码

  1. > bin/kafka-topics.sh --list --zookeeper localhost:2181
  2. test
复制代码

Step 4:发送消息.
  1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
  2. This is a messageThis is another message
复制代码
Step 5: 启动consumer
  1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  2. This is a message
  3. This is another message
复制代码
Step 6: 搭建一个多个broker的集群
  1. > cp config/server.properties config/server-1.properties
  2. > cp config/server.properties config/server-2.properties
复制代码
  1. config/server-1.properties:
  2.     broker.id=1
  3.     port=9093
  4.     log.dir=/tmp/kafka-logs-1
复制代码
  1. config/server-2.properties:
  2.     broker.id=2
  3.     port=9094
  4.     log.dir=/tmp/kafka-logs-2
复制代码
  1. > bin/kafka-server-start.sh config/server-1.properties &
  2. ...
  3. > bin/kafka-server-start.sh config/server-2.properties &
  4. ...
复制代码
  1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
复制代码
  1. > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
复制代码
  1. Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
  2.         Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
复制代码
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
  1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
复制代码
  1. ...
  2. my test message 1my test message 2^C
复制代码
  1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
复制代码
  1. ...
  2. my test message 1
  3. my test message 2
  4. ^C
复制代码
  1. > ps | grep server-1.properties7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
  2. > kill -9 7564
复制代码
  1. > bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
  2. Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
  3.         Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0
复制代码
  1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
  2. ...
  3. my test message 1
  4. my test message 2
复制代码
三、搭建Kafka开发环境

  1. <dependency>
  2.         <groupId> org.apache.kafka</groupId >
  3.         <artifactId> kafka_2.10</artifactId >
  4.         <version> 0.8.0</ version>
  5. </dependency>
复制代码
配置程序

  1. package com.sohu.kafkademon;
  2. public interface KafkaProperties
  3. {
  4.     final static String zkConnect = "10.22.10.139:2181";
  5.     final static String groupId = "group1";
  6.     final static String topic = "topic1";
  7.     final static String kafkaServerURL = "10.22.10.139";
  8.     final static int kafkaServerPort = 9092;
  9.     final static int kafkaProducerBufferSize = 64 * 1024;
  10.     final static int connectionTimeOut = 20000;
  11.     final static int reconnectInterval = 10000;
  12.     final static String topic2 = "topic2";
  13.     final static String topic3 = "topic3";
  14.     final static String clientId = "SimpleConsumerDemoClient";
  15. }
复制代码
producer
  1. package com.sohu.kafkademon;
  2. import java.util.Properties;
  3. import kafka.producer.KeyedMessage;
  4. import kafka.producer.ProducerConfig;
  5. /**
  6. * @author leicui bourne_cui@163.com
  7. */
  8. public class KafkaProducer extends Thread
  9. {
  10.     private final kafka.javaapi.producer.Producer<Integer, String> producer;
  11.     private final String topic;
  12.     private final Properties props = new Properties();
  13.     public KafkaProducer(String topic)
  14.     {
  15.         props.put("serializer.class", "kafka.serializer.StringEncoder");
  16.         props.put("metadata.broker.list", "10.22.10.139:9092");
  17.         producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
  18.         this.topic = topic;
  19.     }
  20.     @Override
  21.     public void run() {
  22.         int messageNo = 1;
  23.         while (true)
  24.         {
  25.             String messageStr = new String("Message_" + messageNo);
  26.             System.out.println("Send:" + messageStr);
  27.             producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
  28.             messageNo++;
  29.             try {
  30.                 sleep(3000);
  31.             } catch (InterruptedException e) {
  32.                 // TODO Auto-generated catch block
  33.                 e.printStackTrace();
  34.             }
  35.         }
  36.     }
  37. }
复制代码
consumer
  1. package com.sohu.kafkademon;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.ConsumerConfig;
  7. import kafka.consumer.ConsumerIterator;
  8. import kafka.consumer.KafkaStream;
  9. import kafka.javaapi.consumer.ConsumerConnector;
  10. /**
  11. * @author leicui bourne_cui@163.com
  12. */
  13. public class KafkaConsumer extends Thread
  14. {
  15.     private final ConsumerConnector consumer;
  16.     private final String topic;
  17.     public KafkaConsumer(String topic)
  18.     {
  19.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
  20.                 createConsumerConfig());
  21.         this.topic = topic;
  22.     }
  23.     private static ConsumerConfig createConsumerConfig()
  24.     {
  25.         Properties props = new Properties();
  26.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
  27.         props.put("group.id", KafkaProperties.groupId);
  28.         props.put("zookeeper.session.timeout.ms", "40000");
  29.         props.put("zookeeper.sync.time.ms", "200");
  30.         props.put("auto.commit.interval.ms", "1000");
  31.         return new ConsumerConfig(props);
  32.     }
  33.     @Override
  34.     public void run() {
  35.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  36.         topicCountMap.put(topic, new Integer(1));
  37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  38.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  39.         ConsumerIterator<byte[], byte[]> it = stream.iterator();
  40.         while (it.hasNext()) {
  41.             System.out.println("receive:" + new String(it.next().message()));
  42.             try {
  43.                 sleep(3000);
  44.             } catch (InterruptedException e) {
  45.                 e.printStackTrace();
  46.             }
  47.         }
  48.     }
  49. }
复制代码
简单的发送接收
  1. package com.sohu.kafkademon;
  2. /**
  3. * @author leicui bourne_cui@163.com
  4. */
  5. public class KafkaConsumerProducerDemo
  6. {
  7.     public static void main(String[] args)
  8.     {
  9.         KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
  10.         producerThread.start();
  11.         KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
  12.         consumerThread.start();
  13.     }
  14. }
复制代码
高级别的consumer
  1. package com.sohu.kafkademon;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.ConsumerConfig;
  7. import kafka.consumer.ConsumerIterator;
  8. import kafka.consumer.KafkaStream;
  9. import kafka.javaapi.consumer.ConsumerConnector;
  10. /**
  11. * @author leicui bourne_cui@163.com
  12. */
  13. public class KafkaConsumer extends Thread
  14. {
  15.     private final ConsumerConnector consumer;
  16.     private final String topic;
  17.     public KafkaConsumer(String topic)
  18.     {
  19.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
  20.                 createConsumerConfig());
  21.         this.topic = topic;
  22.     }
  23.     private static ConsumerConfig createConsumerConfig()
  24.     {
  25.         Properties props = new Properties();
  26.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
  27.         props.put("group.id", KafkaProperties.groupId);
  28.         props.put("zookeeper.session.timeout.ms", "40000");
  29.         props.put("zookeeper.sync.time.ms", "200");
  30.         props.put("auto.commit.interval.ms", "1000");
  31.         return new ConsumerConfig(props);
  32.     }
  33.     @Override
  34.     public void run() {
  35.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  36.         topicCountMap.put(topic, new Integer(1));
  37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  38.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  39.         ConsumerIterator<byte[], byte[]> it = stream.iterator();
  40.         while (it.hasNext()) {
  41.             System.out.println("receive:" + new String(it.next().message()));
  42.             try {
  43.                 sleep(3000);
  44.             } catch (InterruptedException e) {
  45.                 e.printStackTrace();
  46.             }
  47.         }
  48.     }
  49. }
复制代码
四、数据持久化
不要畏惧文件系统!
另外再来讨论一下JVM,以下两个事实是众所周知的:
常量时间的操作效率
五、消息传输的事务定义

  • 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
  • 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
  • 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。
    大多数消息系统声称可以做到“精确的一次”,但是仔细阅读它们的的文档可以看到里面存在误导,比如没有说明当consumer或producer失败时怎么样,或者当有多个consumer并行时怎么样,或写入硬盘的数据丢失时又会怎么样。kafka的做法要更先进一些。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。关于副本的活动的概念,下节文档会讨论。现在假设broker是不会down的。
  • consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
  • consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
  • “精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。

    六、性能优化

    消息集(message set)
    zero copy
    Broker维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式producer和consumer是共享的,这使得Kafka可以一个很重要的点进行优化:消息在网络上的传递。现代的unix操作系统提供了高性能的将数据从页面缓存发送到socket的系统函数,在linux中,这个函数是sendfile.
  • 操作系统把数据从文件拷贝内核中的页缓存中
  • 应用程序从页缓存从把数据拷贝自己的内存缓存中
  • 应用程序将数据写入到内核中socket缓存中
  • 操作系统把数据从socket缓存中拷贝到网卡接口缓存,从这里发送到网络上。

    数据压缩
    很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有Kafka支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。
    七、Producer和Consumer

    Kafka Producer 消息发送
    异步发送
    Kafka Consumer
    推还是拉?
    消费状态跟踪
    但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,一旦consumer处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到consumer已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果consumer处理消息成功了但是向broker发送响应时失败了,这条消息将被消费两次。第二个问题时,broker必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,
    离线处理消息
    八、主从同步

    Kafka判断一个节点是否活着有两个条件:
  • 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
  • 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。
    符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。 
    Leader的选择
  • 等待ISR中的任何一个节点恢复并担任leader。
  • 选择所有节点中(不只是ISR)第一个恢复的节点作为leader.
    这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
    副本管理
    九、客户端API

    Kafka Producer APIs
    1. class Producer {
    2. /* 将消息发送到指定分区 */
    3. publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
    4. /* 批量发送一批消息 */
    5. publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
    6. /* 关闭producer */
    7. publicvoid close();
    8. }
    复制代码
    Producer API提供了以下功能:
  • 提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
  • 通过分区函数kafka.producer.Partitioner类对消息分区。
  • interface Partitioner<T> {
  • int partition(T key, int numPartitions);
  • }


  • KafKa Consumer APIs
    低级别的API
    1. class SimpleConsumer {
    2. /*向一个broker发送读取请求并得到消息集 */
    3. public ByteBufferMessageSet fetch(FetchRequest request);
    4. /*向一个broker发送读取请求并得到一个相应集 */
    5. public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    6. /**
    7. * 得到指定时间之前的offsets
    8. * 返回值是offsets列表,以倒序排序
    9. * @param time: 时间,毫秒,
    10. * 如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
    11. * 如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
    12. */
    13. publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    14. }
    复制代码
    高级别的API
    1. /* 创建连接 */
    2. ConsumerConnector connector = Consumer.create(consumerConfig);
    3. interface ConsumerConnector {
    4. /**
    5. * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)
    6. * Input: a map of <topic, #streams>
    7. * Output: a map of <topic, list of message streams>
    8. */
    9. public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
    10. /**
    11. * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代,
    12. * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
    13. */
    14. public List<KafkaStream> createMessageStreamsByFilter(
    15. TopicFilter topicFilter, int numStreams);
    16. /* 提交目前消费到的offset */
    17. public commitOffsets()
    18. /* 关闭连接 */
    19. public shutdown()
    20. }
    复制代码
    十、消息和日志
    1. /**
    2. * 具有N个字节的消息的格式如下
    3. *
    4. * 如果版本号是0
    5. *
    6. * 1. 1个字节的 "magic" 标记
    7. *
    8. * 2. 4个字节的CRC32校验码
    9. *
    10. * 3. N - 5个字节的具体信息
    11. *
    12. * 如果版本号是1
    13. *
    14. * 1. 1个字节的 "magic" 标记
    15. *
    16. * 2.1个字节的参数允许标注一些附加的信息比如是否压缩了,解码类型等
    17. *
    18. * 3.4个字节的CRC32校验码
    19. *
    20. * 4. N - 6 个字节的具体信息
    21. *
    22. */
    复制代码
    日志一个叫做“my_topic”且有两个分区的的topic,它的日志有两个文件夹组成,my_topic_0和my_topic_1,每个文件夹里放着具体的数据文件,每个数据文件都是一系列的日志实体,每个日志实体有一个4个字节的整数N标注消息的长度,后边跟着N个字节的消息。每个消息都可以由一个64位的整数offset标注,offset标注了这条消息在发送到这个分区的消息流中的起始位置。每个日志文件的名称都是这个文件第一条日志的offset.所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名字的差就是一个数字S,S差不多就是配置文件中指定的日志文件的最大容量。
  • 消息长度: 4 bytes (value: 1+4+n)
  • 版本号: 1 byte
  • CRC校验码: 4 bytes
  • 具体的消息: n bytes

    写操作消息被不断的追加到最后一个日志的末尾,当日志的大小达到一个指定的值时就会产生一个新的文件。对于写操作有两个参数,一个规定了消息的数量达到这个值时必须将数据刷新到硬盘上,另外一个规定了刷新到硬盘的时间间隔,这对数据的持久性是个保证,在系统崩溃的时候只会丢失一定数量的消息或者一个时间段的消息。
    读操作
    1. MessageSetSend (fetch result)

    2. total length     : 4 bytes
    3. error code       : 2 bytes
    4. message 1        : x bytes
    5. ...
    6. message n        : x bytes
    7. MultiMessageSetSend (multiFetch result)

    8. total length       : 4 bytes
    9. error code         : 2 bytes
    10. messageSetSend 1
    11. ...
    12. messageSetSend n
    复制代码
    删除
    可靠性保证

    最后

    以上就是完美蜜蜂为你收集整理的kafka学习资料的全部内容,希望文章能够帮你解决kafka学习资料所遇到的程序开发问题。

    如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

    本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
    点赞(72)

    评论列表共有 0 条评论

  • 立即
    投稿
    返回
    顶部