1,基本概念
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。
RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看
RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。
几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
2,环境搭建
因为最经工作在
Openstack的开发上,所以我直接在ubuntu上使用
devstack安装的
RabbitMQ。Ubuntu上也可以手动安装
RabbitMQ(sudo apt-get install rabbitmq-server)。
RabbitMQ官网给的教程使用的
pika连接
RabbitMQ。 Devstack 默认的是用kombu连接
RabbitMQ。所以先要安装pika。
复制代码
1sudo apt-get install python-pip git-core
复制代码
1sudo pip install pika==0.9.8
环境搭建起来后就可以按照教程试验了。
3,实例教程
3.1 生产/消费者
最简单的示例,先尝试发送一个message给接受者(这里的例子和官网教程有点差别,设置连接参数和官网不同,因为devstack默认安装的Rabbitmq时修改了guest的密码为nova)

receiver.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16#!/usr/bin/env python import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials ) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming()
send.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13#!/usr/bin/env python import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
python
receiver.py
python send.py
3.2 工作队列

生产者生产消息,放入由routing_key指定的队列。消费者绑定该queue消费message。
new_task.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18#!/usr/bin/env python import sys import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20#!/usr/bin/env python import time import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials ) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
复制代码
1
2shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C
复制代码
1
2shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C
复制代码
1
2
3
4
5shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message.....

发送者发送消息到制定的exchange,接受者绑定队列到exchange接受消息。
emit_log.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15#!/usr/bin/env python import sys import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
receive_log.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22#!/usr/bin/env python import time import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials ) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
复制代码
1$ python emit_log.py
复制代码
1$ python receive_logs.py

发送者不但可以指定消息的exchange,还可以指定消息的routing_key。进一步增加了消息投递的灵活性。
emit_logs_direct.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16#!/usr/bin/env python import sys import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or "info: Hello World!" channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity,message) connection.close()
receive_logs_direct.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29#!/usr/bin/env python import sys import time import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials ) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
复制代码
1
2$ python receive_logs_direct.py info warning error [*] Waiting for logs. To exit press CTRL+C
复制代码
1
2$ python emit_log_direct.py error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
3.2 Topics

比exchange+routing_key更加灵活,routing_key可以支持通配符。
emit_logs_topic.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16#!/usr/bin/env python import pika import sys credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print " [x] Sent %r:%r" % (routing_key, message) connection.close()
receive_logs_topic.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26#!/usr/bin/env python import pika import sys credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
复制代码
1python emit_log_topic.py "kern.critical" "A critical kernel error"
复制代码
1python receive_logs_topic.py "kern.*" "*.critical"
3.2 远程过程调用

直接从一个进程调用另外一个进程。
rpc_client.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34#!/usr/bin/env python import pika import uuid class FibonacciRpcClient(object): def __init__(self): credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print " [x] Requesting fib(30)" response = fibonacci_rpc.call(30) print " [.] Got %r" % (response,)
rpc_server.py
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29#!/usr/bin/env python import pika credentials = pika.PlainCredentials('guest', 'nova') parameters = pika.ConnectionParameters('localhost',5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print " [.] fib(%s)" % (n,) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print " [x] Awaiting RPC requests" channel.start_consuming()
复制代码
1
2$ python rpc_server.py [x] Awaiting RPC requests
复制代码
1
2$ python rpc_client.py [x] Requesting fib(30)
3,结束
Rabbitmq的topics方式,远程过程调用方式在openstack的源码中大量使用,如果不理解这几种方式,理解openstack源码还是比较费力的。
kombu的例子直接在kombu官网可以找到,在devstack环境可以直接跑。这里就不列了。
最后
以上就是聪慧小蝴蝶最近收集整理的关于Rabbitmq学习笔记的全部内容,更多相关Rabbitmq学习笔记内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复