概述
目录
AMQP介绍
介绍:
工作过程:
深入理解:
入门案例:消息发送与消息接收
WorkQueue
场景:
案例:多个消费者绑定一个队列
发布订阅-Fanout Exchange
介绍:
步骤:编辑
1.consumer服务中声明交换机和队列,将队列与交换机进行绑定(利用Binding中bind方法)
2.在consumer中声明两个消费者消费队列中的消息
3.在Publisher中发布消息 ,发布消息给交换机
总结:
DirectExchang:可以根据规则路由到指定的消息队列
Direct交换机与Fanout交换机之间的差异+@RabbitListener注解中声明队列与交换机的常见注解
TopicExchange
案例:利用SpringAMQP来演示TopicExchange的使用
消息转换器
AMQP介绍
介绍:
(11条消息) AMQP是什么?看完你就知道了_hello_读书就是赚钱的博客-CSDN博客_amqp
好处:
什么connection:消息队列的连接、channel:服务发送接收消息的通道、Queue:消息队列——>这些你都不需要自己编写
工作过程:
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
深入理解:
1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。
2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。
4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
入门案例:消息发送与消息接收
消息发送
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author diao 2022/5/13
*/
@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessageSimpleQueue(){
String queueName="simple.queue";
String messgae="Hello,生日快乐";
rabbitTemplate.convertAndSend(queueName,messgae);
}
}
可以发现队列simple.queue中 多了一条消息
AMOP如何发送消息?
引入amqp的starter依赖,然后我们配置RabbitMQ的地址,最后利用RabbitTemplate中的convertAndSend方法发送消息
消息接收:
1、首先进行yaml配置,RabbitMQ连接信息
2、新建一个组件,里面编写消费逻辑(利用@RabbitListener监听队列,只要队列一有消息就进行接收)
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author diao 2022/5/13
*/
@Component
public class SpringRabbitListener {
/**
* @RabbitListener监听队列
* @param msg
*/
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者接收到的simple.queue中消息为:"+msg);
}
}
注意:消息一旦被消费就会从队列删除,RabbitMQ没有消息回溯功能
WorkQueue
场景:
采用多个工作队列处理消息,避免消息堆积;
案例:多个消费者绑定一个队列
1.首先配置类中把队列支棱起来,Publisher发布消息给消息队列simple.queue
@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Test
// public void testSendMessageSimpleQueue(){
// String queueName="simple.queue";
// String messgae="Hello,生日快乐";
// rabbitTemplate.convertAndSend(queueName,messgae);
// }
@Test
public void testSendMessageWorkQueue() throws InterruptedException {
String queueName="simple.queue";
String messgae="Fairy同学,祝你生日快乐--";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName,messgae+i);
Thread.sleep(20);
}
}
2.consumer服务中定义两个消费者,利用@RabbitListener监听队列
@Component
public class SpringRabbitListener {
/**
* @RabbitListener监听队列
* @param msg
*/
// @RabbitListener(queues = "simple.queue")
// public void listenSimpleQueue(String msg){
// System.out.println("消费者接收到的simple.queue中消息为:"+msg);
// }
/**
* 我们的思想是:两个消费者,一个1s能够消费50条,一个消费1s5条,一共50条信息,能者多劳
* 现实:默认是平均分配->造成处理消息时间过长(因为第二个消费者处理消息很慢)
* @param msg
* @throws InterruptedException
*/
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue(String msg) throws InterruptedException {
System.out.println("消费者1......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());
//每s50个消息
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2.......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());
Thread.sleep(200);
}
我们可以采用prefetch设置消费者预取的消息数量,这样就不会出现平均分配消息的现象了——>而是根据每个消费者的能力来处理消息
发布(Publish)和订阅(Subscribe)
因为一个消息只可能被一个消费者消费,消费完就会删除该消费,所以我们需要利用发布和订阅来进行处理,这样就可以将同一个消息发送给多个消费者——> 利用exchange交换机将消息发送给多个队列,然后队列是可以将消息进行储存的,发给多个消费者
常见的exchange类型:Fanout(广播)、Direct(路由)、Topic(话题)
注意:exchange负责消息路由,而不是储存,路由失败则会造成消息丢失(也就是说路由到哪个消息队列)。
发布订阅-Fanout Exchange
介绍:
exchange: 只能作为消息的转发,记住不能作为消息的缓存,如果路由失败消息就丢了
交换机exchange:通过FanoutExchange返回交换机
消息队列Queue:通过Queue返回消息队列
将消息队列与交换机进行绑定Bingding:可以通过Binding中的bind方法进行绑定
步骤:
1.consumer服务中声明交换机和队列,将队列与交换机进行绑定(利用Binding中bind方法)
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author diao 2022/5/14 */ @Configuration public class FanoutConfig { //itcast.fanout交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //fanout.queue1任务队列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //消息队列2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //将消息队列与交换机进行绑定:类型和名称要保持一致,不然无法注入 @Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1) .to(fanoutExchange); } //第二个消息队列与交换机进行绑定 @Bean public Binding fanouting2(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2) .to(fanoutExchange); } }
2.在consumer中声明两个消费者消费队列中的消息
package cn.itcast.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalTime; /** * @author diao 2022/5/13 */ @Component public class SpringRabbitListener { /** * @RabbitListener监听队列 * @param msg */ // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String msg){ // System.out.println("消费者接收到的simple.queue中消息为:"+msg); // } /** * 我们的思想是:两个消费者,一个1s能够消费50条,一个消费1s5条,一共50条信息,能者多劳 * 现实:默认是平均分配->造成处理消息时间过长(因为第二个消费者处理消息很慢) * @param msg * @throws InterruptedException */ @RabbitListener(queues = "simple.queue") public void listenWorkQueue(String msg) throws InterruptedException { System.out.println("消费者1......接收到消息为:"+msg+"当前时间:"+ LocalTime.now()); //每s50个消息 Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2.......接收到消息为:"+msg+"当前时间:"+ LocalTime.now()); Thread.sleep(200); } /** * 以下是利用交换机通知信息给处理消息的消费者 */ @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg){ System.out.println("消费者接收到的fanout.queue1的消息为:"+msg); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg){ System.out.println("消费者收到fanout.queue2的消息为:"+msg); } }
3.在Publisher中发布消息 ,发布消息给交换机
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @author diao 2022/5/13 */ @RunWith(SpringRunner.class) @EnableRabbit @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; // @Test // public void testSendMessageSimpleQueue(){ // String queueName="simple.queue"; // String messgae="Hello,生日快乐"; // rabbitTemplate.convertAndSend(queueName,messgae); // } @Test public void testSendMessageWorkQueue() throws InterruptedException { String queueName="simple.queue"; String messgae="Fairy同学,祝你生日快乐--"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName,messgae+i); Thread.sleep(20); } } /** * 发送信息给交换机exchange */ @Test public void testSendFanoutExchange(){ //1.交换机名称 String exchangeName="itcast.fanout"; //2.消息 String message="hello,every one!"; //3.发送消息 rabbitTemplate.convertAndSend(exchangeName,"",message); } }
总结:
DirectExchang:可以根据规则路由到指定的消息队列
1.在consumer服务中声明Exchange交换机与不同key之消息队列的绑定,利用@RabbitListener->还监听了一波消息队列
对比与之前的Fanout,它们的队列以及交换机是在一个配置类中定义并绑定的,利用了Binding,并注入容器
Direct交换机与Fanout交换机之间的差异+@RabbitListener注解中声明队列与交换机的常见注解
TopicExchange
案例:利用SpringAMQP来演示TopicExchange的使用
介绍:
两个队列:一个是中国的新闻,一个所有的新闻消息;
两个消费者分别监听这两个队列
Publisher将消息发送给交换机Exchange,然后交换机根据key(+通配符)->决定将消息路由到哪个队列中
区别:
与DirectExchange区别就是,DirectExchange没有通配符,TopicExchange有通配符:xxx.xxx
最明显的地方就是:队列与交换机进行绑定时,key不一样
消费者的监听器
@RabbitListener(bindings = @QueueBinding(
value=@Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("小二接收到的topic.queue1的消息是:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name="topic.queue2"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.err.println("小二接收到的topic.queue2的消息是:"+msg);
}
服务者测试方,发布消息:
@Test
public void testSendTopicExchange(){
//交换机名称
String exchangeName="itcast.topic";
//消息
String message="世界最帅前一百名,Fairy同学成功进入";
//将消息发送到交换机
rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
}
消息转换器
场景: 因为我们Publisher服务者发送消息到消息队列(将消息序列化为二进制),消息是乱码的——>因为消息类型content-type是java序列化变来的类型:缺点:消息体大、传输速度慢、安全问题、占内存;
处理:
1.我们定义一个bean,利用boot的自动配置思想,将默认的替换
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
2.然后导入依赖,Jacksonxxxx
<!--序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
3.Publisher发送消息给到消息队列
//Publisher发送消息给到队列object.queue
@Test
public void testSendObjectQueue(){
HashMap<Object, Object> msg = new HashMap<>();
msg.put("name","liuyan");
msg.put("age",21);
//发送消息到队列
rabbitTemplate.convertAndSend("object.queue",msg);
}
最后
以上就是纯情黑猫为你收集整理的SpringAMQP的全部内容,希望文章能够帮你解决SpringAMQP所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复