我是靠谱客的博主 甜蜜日记本,最近开发中收集的这篇文章主要介绍简述kafkaKafka无消息丢失配置 ,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

1、如何获取 topic 主题的列表

2、生产者和消费者的命令行是什么?

3、consumer 是推还是拉?

4、kafka 维护消费状态跟踪的方法

5、讲一下主从同步**

6、为什么需要消息系统,mysql 不能满足需求吗?

7、Zookeeper 对于 Kafka 的作用是什么?

8、数据传输的事务定义有哪三种?

9、Kafka 判断一个节点是否还活着有那两个条件?

10、Kafka 与传统MQ消息系统之间有三个关键区别

11、kafka 的 ack 的三种机制

12、消费者如何不自动提交偏移量,由应用提交?

13、消费者故障,出现活锁问题如何解决?

14、如何控制消费的位置

15、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

16、kafka的高可用机制是什么?

17、kafka 如何减少数据丢失

18、kafka 如何不消费重复数据?比如扣款不能重复进行


 

 

1、如何获取 topic 主题的列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

2、生产者和消费者的命令行是什么?

生产者在主题上发布消息:

bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 -- topic Hello-Kafka

注意这里的 IP  server.properties 中的 listeners 的配置。接下来每个新行就是输入一条新消息。消费者接受消息:bin/kafka-console-consumer.sh --zookeeper localhost:2181 -- topic Hello-Kafka -- from-beginning

3consumer 是推还是拉?

这里讲一下两种模式:

1.push

一些消息系统比如 Scribe Apache Flume采用了push 模式,将消息推送到下游的 consumer。由broker决定消息推送的速率,对不同消费速率的consumer无法统一。消息系统都致力于让consumer以最大的速率最快速的消费消息,push模式下,当 broker推送的速率远大于consumer消费的速率时,consumer趋于崩溃。而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。

 

2.pull

Pull 模式下,consumer 可以自主决定是否批量的从 broker 拉取数据,可以根据自己的消费能力去决定这些策略。Pull的缺点是如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)

 Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到 brokerconsumer  broker 拉取消息。

 

4kafka 维护消费状态跟踪的方法

大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到 consumer  broker就马上进行标记或者等待customer的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。

存在的问题:

1.如果一条消息发送出去之后就立即被标记为消费过的,一旦 consumer 处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到 consumer 已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果consumer处理消息成功了但是向broker发送响应时失败了,这条消息将被消费两次。

2.broker 必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,

Kafka 采用了不同的策略。Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。这带来了另外一个好处:consumer 可以把 offset 调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?

5、讲一下主从同步**

https://blog.csdn.net/honglei915/article/details/37565289

6、为什么需要消息系统,mysql 不能满足需求吗?

1.解耦:

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2.冗余:

消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的插入-获取-删除范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

3.扩展性:

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。 

4.灵活性 & 峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 

5.可恢复性:

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 

6.顺序保证:

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)

7.缓冲:

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 

8.异步通信:

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

7Zookeeper 对于 Kafka 的作用是什么?

Zookeeper 是一个开放源码的、高性能的协调服务,它用于 Kafka 的分布式应用。 Zookeeper 主要用于在集群中不同节点之间进行通信

 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取

除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

8、数据传输的事务定义有哪三种?

 MQTT 的事务定义一样都是 3 种。

1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输

2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.

