概述
RABBITMQ服务
一、RATTITMQ的概念
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
AMQP是一个定义了在应用或者组织之间传送消息的协议的开放标准 (an open standard for passing business messages between applications or organizations)。AMQP目标在于解决在两个应用之间传送消息存在的下列问题:
·网络是不可靠的 =>消息需要保存后再转发并有出错处理机制
·与本地调用相比,网络速度慢 =>得异步调用
·应用之间是不同的(比如不同语言实现、不同操作系统等) =>得与应用无关
·应用会经常变化 =>同上
AMQP 使用异步的、应用对应用的、二进制数据通信来解决这些问题。
RabbitMQ是 AMQP的一种实现,它包括Server(服务器端)、Client(客户端) 和Plugins(插件)。RabbitMQ服务器是用Erlang语言编写的。
1.1 RabbitMQ 的概念非常清晰、简洁
其基本概念参见下图:
简单说明如下:
·Message (消息):RabbitMQ转发的二进制对象,包括Headers(头)、Properties(属性)和Data(数据),其中数据部分不是必要的。具体见1.2部分的描述。
·Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机 Exhange的应用。
·Consumer (消费者):使用队列 Queue从Exchange中获取消息的应用。
·Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列 Queue。下面有1.3部分描述。
·Queue (队列):一个存储Exchange发来的消息的缓冲,并将消息主动发送给Consumer,或者Consumer主动来获取消息。见1.4部分的描述。
·Binding (绑定):队列 和 交换机 之间的关系。Exchange根据消息的属性和Binding的属性来转发消息。绑定的一个重要属性是binding_key。
·Connection (连接)和Channel(通道):生产者和消费者需要和 RabbitMQ建立TCP连接。一些应用需要多个connection,为了节省TCP连接,可以使用Channel,它可以被认为是一种轻型的共享TCP连接的连接。连接需要用户认证,并且支持TLS (SSL)。连接需要显式关闭。
·Virtual Host (虚拟主机):RabbitMQ用来进行资源隔离的机制。一个虚机主机会隔离用户、exchange、queue等。默认的虚拟主机为"/"。
1.2 关于消息message
消息结构:
消息的几个重要属性:
·routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。
·delivery_mode: 将其值设置为2将用于消息的持久化,持久化的消息会被保存到磁盘上来防止其丢失。下面章节3有描述。
·reply_to:一般用来表示RPC实现中客户端的回调队列的名字。下面章节4有描述。
·correlation_id:用于使用RabbitMQ来实现RPC的情形。下面章节4有描述。
·content_type:表示消息data的编码格式名称。实际上RabbitMQ只负责原样传送消息因此不会使用该属性,该属性只被Publisher和Consumer使用。
消息的确认/删除机制:
Consumer 处理消息可能会失败,那么RabbitMQ怎么知道什么时候来删除queue中的消息呢?它使用两种机制:
·当 RabbitMQ主动将消息发给Consumer以后,它会删除消息
·当 Consumer发回一个确认后,RabbitMQ会删除消息。
第二种情况下,如果 RabbitMQ没收到确认,它会把消息重新放进队列(re-queued)并添加标识'redelivered'表明该消息之前已经发送过,如果没有Consumer的话,消息将保持到有下一个Consumer为止。
Consumer 可以主动告诉RabbitMQ消息处理失败了(拒绝消息),并告知RabbitMQ是删除消息还是重新放进队列。
1.3 exchange 交换机
exchange 有几个重要的属性:
·Name 名字:交换机名字。空字符串名字的exchange为默认的exchange。
·Type 类型:Direct, Fanout, Topic, Headers。类型决定exchange的消息转发能力。下面 章节2有描述。
·durable:值为True/False。值为true的exchange在rabbitmq重启后会被自动创建。OpenStack使用的exchange的该值都为false。
·auto_delete:值为True/False。设置为true的话,当所有消费者的连接都关闭后,该exchange会被自动删除。OpenStack使用的exchange的该值都为false。
·exclusive:值为True/False。设置为true的话,该exchange只允许被创建的connection使用,并且在该connection关闭后它会被自动删除。
RabbitMQ 默认会为每一种类型生成一个或者两个的默认的exchange:
·Fanout 类型:名字为amq.fanout
·Topic 类型:名字为amq.topic
·Headers 类型:名字为amq.match和amq.headers
·Direct 类型:名字为空字符串的exchange以及amq.direct。其中名字为空的exchange比较特殊。在一个Queue被创建后,RabbitMQ会自动建立它和该exchange之间的binding,并且设置其binding_key为该queue的名字。这样,该语句channel.basic_publish(exchange='', routing_key='hello', body=message)会让该默认的exchange将该message转发到名字为'hello'的队列中。
1.4 队列Queue
队列同样有类似于 exchange的name、durable、auto_delete和exclusive等属性,并且含义相同。
Exchange 会将消息分发(copy)到符合要求的所有队列中。
Consumer 可以主动获取或者被动接受Queue里面的消息:
1.5 rabbitmqctl Cli
RabbitMQ 提供Clirabbitmqctl[-n ] [-q] [] 来进行管理和配置。常用到的命令有:
·stop/start_app
·add/delete/list_vhosts
·list_queues/exchanges/bindings/connections/channels
·trace_on/off
2 消息转发机制
Exchange 根据它自身的类型type、消息的属性routing_key或者headers,以及Binding的属性binding_key来转发消息。Exchange 的类型Type使用的消息属性使用的Binding属性转发模式
Fanout- (忽略消息的转发属性)- (忽略binding的转发属性)Exchange 将消息转发到所有与它有binding关系的队列中。
这种方法转发效率较高。OpenStack大量使用这种类型的exchange。
Directrouting_key (任意的字符串,比如"abc")binding_key (任意的字符串,比如"abc")Exchange 只将消息转到binding的binding_key等于消息的routing_key的队列中。
Topicrouting_key (以"."分割的多单词字符串,比如abc.efg.hij)binding_key (包含"#"和"*"的以“.”分割的多单词字符串,比如*.efg.*)Exchange 只将消息转到消息的routing_key和binding的binding_key匹配的队列中。匹配规则如下:
(1)两者以"."分割的单词数目相同
(2)"*"可代表一个单词
(3)"#“可代表零个或多个单词
Headersheaders (消息头)binding_keyExchange 只将消息转到消息的headers和binding的binding_key匹配的队列中。匹配规则待研究。
OpenStack不使用该类型的exchange。
参考文档:
3 持久化
消息的持久化意味着在 RabbitMQ被重启后,消息依然还在。要实现持久化,得实现几个相关组件的持久化:
(1).交换机的持久化,需要将其durable属性设为true。chan.exchange_declare(exchange="sorting_room", type="direct",durable=True, auto_delete=False,)
(2).队列的持久化,需要将其durable属性设置为true。chan.queue_declare(queue="po_box",durable=True, exclusive=False, auto_delete=False)
(3).消息的持久化,需要将其Delivery Mode属性设置成2。msg.properties["delivery_mode"] = 2
4 RPC
可以使用 RabbitMQ来实现RPC机制,这里说说其实现原理:
过程:
(1).客户端Client设置消息的routing key为Service的队列op_q;设置消息的reply-to属性为返回的response的目标队列reponse_q,设置其correlation_id为以随机UUID,然后将消息发到exchange。比如channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
(2). Exchange将消息转发到Service的op_q
(3). Service收到该消息后进行处理,然后将response发到exchange,并设置消息的routing_key为原消息的reply_to属性,以及设置其correlation_id为原消息的correlation_id。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))
(4). Exchange将消息转发到reponse_q
(5). Client逐一接受response_q中的消息,检查消息的correlation_id是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
这里有详细的阐述。
5 Python AMQP SDK
常用的PythonAMQP SDK包括:
·pika (支持AMQP 0.9.1,Python 2.6和2.7):https://github.com/pika/pika
5.1 一个简单的使用py-amqplib的Consumer实现
#创建Connection和Channel连接到RabbitMQ服务器
conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)
chan = conn.channel()
#创建queue
result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False)
#创建exchange
result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,)
#创建binding
result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug")
#回调函数,当有message到达queue后,该函数会被调用defrecv_callback(msg):
print'Received: '+ msg.body +' from channel #'+ str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag)#如果no_ack=False的话,可以需要发回一个确认
#启动一个consumer,consumer_tag是该consumer的一个唯一标识符#no_ack = True表示该consumer不会发回确认chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有消息发到queuewhileTrue:
chan.wait()
#终止该consumer
chan.basic_cancel("testtag")
#关闭connection和channelchan.close()
conn.close()
5.2 一个简单的使用py-amqplib的Producer实现代码
fromamqplibimportclient_0_8 as amqpimportsys
#创建connection和channel
conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)
chan = conn.channel()
#创建message
msg = amqp.Message(sys.argv[1])
msg.properties["delivery_mode"] = 2
#发送message
chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#关闭connection和channelchan.close()
conn.close()
6 插件和消息追踪
RabbitMQ 支持使用插件来支持Management,Federation,Shovel和STOMP。所有的插件都在这里。
二、安装rabbitmq
(1)controller节点
安装消息队列服务:
# apt-get install rabbitmq-server -y
新建用户名为openstack密码为51elab的用户:
# rabbitmqctl add_user openstack 51elab
配置用户权限:
# rabbitmqctl set_user_tags openstack administrator (给用户赋予角色)
# rabbitmqctl set_permissions -p / openstack ".*" ".*" ".*" (给予该用户所有权限)
重启服务即可:
# service rabbitmq-server restart
三、插件和消息追踪
#打开监听web插件:
rabbitmq-plugins enable rabbitmq_management
#再次重启服务即可
它的GUI的访问地址是http://192.168.1.141:15672。进去后输入之前创建的用户名和密码就可以登录进去了,它的GUI上,提供了一个overview,还可以通过它来管理connection、channel、exchange和queue,以及virtual host,tracing和policy等。
#Rabbitmq的firehost机制:
该机制提供了一个查看被转发的消息的途径。当打开 firehose的时候,RabbitMQ会自动建立amq.rabbitmq.trace和amq.rabbitmq.log两个exchange。你可以编程创建queue从这两个exchange里面获取trace和log,从而观察每一个被处理的消息。这里有一个开源代码实现。
rabbitmq-plugins enable rabbitmq_tracing 打开插件
#再次重启服务即可
此时前往 RabbitMQ 管理界面,可以看到在 Admin 标签之下多出了一个栏目 “Tracing”,之前是没有这项的。
#消息记录开关:
单纯只是打开 firehose 并不能就获取到消息的记录,由于记录 RabbitMQ 中传递的消息将会影响于性能。所以默认 RabbitMQ 不会打开该功能。要打开,需要执行命令:rabbitmqctl trace_on同样,如果不再使用消息历史记录,使用如下命令关闭:rabbitmqctl trace_off
#查看消息记录:
在 RabbitMQ 管理界面 Admin 标签下,打开 Tracing 栏目。要查看消息需要新建一个 Trace,点击”Add a new trace”,设置一个名称 “all”,表达式中设置 “#”,点击添加。
表达式 “#” 意为把所有的消息都记录下来,你可以设置只记录某一个队列或交换机上的消息,如 “#.myqueue”,或者 “#.myexchange”添加之后,可以在 All traces 栏目下,看到新建的 trace。右侧为消息历史记录的文件,打开即可看到 RabbitMQ 中传递的消息历史记录。
最后
以上就是着急雨为你收集整理的python openstack rabbitmq_关于openstack的Rabbitmq安装的全部内容,希望文章能够帮你解决python openstack rabbitmq_关于openstack的Rabbitmq安装所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复