概述
1.历史:
2004年,摩根大通和iMatrix开始着手Advanced Message Queuing Protocol (AMQP)开放标准的开发。2006年,AMQP规范发布。2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ 1.0 发布
AMQP是规范,rabbitmq是具体的一种实现技术
2.AMQP模型
通信模型: 1.建立一个连接Connection.
2.这个连接将产生一个信道,该信道可以被生产者与消费者同时使用,是实际传输数据的一个流。这里大概是半双工的。
3.Producer发送消息
4.根据Routing_key,进行路由转发
5.监听在某一queue上的consumer,一旦有可用消息,queue就将消息发送给consumer
6.Consumer需要发送一条ack给队列表示消息已经收到
具体代码.
注意,Publisher,Consumer以及amqp服务器的代码可以分别运行于三台物理主机上!
代码可以如下:
消息接收方的代码:
(ip:192.168.8.2)
#/usr/bin/python
from kombu import Queue,Exchange
from kombu.messaging import Consumer
from kombu.connection import Connection
def process_data(body,msg):
print body
msg.ack()
try:
connection=Connection('amqp://root:123456@192.168.8.108:5672//')
except Exception:
raise
channel=connection.channel()
_exchange = Exchange('media','direct',channel)
video_queue = Queue('video',exchange=_exchange,routing_key='video',channel=channel)
consumer=Consumer(channel,queues=[video_queue],callbacks=[process_data])
consumer.consume()
while True:
connection.drain_events(timeout=15)
consumer.cancel()
以及:
发送方的代码:
(ip:192.168.8.3)
from kombu.entity import Exchange
from kombu.messaging import Producer
from kombu.connection import Connection
from time import sleep
connection= Connection('amqp://root:123456@192.168.8.108:5672//')
channel=connection.channel()
_exchange=Exchange('media','direct',channel)
producer=Producer(channel,exchange=_exchange,routing_key='video')
for i in range(10):
producer.publish({'name':'fuckyou.avi','size':13131})
print 'published!'
sleep(1)
可以看出,都是现在本地把相关的信息配置好了之后,再向服务器发送请求。对于发送端来说,producer.publish()方法实际上是将本地申请的exchange以及要发送的信息发送到服务端,服务端的amqp server接收到请求后,会在服务器端创建一个名字为'media'的exchange.
可以利用以下命令来查看:
rabbitmqctl list_exchanges
对于来说, 然后这个时候应该会看到,名字为'media',类型为direct的exchange已经在rabbitmq-server上创建了
同理,当使用consumer.consume()之后,也将consumer中注册的回调函数传入到本地的connection对应的channel中了 然后等本地的connection.drain_events()之后,会将以consumer_id号注册的callback取出并调用
3.关于queue的疑问
1.fanout是无需路由键的
2.direct exchange
可能会出现组播的情况??
4.多重绑定的问题
今天遇到了个多重绑定的问题
两个不同的实例接收消息,相同的队列、相同的交换机、相同的routingkey,
但是数据是交替接收的,不能两个实例同时接收同一条消息
想要解决其实很容易,只需要没两个实例所绑定的队列不同即可,最简单的就是以不同的名字申明所绑定的队列
最后
以上就是哭泣舞蹈为你收集整理的OpenStack AMQP与kombu的全部内容,希望文章能够帮你解决OpenStack AMQP与kombu所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复