1. Create the PVs
[root@k8s-node1 kafka]# cat kafka-pv.yaml
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-kafka1
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 300Mi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/kafka1"
  persistentVolumeReclaimPolicy: Recycle
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-kafka2
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 300Mi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/kafka2"
  persistentVolumeReclaimPolicy: Recycle
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-kafka3
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 300Mi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/kafka3"
  persistentVolumeReclaimPolicy: Recycle
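hostPath volumes are node-local, so it is a good idea to pre-create the data directories on each node that may schedule a broker pod before applying the manifest. A minimal sketch, with the paths taken from the manifest above:

# on each node that may host a broker pod (hostPath is node-local)
mkdir -p /data/kafka1 /data/kafka2 /data/kafka3

# then, from a machine with kubectl access
kubectl apply -f kafka-pv.yaml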
2. Check the PVs
[root@k8s-node1 kafka]# kubectl get pv
NAME        CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                     STORAGECLASS   REASON   AGE
pv-kafka1   300Mi      RWO            Recycle          Bound    default/datadir-kafka-0   anything                18h
pv-kafka2   300Mi      RWO            Recycle          Bound    default/datadir-kafka-1   anything                18h
pv-kafka3   300Mi      RWO            Recycle          Bound    default/datadir-kafka-2   anything                18h
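Note that the Bound status and the default/datadir-kafka-N claims only appear after the StatefulSet in step 3 has been created (this listing was taken 18h later); freshly created PVs show Available. The claim side and the binding details can be checked with standard commands, for example:

# the PVCs generated by the StatefulSet's volumeClaimTemplates (step 3)
kubectl get pvc

# how a particular PV was matched to its claim
kubectl describe pv pv-kafka1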
3. Create the Kafka StatefulSet
[root@k8s-node1 kafka]# cat kafka-sts.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  selector:
    matchLabels:
      app: kafka
  maxUnavailable: 1
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-hs
  replicas: 3
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: IfNotPresent
        image: mirrorgooglecontainers/kubernetes-kafka:1.0-10.2.1
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.1"
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-cs.default.svc.cluster.local.:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=3 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx256M -Xms256M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 300Mi
Note: the following lines need to be changed to match your environment:
--override zookeeper.connect=zk-cs.bigdata.svc.cluster.local:2181   # ZooKeeper connect address
image: mirrorgooglecontainers/kubernetes-kafka:1.0-10.2.1           # image to pull
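Once those values are adjusted, apply the manifest and wait for the brokers to come up. A minimal sketch, assuming the file name above (the sed substitution is only needed if your ZooKeeper runs in another namespace, e.g. bigdata as in the note):

# optional: point the brokers at a ZooKeeper in another namespace
sed -i 's/zk-cs.default.svc.cluster.local/zk-cs.bigdata.svc.cluster.local/' kafka-sts.yaml

kubectl apply -f kafka-sts.yaml
# podManagementPolicy: Parallel starts all three replicas at once;
# this blocks until every pod reports Ready
kubectl rollout status statefulset kafka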
4. Check the pod status
[root@k8s-node1 kafka]# kubectl get pod
NAME      READY   STATUS    RESTARTS   AGE
kafka-0   1/1     Running   0          18h
kafka-1   1/1     Running   0          18h
kafka-2   1/1     Running   0          18h
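Each pod also gets a stable DNS name through the headless service kafka-hs; these are the broker endpoints that later show up in ZooKeeper (step 5). One way to verify resolution is a throwaway pod, sketched below (the busybox image and the dns-test pod name are assumptions, not part of the original setup):

# resolve one broker's per-pod DNS name from inside the cluster
kubectl run -it --rm dns-test --image=busybox --restart=Never -- \
  nslookup kafka-0.kafka-hs.default.svc.cluster.local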
5. View the brokers through ZooKeeper
[root@k8s-node1 kafka]# kubectl exec -ti zk-1 bash
zookeeper@zk-1:/$ zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, hello, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-hs.default.svc.cluster.local:9093"],"jmx_port":-1,"host":"kafka-0.kafka-hs.default.svc.cluster.local","timestamp":"1573619460916","port":9093,"version":4}
cZxid = 0x10000004b
ctime = Wed Nov 13 04:31:00 UTC 2019
mZxid = 0x10000004b
mtime = Wed Nov 13 04:31:00 UTC 2019
pZxid = 0x10000004b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16e628b4c0a0006
dataLength = 254
numChildren = 0
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-hs.default.svc.cluster.local:9093"],"jmx_port":-1,"host":"kafka-1.kafka-hs.default.svc.cluster.local","timestamp":"1573619460909","port":9093,"version":4}
cZxid = 0x100000048
ctime = Wed Nov 13 04:31:00 UTC 2019
mZxid = 0x100000048
mtime = Wed Nov 13 04:31:00 UTC 2019
pZxid = 0x100000048
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x36e628b4c540004
dataLength = 254
numChildren = 0
[zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-hs.default.svc.cluster.local:9093"],"jmx_port":-1,"host":"kafka-2.kafka-hs.default.svc.cluster.local","timestamp":"1573619460932","port":9093,"version":4}
cZxid = 0x10000004e
ctime = Wed Nov 13 04:31:00 UTC 2019
mZxid = 0x10000004e
mtime = Wed Nov 13 04:31:00 UTC 2019
pZxid = 0x10000004e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x26e628b922c0005
dataLength = 254
numChildren = 0
[zk: localhost:2181(CONNECTED) 6]
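The non-zero ephemeralOwner on each entry shows that broker registrations are ephemeral znodes tied to the broker's ZooKeeper session: if a broker dies, its entry under /brokers/ids disappears automatically. The same check can also be run non-interactively, a sketch:

# list the registered broker IDs without opening an interactive shell
kubectl exec zk-1 -- zkCli.sh -server localhost:2181 ls /brokers/ids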
6. Basic Kafka operation tests
[root@k8s-master sts]# kubectl exec -it kafka-0 sh
$ pwd
/opt/kafka/bin
# create a topic named "test"
$ ./kafka-topics.sh --create --topic test --zookeeper zk-cs.default.svc.cluster.local:2181 --partitions 3 --replication-factor 3
Created topic "test".
# list topics
$ ./kafka-topics.sh --zookeeper zk-cs.default.svc.cluster.local:2181 --list
test
$
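To confirm the cluster actually moves data, a quick produce/consume round trip is worth running from the same shell. A sketch using the console tools shipped with this Kafka build (broker address and port as above; the message text is arbitrary):

# produce one message to the test topic
$ echo "hello" | ./kafka-console-producer.sh --broker-list localhost:9093 --topic test

# read it back, exiting after the first message
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning --max-messages 1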
Finally
That wraps up deploying a stateful Kafka cluster on Kubernetes: persistent volumes, the StatefulSet with its headless service, and a basic functional check against the running brokers.