概述
说明
通过上篇博文翻译了AMQP协议中有关队列Queues的相关内容,本篇博文将继续翻译官方关于AMQP协议介绍的有关Consumer和Message部分的内容。通过翻译,学习记录在RabbitMQ中AMQP协议的Consumer和Message的作用。
正文
Consumers
队列中存储的消息如果不能被程序消费处理,那它们就是无用的。在AMQP 0-9-1模型中,应用程序有两种方式从队列获取消息:
- 消息被推送到应用程序中(push 模式)
- 程序按需从队列中拉取消息(pull 模式)
当使用push模式时,应用程序必须表明它对某个特定的队列中的消息感兴趣。当以这种方式消费消息时,我们称之为在队列注册了一个消费者,或者简单来说,队列有了一个订阅者。每个队列都存在不止一个消费者的情况,或者是注册了一个具有独占性的消费者(当消费时,禁止其它消费者在此队列消费消息,具有独占性)。
每个消费者(订阅者)都有一个被称为消费者标签(consumer tag)的凭证,它被用来取消对消息的订阅。这些标签只能是字符串类型的。
Message Acknowledgements
消费者程序是指接收和处理消息的应用程序。程序在处理消息时可能会失败或者程序可能会崩溃,还有网络也可能发生问题,这里就引起一个问题:AMQP代理服务器什么时候可以将消息从队列中删除?AMQP 0-9-1协议提出了两种方案:
- 在服务器将消息发送给应用程序后(使用AMQP的basic.deliver或basic.get-ok方法)
- 在应用程序发送一个确认消息后(使用AMQP的basic.ack方法)
第一种方案称为自动确认模式,第二种称为显示确认模式。当使用显示确认模式时,应用程序可以选择在什么时候发送确认消息。可以在接收到消息后,或者在存储消息后处理消息前,又或者在完全将消息处理结束后(例如,成功获取一个网页,在处理并存储后发送确认)。
如果一个消费者在未发送确认消息时宕机,AMQP代理服务器将重新把消息发送给其他消费者,或者在没有其他消费者的情况下,服务器将一直保存消息直到有消费者注册到该队列,进行重新派发。
Rejecting Messages
当消费者接收处理一个消息时,可能会失败。这时应用程序应该通过一个拒绝消息向服务器表明该消息处理失败(或当时无法完成)。当拒绝一个消息时,应用程序可以要去服务器丢弃或重新入队派发该消息。当队列只有一个消费者是,要确保在拒绝消息时不会造成消息被不断重复派发的无限循环,出现一个消息在同一个消费者上不断地被入队重新派发。
Negative Acknowledgements
使用AMQP的basic.reject方法来拒绝消息,但是该方法有一个限制:无法像确认那样一次拒绝多个消息,但是RabbitMQ有一个解决方案,RabbitMQ提供了一个AMQP协议的扩展,被称为消极确认(negative acknowledgements)或acks。更多的详细信息,请参照the help page。
Prefetching Messages
在多个消费者共享一个队列时,能够指定每个消费者可以在下次消息确认前,被一次推送多少个消息是有用的。这个方法可以作为简单的负载均衡技术或者在消息倾向被批量发送时可以提供系统的吞吐量。比如,如果生产者由于工作性质每分钟都发送消息时,可以设置该参数提高吞吐量。
注意,RabbitMQ只支持通道级别的prefetch-count,不能基于连接或大小进行预取。
Message Attributes and Payload
在AMQP模型中消息有很多属性,AMQP定义的一些属性是十分普遍的,应用程序开发者不必去考虑属性的确切名称。这里有一些例子:
- Content type
- Content encoding
- Routing key
- Delivery mode(persistent or not)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
有些属性被AMQP代理服务器使用,但更多的属性是被接收它们的应用程序使用。有些属性时可选的比如headers,它们跟HTTP的X-Headers属性十分相似。消息的属性是在消息被发送时设置的。
AMQP消息也有一个有效负载(它们携带的数据),AMQP服务器通常将它们视为一个不透明的字节数组。服务器不会检查或更改这个负载。有时消息只包含了属性信息但是没有数据,为了将结构化数据作为消息的有效负载进行发送,使用一些如JSON,Thrift,Protocol Buffers 和 MessagePack等不同序列化格式对数据进行序列化是十分常见的。AMQP通常使用content-typ和content-enconding属性来设置消息,但这只是按照惯例进行设置。
在消息被发送时设置了持久化属性,这使得AMQP服务器将会把这类型的消息保存到硬盘。当服务器重启时需要确保接收到的持久化消息不会丢失。简单地说,比如讲消息发送到一个持久化的交换机或者消息被路由到一个持久化的队列,这种情况下都不使该消息得到持久化,消息是否持久化完全依赖于消息自身属性的设置。当发送持久化的消息时,会影响服务的性能(就像数据持久化,持久化会对性能造成一定的影响)。
Message Acknowledgements
由于网络是不可靠的并且应用程序也会崩溃,所以经常需要某些处理确认。有时仅需要确认消息已经被接收到了,有时需要确认消息已经被消费者处理了,比如,证实有强制性的消息已经被持久化到数据库或者被索引。
这种情况是普遍存在的,所以AMQP协议有一个称为消息确认(有时也指acks)的内置功能,消费者使用该功能来确认消息的成功发送或成功处理。如果应用程序崩溃(当连接断开时AQMP代理服务器将会发现)或服务器期待的消息的确认消息没有接收到,这时,消息将被重新入队派发(如果有其他消费者存在,会立即将消息发送给其他消费者)。
在协议中内置确认机制可以帮助开发人员建立更具健壮性的软件。
最后
以上就是甜蜜猫咪为你收集整理的RabbitMQ学习(十): AMQP 0-9-1协议之Consumer and Message说明正文的全部内容,希望文章能够帮你解决RabbitMQ学习(十): AMQP 0-9-1协议之Consumer and Message说明正文所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复