3)精确的一次(Exactly once: 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的

9Kafka 判断一个节点是否存活的条件

1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接

2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

10Kafka 与传统MQ消息系统之间有三个关键区别

Kafka持久化日志,这些日志可以被重复读取和无限期保留是一个分布式系统,它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性支持实时的流式处理
传统MQ消息系统

 

11kafka  ack 的三种机制

request.required.acks有三个值0 1 -1( all )

0:生产者不会等待 broker  ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。

1:服务端会等待 ack leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。

-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的 ack,这样数据不会丢失

12、消费者如何不自动提交偏移量,由应用提交?

 auto.commit.offset 设为 false,然后在处理一批消息后 commitSync() 或者异步提交 commitAsync()

即:

ConsumerRecords<> records = consumer.poll(); 
    for (ConsumerRecord<> record : records){
        …
        try{
        consumer.commitSync()
        }
        …
    }

13、消费者故障,出现活锁问题如何解决?

出现活锁的情况,是它持续的发送心跳,但是没有处理。为了预防消费者在

这种情况下一直持有分区,我们使用 max.poll.interval.ms 活跃检测机制。 在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。发生这种情况时,你会看到 offset 提交失败(调用commitSync()引发的 CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,你必须持续调用 poll

消费者提供两个配置设置来控制 poll 循环:

max.poll.interval.ms:增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用 poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。

max.poll.records:此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的

对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用 poll 但是必须注意确保已提交的 offset 不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。

14、如何控制消费的位置

kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的 offset 的特殊的方法也可用(seekToBeginning(Collection) seekToEnd(Collection)

15kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

Kafka 分布式的单位是 partition,同一个partition用一个write ahead log组织,所以可以保证 FIFO的顺序。不同 partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key来定义,因为同一个keymessage可以保证只发送到同一个 partition

Kafka 中发送1条消息的时候,可以指定(topic, partition, key) 3 个参数。partitonkey 是可选的。如果你指定了 partition,那就是所有消息发往同 1 partition,就是有序的。并且在消费端,Kafka保证,1partition只能被1consumer消费。或者你指定 key(比如 order id),具有同 1key的所有消息,会发往同1partition

16kafka的高可用机制是什么?

这个问题比较系统,回答出 kafka 的系统特点,leaderfollower的关系,消息读写的顺序即可。

https://www.cnblogs.com/qingyunzong/p/9004703.html https://www.tuicool.com/articles/BNRza2E

https://yq.aliyun.com/articles/64703

17kafka 如何减少数据丢失

参考:https://www.cnblogs.com/huxi2b/p/6056364.html

Kafka无消息丢失配置

Kafka到底通常不会丢数据(data loss),但有些情况下的确有可能会发生。下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。会在该列表之后对列表中的每一项进行讨论

  1. block.on.buffer.full = true
  2. acks = all
  3. retries = MAX_VALUE
  4. max.in.flight.requests.per.connection = 1
  5. 使用KafkaProducer.send(record, callback)
  6. callback逻辑中显式关闭producer:close(0) 
  7. unclean.leader.election.enable=false
  8. replication.factor = 3 
  9. min.insync.replicas = 2
  10. replication.factor > min.insync.replicas
  11. enable.auto.commit=false
  12. 消息处理完成之后再提交位移

给出列表之后,我们从两个方面来探讨一下数据为什么会丢失:

1. Producer端

  目前比较新版本的Kafka正式替换了Scala版本的old producer,使用了由Java重写的producer。新版本的producer采用异步发送机制。KafkaProducer.send(ProducerRecord)方法仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的IO线程会不断扫描该缓存区,将满足条件的消息封装到某个batch中然后发送出去。显然,这个过程中就有一个数据丢失的窗口:若IO线程发送之前client端挂掉了,累积在accumulator中的数据的确有可能会丢失。

  Producer的另一个问题是消息的乱序问题。假设客户端代码依次执行下面的语句将两条消息发到相同的分区

producer.send(record1);
producer.send(record2);

如果此时由于某些原因(比如瞬时的网络抖动)导致record1没有成功发送,同时Kafka又配置了重试机制和max.in.flight.requests.per.connection大于1(默认值是5,本来就是大于1的),那么重试record1成功后,record1在分区中就在record2之后,从而造成消息的乱序。很多某些要求强顺序保证的场景是不允许出现这种情况的。

  鉴于producer的这两个问题,我们应该如何规避呢??对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送有可能丢失数据, 我改成同步发送总可以吧?比如这样:

producer.send(record).get();

这样当然是可以的,但是性能会很差,不建议这样使用。因此特意总结了一份配置列表。个人认为该配置清单应该能够比较好地规避producer端数据丢失情况的发生:(特此说明一下,软件配置的很多决策都是trade-off,下面的配置也不例外:应用了这些配置,你可能会发现你的producer/consumer 吞吐量会下降,这是正常的,因为你换取了更高的数据安全性)

  • block.on.buffer.full = true  尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义非常直观,所以这里还是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。否则如果producer生产速度过快耗尽了缓冲区,producer将抛出异常
  • acks=all  很好理解,所有follower都响应了才认为消息提交成功,即"committed"
  • retries = MAX 无限重试,直到你意识到出现了问题:)
  • max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序
  • 使用KafkaProducer.send(record, callback)而不是send(record)方法   自定义回调逻辑处理消息发送失败
  • callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了避免消息乱序
  • unclean.leader.election.enable=false   关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失
  • replication.factor >= 3   这个完全是个人建议了,参考了Hadoop及业界通用的三备份原则
  • min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用
  • 保证replication.factor > min.insync.replicas  如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可

2. Consumer端

  consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:

  • enable.auto.commit=false  关闭自动提交位移
  • 在消息被完整处理之后再手动提交位移

18kafka 如何不消费重复数据?比如扣款不能重复进行

结合具体业务来思考,几个参考思路:

比如你拿个数据要写库,先根据主键查一下,如果这条数据已经存在就不执行insert执行update。比如是写Redis,就不必在意因为每次都是set,天然幂等性。如果不是上面两个场景,稍微复杂一点,假设需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id 去比如Redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个idRedis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

最后

以上就是甜蜜日记本为你收集整理的简述kafkaKafka无消息丢失配置 的全部内容,希望文章能够帮你解决简述kafkaKafka无消息丢失配置 所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部