- PhpAmqpLib 下载地址 https://github.com/php-amqplib/php-amqplib
- 包内的PhpAmqpLib文件夹放入到 根目录的 extend目录下
- 在控制器中引入就可以使用了
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127<?php namespace appindexcontroller; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; use thinkLog; class Index { const consumerTag = 'consumer'; // 消费者标签 const exchange = 'router'; // 交换机名 const queue = 'msgs';// 的队列名 /** * 推入消息到队列中 */ public static function pushMessage() { $data = [ "test" => 32132132 ]; // 连接rabbitMQ $connection = new AMQPStreamConnection('localhost', 5672, 'dome', 'dome', '/'); // 开启一个信道 $channel = $connection->channel(); // 声明一个队列 // queue 队列名 // passive 检测队列是否存在 true 只检测不创建 false 创建 // durable 是否持久化队列 true 为持久化 // exclusive 私有队列 不允许其它用户访问 设置true 将会变成私有 // auto_delete 当所有消费客户端连接断开后,是否自动删除队列 $channel->queue_declare(self::queue, false, true, false, false); // exchange 交换机名称 // type 交换器类型 // passive 检测交换机是否存在 true 只检测不创建 false 创建 // durable 是否持久化队列 true 为持久化 // auto_delete 当所有绑定队列都不在使用时,是否自动删除交换器 true:删除false:不删除 $channel->exchange_declare(self::exchange, 'direct', false, true, false); // 绑定队列和交换机 $channel->queue_bind(self::queue, self::exchange); // 写入队列的消息 $messageBody = json_encode($data) ; // 消息内容 // delivery_mode 投递模式 delivery mode 设置为 2标记持久化 $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); // $message 消息内容 // $exchange 交换器名称 // routing_key 路由键 (routing key) 主题交换机会用到 $channel->basic_publish($message, self::exchange,''); // 关闭信道 $channel->close(); //关闭 amqp 连接 $connection->close(); return "ok"; } function shutdown($channel, $connection) { $channel->close(); $connection->close(); Log::info("closed",3); } function process_message($message) { if ($message->body !== 'quit') { $obj = json_decode($message->body); if (!isset($obj->id)) { echo 'error datan'; // 消费成功会在 日志里面写入一条数据 Log::info("error data111111111111111:" . $message->body, 2); } else { try { Log::info("data:" . json_encode($message)); } catch (ThinkException $e) { Log::info($e->getMessage(), 2); Log::info(json_encode($message), 2); } catch (PDOException $pe) { Log::info($pe->getMessage(), 2); Log::info (json_encode($message), 2); } } } $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } /** * 启动 * * @return thinkResponse */ public function start() { $connection = new AMQPStreamConnection('localhost', 5672, 'dome', 'dome', '/'); $channel = $connection->channel(); $channel->queue_declare(self::queue, false, true, false, false); $channel->exchange_declare(self::exchange, 'direct', false, true, false); $channel->queue_bind(self::queue, self::exchange); // queue 队列名称 // consumer_tag 消费者标签 // no_ack 在设置了 no_ack=false 的情况下)只要 consumer 手动应答了 Basic.Ack ,就算其“成功”处理了 // no_ack=true (此时为自动应答) // exclusive 是否是私有队列 设置true 将会变成私有 // callback = null, 回调函数 $channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, 'process_message')); // 不管你的php代码执行是否成功,最后都会执行 shutdown方法,关闭信道和连接 register_shutdown_function(array($this, 'shutdown'), $channel, $connection); while (count($channel->callbacks)) { $channel->wait(); } Log::info ("starting",3); } }
执行命令,就可以监听消费队列了
复制代码
1php index.php index/Index/start &
最后
以上就是岁月静好最近收集整理的关于RabbitMQ在Tp5.0 中使用PhpAmqpLib的全部内容,更多相关RabbitMQ在Tp5.0内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复