我是靠谱客的博主 彪壮睫毛膏,这篇文章主要介绍详解PHP实现生产者与消费者(Kafka应用),现在分享给大家,希望可以做个参考。

本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

前言

PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试)

创建消费者需要步骤:

  • 生产者配置参数
  • 创建生产者实例
  • 创建主题实例(依赖生产者)
  • 生产主题消息
  • 推送消息

具体代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$conf = new RdKafkaConf(); // 绑定服务节点 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 创建生产者 $kafka = new RdKafkaProducer($conf); // 创建主题实例 $topic = $kafka->newTopic('p1r1'); // 生产主题数据,此时消息在缓冲区中,并没有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message'); // 阻塞时间(毫秒), 0为非阻塞 $kafka->poll(0); // 推送消息,如果不调用此函数,消息不会被发送且会丢失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new RuntimeException('Was unable to flush, messages might be lost!'); }
登录后复制

消费者

创建一个消费者需要几个步骤:

  • 消费者配置参数
  • 应用配置参数创建消费者实例
  • 订阅对应主题
  • 拉取数据
  • 提交位移

具体代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
$conf = new RdKafkaConf(); // 绑定消费者组 $conf->set('group.id', 'ceshi'); // 绑定服务节点,多个用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 设置自动提交为false $conf->set('enable.auto.commit', 'false'); // 设置当前消费者拉取数据时的偏移量, 可选参数: // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。 // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。 $conf->set('auto.offset.reset', 'earliest'); // 创建消费者实例 $consumer = new RdKafkaKafkaConsumer($conf); // 消费者订阅主题,数组形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环) $message = $consumer->consume(5000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // 业务逻辑 var_dump($message); // 提交位移 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for moren"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed outn"; break; default: throw new Exception($message->errstr(), $message->err); break; } } // 关闭消费者(一般用在脚本中,不需要关闭) $conumser->close();
登录后复制

只消费指定分区中的数据:

复制代码
1
2
3
4
5
// 对消费者指定分区,注意此方式不能与subscribe一同使用 $consumer->assign([ new RdKafkaTopicPartition("topic", 0), new RdKafkaTopicPartition("topic", 1), ]);
登录后复制

以上就是详解PHP实现生产者与消费者(Kafka应用)的详细内容,更多请关注靠谱客其它相关文章!

最后

以上就是彪壮睫毛膏最近收集整理的关于详解PHP实现生产者与消费者(Kafka应用)的全部内容,更多相关详解PHP实现生产者与消费者(Kafka应用)内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(94)

评论列表共有 0 条评论

立即
投稿
返回
顶部