我是靠谱客的博主 着急雨,最近开发中收集的这篇文章主要介绍python openstack rabbitmq_关于openstack的Rabbitmq安装,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

RABBITMQ服务

一、RATTITMQ的概念

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

AMQP是一个定义了在应用或者组织之间传送消息的协议的开放标准 (an open standard for passing business messages between applications or organizations)。AMQP目标在于解决在两个应用之间传送消息存在的下列问题:

·网络是不可靠的 =>消息需要保存后再转发并有出错处理机制

·与本地调用相比,网络速度慢 =>得异步调用

·应用之间是不同的(比如不同语言实现、不同操作系统等) =>得与应用无关

·应用会经常变化 =>同上

AMQP 使用异步的、应用对应用的、二进制数据通信来解决这些问题。

RabbitMQ是 AMQP的一种实现,它包括Server(服务器端)、Client(客户端) 和Plugins(插件)。RabbitMQ服务器是用Erlang语言编写的。

1.1 RabbitMQ 的概念非常清晰、简洁

其基本概念参见下图:

简单说明如下:

·Message (消息):RabbitMQ转发的二进制对象,包括Headers(头)、Properties(属性)和Data(数据),其中数据部分不是必要的。具体见1.2部分的描述。

·Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机 Exhange的应用。

·Consumer (消费者):使用队列 Queue从Exchange中获取消息的应用。

·Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列 Queue。下面有1.3部分描述。

·Queue (队列):一个存储Exchange发来的消息的缓冲,并将消息主动发送给Consumer,或者Consumer主动来获取消息。见1.4部分的描述。

·Binding (绑定):队列 和 交换机 之间的关系。Exchange根据消息的属性和Binding的属性来转发消息。绑定的一个重要属性是binding_key。

·Connection (连接)和Channel(通道):生产者和消费者需要和 RabbitMQ建立TCP连接。一些应用需要多个connection,为了节省TCP连接,可以使用Channel,它可以被认为是一种轻型的共享TCP连接的连接。连接需要用户认证,并且支持TLS (SSL)。连接需要显式关闭。

·Virtual Host (虚拟主机):RabbitMQ用来进行资源隔离的机制。一个虚机主机会隔离用户、exchange、queue等。默认的虚拟主机为"/"。

1.2 关于消息message

消息结构:

消息的几个重要属性:

·routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。

·delivery_mode: 将其值设置为2将用于消息的持久化,持久化的消息会被保存到磁盘上来防止其丢失。下面章节3有描述。

·reply_to:一般用来表示RPC实现中客户端的回调队列的名字。下面章节4有描述。

·correlation_id:用于使用RabbitMQ来实现RPC的情形。下面章节4有描述。

·content_type:表示消息data的编码格式名称。实际上RabbitMQ只负责原样传送消息因此不会使用该属性,该属性只被Publisher和Consumer使用。

消息的确认/删除机制:

Consumer 处理消息可能会失败,那么RabbitMQ怎么知道什么时候来删除queue中的消息呢?它使用两种机制:

·当 RabbitMQ主动将消息发给Consumer以后,它会删除消息

·当 Consumer发回一个确认后,RabbitMQ会删除消息。

第二种情况下,如果 RabbitMQ没收到确认,它会把消息重新放进队列(re-queued)并添加标识'redelivered'表明该消息之前已经发送过,如果没有Consumer的话,消息将保持到有下一个Consumer为止。

Consumer 可以主动告诉RabbitMQ消息处理失败了(拒绝消息),并告知RabbitMQ是删除消息还是重新放进队列。

1.3 exchange 交换机

exchange 有几个重要的属性:

·Name 名字:交换机名字。空字符串名字的exchange为默认的exchange。

·Type 类型:Direct, Fanout, Topic, Headers。类型决定exchange的消息转发能力。下面 章节2有描述。

·durable:值为True/False。值为true的exchange在rabbitmq重启后会被自动创建。OpenStack使用的exchange的该值都为false。

·auto_delete:值为True/False。设置为true的话,当所有消费者的连接都关闭后,该exchange会被自动删除。OpenStack使用的exchange的该值都为false。

