This post covers how to check Kafka offsets; sharing it here as a reference.

When using Spark Streaming to read data from Kafka, we save the offset of each partition of a Kafka topic in ZooKeeper to avoid data loss (the offsets in ZK are updated only after a batch has executed successfully). This guarantees that after a failed run, the next round can resume reading from the saved offsets.

The implementation is similar to the approach described in this article:

http://blog.csdn.net/rongyongfeikai2/article/details/49784785
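Roughly, the persistence step looks like the sketch below. This is a minimal illustration only, assuming the zkclient library (org.I0Itec.zkclient.ZkClient); the class name ZkOffsetStore, the path layout /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;, and the constructor arguments are my assumptions, not the linked article's exact code.

import java.util.Map;
import org.I0Itec.zkclient.ZkClient;

public class ZkOffsetStore {
    private final ZkClient zkClient;
    private final String groupId;

    public ZkOffsetStore(String zkQuorum, String groupId) {
        // zkQuorum e.g. "host1:2181,host2:2181" (illustrative)
        this.zkClient = new ZkClient(zkQuorum);
        this.groupId = groupId;
    }

    // Called only after a batch has been processed successfully,
    // so a failed run never advances the saved offsets.
    public void saveOffsets(String topic, Map<Integer, Long> offsets) {
        for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
            String path = "/consumers/" + groupId + "/offsets/" + topic + "/" + e.getKey();
            if (!zkClient.exists(path)) {
                zkClient.createPersistent(path, true); // also creates missing parent nodes
            }
            zkClient.writeData(path, String.valueOf(e.getValue()));
        }
    }
}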

However, a Kafka topic can be deleted, and, worse, a user may then create a new topic with the same name. When that happens, the offsets saved in ZK are no longer accurate (reading from them typically fails with an OffsetOutOfRangeException), so they have to be compared against the offsets held by the Kafka brokers and the ZK offsets corrected accordingly.

The implementation is as follows:

1. Use a class to hold the leader of each partition of a given topic, together with each partition's offset:

import java.io.Serializable;
import java.util.HashMap;

/**
 * @function: record class holding the per-partition offsets and leaders of one Kafka topic
 */
public class KafkaTopicOffset implements Serializable {
    private String topicName;
    private HashMap<Integer, Long> offsetList;
    private HashMap<Integer, String> leaderList;

    public KafkaTopicOffset(String topicName) {
        this.topicName = topicName;
        this.offsetList = new HashMap<Integer, Long>();
        this.leaderList = new HashMap<Integer, String>();
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public HashMap<Integer, Long> getOffsetList() {
        return offsetList;
    }

    public void setOffsetList(HashMap<Integer, Long> offsetList) {
        this.offsetList = offsetList;
    }

    public HashMap<Integer, String> getLeaderList() {
        return leaderList;
    }

    public void setLeaderList(HashMap<Integer, String> leaderList) {
        this.leaderList = leaderList;
    }

    @Override
    public String toString() {
        return "topic:" + topicName + ",offsetList:" + this.offsetList + ",leaderList:" + this.leaderList;
    }
}


2. Obtain the topic/partition offsets from the Kafka brokers (mainly by using SimpleConsumer to send the corresponding requests):

import java.io.Serializable;
import java.util.*;

import com.nsfocus.bsaips.common.Constant;
import com.nsfocus.bsaips.model.KafkaTopicOffset;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

/**
 * @function: Kafka utility class
 */
public class KafkaUtil implements Serializable {
    private static KafkaUtil kafkaUtil = null;

    private KafkaUtil() {}

    public static KafkaUtil getInstance() {
        if (kafkaUtil == null) {
            kafkaUtil = new KafkaUtil();
        }
        return kafkaUtil;
    }

    // Extract the host of every "host:port" entry in the broker list.
    private String[] getIpsFromBrokerList(String brokerlist) {
        String[] brokers = brokerlist.split(",");
        for (int i = 0; i < brokers.length; i++) {
            brokers[i] = brokers[i].split(":")[0];
        }
        return brokers;
    }

    // Map every broker host to its port.
    private Map<String, Integer> getPortFromBrokerList(String brokerlist) {
        Map<String, Integer> map = new HashMap<String, Integer>();
        String[] brokers = brokerlist.split(",");
        for (String item : brokers) {
            String[] itemArr = item.split(":");
            if (itemArr.length > 1) {
                map.put(itemArr[0], Integer.parseInt(itemArr[1]));
            }
        }
        return map;
    }

    // Ask the brokers for the topic metadata: the partition ids and the leader of each partition.
    public KafkaTopicOffset topicMetadataRequest(String brokerlist, String topic) {
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
        KafkaTopicOffset kafkaTopicOffset = new KafkaTopicOffset(topic);
        String[] seeds = getIpsFromBrokerList(brokerlist);
        Map<String, Integer> portMap = getPortFromBrokerList(brokerlist);

        for (int i = 0; i < seeds.length; i++) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seeds[i],
                        portMap.get(seeds[i]),
                        Constant.TIMEOUT,
                        Constant.BUFFERSIZE,
                        Constant.groupId);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(topicMetadataRequest);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        kafkaTopicOffset.getLeaderList().put(part.partitionId(), part.leader().host());
                        kafkaTopicOffset.getOffsetList().put(part.partitionId(), 0L);
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return kafkaTopicOffset;
    }

    // For every partition, ask its leader for the latest (ending) offset.
    public KafkaTopicOffset getLastOffsetByTopic(String brokerlist, String topic) {
        KafkaTopicOffset kafkaTopicOffset = topicMetadataRequest(brokerlist, topic);
        String[] seeds = getIpsFromBrokerList(brokerlist);
        Map<String, Integer> portMap = getPortFromBrokerList(brokerlist);

        for (int i = 0; i < seeds.length; i++) {
            SimpleConsumer consumer = null;
            Iterator<Map.Entry<Integer, Long>> iterator = kafkaTopicOffset.getOffsetList().entrySet().iterator();
            try {
                consumer = new SimpleConsumer(seeds[i],
                        portMap.get(seeds[i]),
                        Constant.TIMEOUT,
                        Constant.BUFFERSIZE,
                        Constant.groupId);
                while (iterator.hasNext()) {
                    Map.Entry<Integer, Long> entry = iterator.next();
                    int partitionId = entry.getKey();
                    // Only the partition's leader can answer the offset request.
                    if (!kafkaTopicOffset.getLeaderList().get(partitionId).equals(seeds[i])) {
                        continue;
                    }
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
                    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                    requestInfo.put(topicAndPartition,
                            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
                    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                            requestInfo,
                            kafka.api.OffsetRequest.CurrentVersion(),
                            Constant.groupId);
                    OffsetResponse response = consumer.getOffsetsBefore(request);
                    long[] offsets = response.offsets(topic, partitionId);
                    if (offsets.length > 0) {
                        kafkaTopicOffset.getOffsetList().put(partitionId, offsets[0]);
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return kafkaTopicOffset;
    }

    public Map<String, KafkaTopicOffset> getKafkaOffsetByTopicList(String brokerList, List<String> topics) {
        Map<String, KafkaTopicOffset> map = new HashMap<String, KafkaTopicOffset>();
        for (int i = 0; i < topics.size(); i++) {
            map.put(topics.get(i), getLastOffsetByTopic(brokerList, topics.get(i)));
        }
        return map;
    }

    public static void main(String[] args) {
        try {
            System.out.println(KafkaUtil.getInstance().getKafkaOffsetByTopicList(
                    ConfigUtil.getInstance().getKafkaConf().get("brokerlist"),
                    Arrays.asList(new String[]{"pj_test_tmp", "test"})));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

3. Then, when KafkaCluster obtains the offsets from ZK, compare them with the offsets obtained from the brokers (assuming the return value of calling KafkaUtil's getKafkaOffsetByTopicList has been placed in offsetMap):
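The comparison code was shown as an image in the original post and has not survived. The following is a minimal sketch of the idea, assuming the offsets read from ZK sit in a map keyed by topic, here called zkOffsetMap; the class, method, and variable names are illustrative. Note that this first version distrusts only offsets beyond the broker's latest offset; the discussion below revises that rule.

import java.util.HashMap;
import java.util.Map;

public class OffsetReconciler {
    // Sketch: a ZK offset that is missing, or larger than the broker's latest
    // offset (e.g. because the topic was deleted and recreated), cannot be
    // trusted and is replaced with the broker's offset.
    public static void reconcile(Map<String, KafkaTopicOffset> offsetMap,
                                 Map<String, HashMap<Integer, Long>> zkOffsetMap) {
        for (Map.Entry<String, KafkaTopicOffset> e : offsetMap.entrySet()) {
            String topic = e.getKey();
            HashMap<Integer, Long> brokerOffsets = e.getValue().getOffsetList();
            HashMap<Integer, Long> zkOffsets = zkOffsetMap.get(topic);
            if (zkOffsets == null) {
                zkOffsets = new HashMap<Integer, Long>();
                zkOffsetMap.put(topic, zkOffsets);
            }
            for (Map.Entry<Integer, Long> p : brokerOffsets.entrySet()) {
                Integer partition = p.getKey();
                long brokerOffset = p.getValue();
                Long zkOffset = zkOffsets.get(partition);
                if (zkOffset == null || zkOffset > brokerOffset) {
                    zkOffsets.put(partition, brokerOffset);
                }
            }
        }
    }
}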



After some discussion it became clear that the early offset is the current starting offset of a partition, while the last offset is the current ending offset. To handle the stale case we should therefore resume consuming from the current starting offset, which means sending an EarliestOffsetRequest rather than a LastOffsetRequest (i.e. a PartitionOffsetRequestInfo built with EarliestTime() instead of LatestTime()). The revised code follows:

package com.nsfocus.bsaips.util;

import java.io.Serializable;
import java.util.*;

import com.nsfocus.bsaips.common.Constant;
import com.nsfocus.bsaips.model.KafkaTopicOffset;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

/**
 * @function: Kafka utility class (revised)
 */
public class KafkaUtil implements Serializable {

    // The singleton plumbing (getInstance), getIpsFromBrokerList(),
    // getPortFromBrokerList(), topicMetadataRequest(), getLastOffsetByTopic()
    // and getKafkaOffsetByTopicList() are identical to the version in step 2
    // and are omitted here; only the new methods are shown.

    // For every partition, ask its leader for the earliest (starting) offset.
    public KafkaTopicOffset getEarlyOffsetByTopic(String brokerlist, String topic) {
        KafkaTopicOffset kafkaTopicOffset = topicMetadataRequest(brokerlist, topic);
        String[] seeds = getIpsFromBrokerList(brokerlist);
        Map<String, Integer> portMap = getPortFromBrokerList(brokerlist);

        for (int i = 0; i < seeds.length; i++) {
            SimpleConsumer consumer = null;
            Iterator<Map.Entry<Integer, Long>> iterator = kafkaTopicOffset.getOffsetList().entrySet().iterator();
            try {
                consumer = new SimpleConsumer(seeds[i],
                        portMap.get(seeds[i]),
                        Constant.TIMEOUT,
                        Constant.BUFFERSIZE,
                        Constant.groupId);
                while (iterator.hasNext()) {
                    Map.Entry<Integer, Long> entry = iterator.next();
                    int partitionId = entry.getKey();
                    if (!kafkaTopicOffset.getLeaderList().get(partitionId).equals(seeds[i])) {
                        continue;
                    }
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
                    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                    // EarliestTime() instead of LatestTime(): we want the starting offset.
                    requestInfo.put(topicAndPartition,
                            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
                    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                            requestInfo,
                            kafka.api.OffsetRequest.CurrentVersion(),
                            Constant.groupId);
                    OffsetResponse response = consumer.getOffsetsBefore(request);
                    long[] offsets = response.offsets(topic, partitionId);
                    if (offsets.length > 0) {
                        kafkaTopicOffset.getOffsetList().put(partitionId, offsets[0]);
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return kafkaTopicOffset;
    }

    public Map<String, KafkaTopicOffset> getKafkaEarlyOffsetByTopicList(String brokerList, List<String> topics) {
        Map<String, KafkaTopicOffset> map = new HashMap<String, KafkaTopicOffset>();
        for (int i = 0; i < topics.size(); i++) {
            map.put(topics.get(i), getEarlyOffsetByTopic(brokerList, topics.get(i)));
        }
        return map;
    }

    public static void main(String[] args) {
        try {
            // System.out.println(KafkaUtil.getInstance().topicMetadataRequest(
            //         ConfigUtil.getInstance().getKafkaConf().get("brokerlist"),
            //         "pj_test_tmp"));
            // System.out.println(KafkaUtil.getInstance().getLastOffsetByTopic(
            //         ConfigUtil.getInstance().getKafkaConf().get("brokerlist"),
            //         "bsaips"));
            // System.out.println(KafkaUtil.getInstance().getKafkaOffsetByTopicList(
            //         ConfigUtil.getInstance().getKafkaConf().get("brokerlist"),
            //         Arrays.asList(new String[]{"bsa_sys_tmp"})));

            // last offset is the current ending offset; early offset is the current starting offset
            System.out.println(KafkaUtil.getInstance().getKafkaEarlyOffsetByTopicList(
                    ConfigUtil.getInstance().getKafkaConf().get("brokerlist"),
                    Arrays.asList(new String[]{"bsa_sys_tmp"})));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

The check then becomes:
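This, too, was an image in the original post. Per the discussion above, the rule amounts to the sketch below: a ZK offset is kept only if it lies within [earliest, latest] for its partition; anything outside that range, including a missing entry, is reset to the earliest offset so that no retained data is skipped. The class and method names are illustrative.

import java.util.HashMap;
import java.util.Map;

public class OffsetValidator {
    // earliestOffsets/latestOffsets come from getEarlyOffsetByTopic /
    // getLastOffsetByTopic for the same topic; zkOffsets is what ZK stored.
    public static void correct(HashMap<Integer, Long> zkOffsets,
                               HashMap<Integer, Long> earliestOffsets,
                               HashMap<Integer, Long> latestOffsets) {
        for (Map.Entry<Integer, Long> e : earliestOffsets.entrySet()) {
            Integer partition = e.getKey();
            long earliest = e.getValue();
            long latest = latestOffsets.get(partition);
            Long zkOffset = zkOffsets.get(partition);
            if (zkOffset == null || zkOffset < earliest || zkOffset > latest) {
                zkOffsets.put(partition, earliest); // stale or missing: start over from earliest
            }
        }
    }
}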



