我是靠谱客的博主 自然唇彩,最近开发中收集的这篇文章主要介绍Kafka生产者同步或者异步发送消息(保证数据不丢失),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

参考文档:Usage — kafka-python 2.0.2-dev documentation

同步方式

示例代码:

from kafka import KafkaProducer
from kafka.errors import kafka_errors


if __name__ == '__main__':
    print('演示python的kafka API:生产者的同步发送模式')

    # 创建kafkaProducer对象
    producer = KafkaProducer(
        bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
        acks=-1
    )

    # 执行生产者同步方案
    future = producer.send('test_kafka', '我爱python'.encode('utf-8'))  # 默认是异步

    try:
        record_metadata = future.get()  # 同步发送模式,当发送失败(已经重试完成)后
        # 如果发送成功
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)
    except kafka_errors:
        print(kafka_errors)
    finally:
        producer.close()

运行结果:

异步方式

示例代码:

from kafka import KafkaProducer


if __name__ == '__main__':
    print('演示python的kafka API:生产者的异步发送模式')

    # 1.创建kafkaProducer对象
    producer = KafkaProducer(
        bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
        acks=-1,
        linger_ms=5000
    )

    # 2.执行生产者异步方案
    # 2.1 异步无返回值方案
    # producer.send('test_kafka', '我爱python'.encode('utf-8'))  # 默认是异步
    # producer.flush()

    # 2.2 异步有返回值方案
    future = producer.send('test_kafka', '我爱python'.encode('utf-8'))  # 默认是异步

    # 表示当底层每一批数据发送成功后,都会调用此函数:add_callback
    def on_send_success(record_metadata):
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)

    # 表示当底层发送一批数据失败时,就会调用此函数:add_errback
    def on_send_error(excp):
        print(excp)

    future.add_callback(on_send_success).add_errback(on_send_error)

    # 为了演示效果,不使用flush,而是采用休眠的方式
    # 注意:2.2时,如果此时程序运行结束,数据是没法发送到Kafka的
    import time
    time.sleep(100)

运行结果:

最后

以上就是自然唇彩为你收集整理的Kafka生产者同步或者异步发送消息(保证数据不丢失)的全部内容,希望文章能够帮你解决Kafka生产者同步或者异步发送消息(保证数据不丢失)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(77)

评论列表共有 0 条评论

立即
投稿
返回
顶部