·exclusive:值为True/False。设置为true的话,该exchange只允许被创建的connection使用,并且在该connection关闭后它会被自动删除。

RabbitMQ 默认会为每一种类型生成一个或者两个的默认的exchange:

·Fanout 类型:名字为amq.fanout

·Topic 类型:名字为amq.topic

·Headers 类型:名字为amq.match和amq.headers

·Direct 类型:名字为空字符串的exchange以及amq.direct。其中名字为空的exchange比较特殊。在一个Queue被创建后,RabbitMQ会自动建立它和该exchange之间的binding,并且设置其binding_key为该queue的名字。这样,该语句channel.basic_publish(exchange='', routing_key='hello',             body=message)会让该默认的exchange将该message转发到名字为'hello'的队列中。

1.4 队列Queue

队列同样有类似于 exchange的name、durable、auto_delete和exclusive等属性,并且含义相同。

Exchange 会将消息分发(copy)到符合要求的所有队列中。

Consumer 可以主动获取或者被动接受Queue里面的消息:

1.5 rabbitmqctl  Cli

RabbitMQ 提供Clirabbitmqctl[-n ] [-q] [] 来进行管理和配置。常用到的命令有:

·stop/start_app

·add/delete/list_vhosts

·list_queues/exchanges/bindings/connections/channels

·trace_on/off

2 消息转发机制

Exchange 根据它自身的类型type、消息的属性routing_key或者headers,以及Binding的属性binding_key来转发消息。Exchange 的类型Type使用的消息属性使用的Binding属性转发模式

Fanout- (忽略消息的转发属性)- (忽略binding的转发属性)Exchange 将消息转发到所有与它有binding关系的队列中。

这种方法转发效率较高。OpenStack大量使用这种类型的exchange。

Directrouting_key (任意的字符串,比如"abc")binding_key (任意的字符串,比如"abc")Exchange 只将消息转到binding的binding_key等于消息的routing_key的队列中。

Topicrouting_key (以"."分割的多单词字符串,比如abc.efg.hij)binding_key (包含"#"和"*"的以“.”分割的多单词字符串,比如*.efg.*)Exchange 只将消息转到消息的routing_key和binding的binding_key匹配的队列中。匹配规则如下:

(1)两者以"."分割的单词数目相同

(2)"*"可代表一个单词

(3)"#“可代表零个或多个单词

Headersheaders (消息头)binding_keyExchange 只将消息转到消息的headers和binding的binding_key匹配的队列中。匹配规则待研究。

OpenStack不使用该类型的exchange。

参考文档:

3 持久化

消息的持久化意味着在 RabbitMQ被重启后,消息依然还在。要实现持久化,得实现几个相关组件的持久化:

(1).交换机的持久化,需要将其durable属性设为true。chan.exchange_declare(exchange="sorting_room", type="direct",durable=True, auto_delete=False,)

(2).队列的持久化,需要将其durable属性设置为true。chan.queue_declare(queue="po_box",durable=True, exclusive=False, auto_delete=False)

(3).消息的持久化,需要将其Delivery Mode属性设置成2。msg.properties["delivery_mode"] = 2

4 RPC

可以使用 RabbitMQ来实现RPC机制,这里说说其实现原理:

过程:

(1).客户端Client设置消息的routing key为Service的队列op_q;设置消息的reply-to属性为返回的response的目标队列reponse_q,设置其correlation_id为以随机UUID,然后将消息发到exchange。比如channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)

(2). Exchange将消息转发到Service的op_q

(3). Service收到该消息后进行处理,然后将response发到exchange,并设置消息的routing_key为原消息的reply_to属性,以及设置其correlation_id为原消息的correlation_id。

ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))

(4). Exchange将消息转发到reponse_q

(5). Client逐一接受response_q中的消息,检查消息的correlation_id是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。

这里有详细的阐述。

5 Python AMQP SDK

常用的PythonAMQP SDK包括:

·pika (支持AMQP 0.9.1,Python 2.6和2.7):https://github.com/pika/pika

