我是靠谱客的博主 自然唇彩,这篇文章主要介绍Kafka生产者同步或者异步发送消息(保证数据不丢失),现在分享给大家,希望可以做个参考。

参考文档:Usage — kafka-python 2.0.2-dev documentation

同步方式

示例代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from kafka import KafkaProducer from kafka.errors import kafka_errors if __name__ == '__main__': print('演示python的kafka API:生产者的同步发送模式') # 创建kafkaProducer对象 producer = KafkaProducer( bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'], acks=-1 ) # 执行生产者同步方案 future = producer.send('test_kafka', '我爱python'.encode('utf-8')) # 默认是异步 try: record_metadata = future.get() # 同步发送模式,当发送失败(已经重试完成)后 # 如果发送成功 print(record_metadata.topic) print(record_metadata.partition) print(record_metadata.offset) except kafka_errors: print(kafka_errors) finally: producer.close()

运行结果:

异步方式

示例代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from kafka import KafkaProducer if __name__ == '__main__': print('演示python的kafka API:生产者的异步发送模式') # 1.创建kafkaProducer对象 producer = KafkaProducer( bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'], acks=-1, linger_ms=5000 ) # 2.执行生产者异步方案 # 2.1 异步无返回值方案 # producer.send('test_kafka', '我爱python'.encode('utf-8')) # 默认是异步 # producer.flush() # 2.2 异步有返回值方案 future = producer.send('test_kafka', '我爱python'.encode('utf-8')) # 默认是异步 # 表示当底层每一批数据发送成功后,都会调用此函数:add_callback def on_send_success(record_metadata): print(record_metadata.topic) print(record_metadata.partition) print(record_metadata.offset) # 表示当底层发送一批数据失败时,就会调用此函数:add_errback def on_send_error(excp): print(excp) future.add_callback(on_send_success).add_errback(on_send_error) # 为了演示效果,不使用flush,而是采用休眠的方式 # 注意:2.2时,如果此时程序运行结束,数据是没法发送到Kafka的 import time time.sleep(100)

运行结果:

最后

以上就是自然唇彩最近收集整理的关于Kafka生产者同步或者异步发送消息(保证数据不丢失)的全部内容,更多相关Kafka生产者同步或者异步发送消息(保证数据不丢失)内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(68)

评论列表共有 0 条评论

立即
投稿
返回
顶部