我是靠谱客的博主 无限舞蹈,最近开发中收集的这篇文章主要介绍kafka记录上次偏移量,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

#!/usr/bin/env python
# coding=utf-8
from kafka import *
from  kafka import KafkaConsumer
import datetime,time
import json
def get_kafka_reviews(bootstrap_servers,topics):
    # print type(self.bootstrap_servers)
    consumer = KafkaConsumer(bootstrap_servers=[bootstrap_servers], group_id='yyjk01', auto_offset_reset='latest', enable_auto_commit=True)
    consumer.subscribe(topics=(topics))  #订阅要消费的主题
    # print consumer.topics()
    # print "+++++++",consumer.position(TopicPartition(topic=u'ctripapi_duplicateddata_review', partition=1)) #获取当前主题的最新偏移量

    review_list =[]
    for message in consumer:
        print message
        # str_time = datetime.datetime.fromtimestamp(message.timestamp / 1000)
        # print message.timestamp
        # print type(message.timestamp)
        # #print message.topic ,message.timestamp,message.value
        #
        #
        # print message.topic, str_time, message.value
        # print type(message.value)
        # dict1 = json.loads(message.value)
        # print dict1
        # print type(dict1)
        # print '-------------------'
        # for key in dict1:
        #
        #     print str_time,key,dict1[key]
        # print '-------------------'
        # #print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
        # #review_list.append(message.value)
    #return  review_list

print get_kafka_reviews('192.168.137.2:9092','test_topic')

最后

以上就是无限舞蹈为你收集整理的kafka记录上次偏移量的全部内容,希望文章能够帮你解决kafka记录上次偏移量所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(52)

评论列表共有 0 条评论

立即
投稿
返回
顶部