5.1 一个简单的使用py-amqplib的Consumer实现

#创建Connection和Channel连接到RabbitMQ服务器

conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)

chan = conn.channel()

#创建queue

result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False)

#创建exchange

result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,)

#创建binding

result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug")

#回调函数,当有message到达queue后,该函数会被调用defrecv_callback(msg):

print'Received: '+ msg.body +' from channel #'+ str(msg.channel.channel_id)

# lChannel.basic_ack(msg.delivery_tag)#如果no_ack=False的话,可以需要发回一个确认

#启动一个consumer,consumer_tag是该consumer的一个唯一标识符#no_ack = True表示该consumer不会发回确认chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")

#等待有消息发到queuewhileTrue:

chan.wait()

#终止该consumer

chan.basic_cancel("testtag")

#关闭connection和channelchan.close()

conn.close()

5.2 一个简单的使用py-amqplib的Producer实现代码

fromamqplibimportclient_0_8 as amqpimportsys

#创建connection和channel

conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)

chan = conn.channel()

#创建message

msg = amqp.Message(sys.argv[1])

msg.properties["delivery_mode"] = 2

#发送message

chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))

#关闭connection和channelchan.close()

conn.close()

6 插件和消息追踪

RabbitMQ 支持使用插件来支持Management,Federation,Shovel和STOMP。所有的插件都在这里。

二、安装rabbitmq

(1)controller节点

安装消息队列服务:

# apt-get install rabbitmq-server -y

新建用户名为openstack密码为51elab的用户:

#  rabbitmqctl add_user openstack 51elab

配置用户权限:

# rabbitmqctl set_user_tags openstack administrator (给用户赋予角色)

# rabbitmqctl set_permissions -p /  openstack ".*" ".*" ".*" (给予该用户所有权限)

重启服务即可:

# service rabbitmq-server restart

三、插件和消息追踪

#打开监听web插件:

rabbitmq-plugins enable rabbitmq_management

#再次重启服务即可

它的GUI的访问地址是http://192.168.1.141:15672。进去后输入之前创建的用户名和密码就可以登录进去了,它的GUI上,提供了一个overview,还可以通过它来管理connection、channel、exchange和queue,以及virtual host,tracing和policy等。

#Rabbitmq的firehost机制:

该机制提供了一个查看被转发的消息的途径。当打开 firehose的时候,RabbitMQ会自动建立amq.rabbitmq.trace和amq.rabbitmq.log两个exchange。你可以编程创建queue从这两个exchange里面获取trace和log,从而观察每一个被处理的消息。这里有一个开源代码实现。

rabbitmq-plugins enable rabbitmq_tracing 打开插件

#再次重启服务即可

此时前往 RabbitMQ 管理界面,可以看到在 Admin 标签之下多出了一个栏目 “Tracing”,之前是没有这项的。

#消息记录开关:

单纯只是打开 firehose 并不能就获取到消息的记录,由于记录 RabbitMQ 中传递的消息将会影响于性能。所以默认 RabbitMQ 不会打开该功能。要打开,需要执行命令:rabbitmqctl trace_on同样,如果不再使用消息历史记录,使用如下命令关闭:rabbitmqctl trace_off

#查看消息记录:

在 RabbitMQ 管理界面 Admin 标签下,打开 Tracing 栏目。要查看消息需要新建一个 Trace,点击”Add a new trace”,设置一个名称 “all”,表达式中设置 “#”,点击添加。

表达式 “#” 意为把所有的消息都记录下来,你可以设置只记录某一个队列或交换机上的消息,如 “#.myqueue”,或者 “#.myexchange”添加之后,可以在 All traces 栏目下,看到新建的 trace。右侧为消息历史记录的文件,打开即可看到 RabbitMQ 中传递的消息历史记录。

最后

以上就是着急雨为你收集整理的python openstack rabbitmq_关于openstack的Rabbitmq安装的全部内容,希望文章能够帮你解决python openstack rabbitmq_关于openstack的Rabbitmq安装所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部