概述
之前讲的消费者互相可以把队列中的消息全部读取,但是不是读完整的所有信息
那么采用订阅模式就行,这就是微信公众号的模式,
比如10个人订阅了我的公众号"BeJavaGod",当我发送一条消息的时候,
那么这10个人都能收到我的消息并且查看,比如本条消息,对吧?
生产者制造消息发送给交换机X,而不是发送给队列,队列和交换机绑定,消费者从各自的队列中获得消息
这样则实现一个生产者发送的所有消息都能被所有的消费者同时接收到
需要注意的地方是,在生产者创建消息发送到交换机时,此时没有队列,那么消息则丢失,消费者的队列绑定后再次发送则消息传达,原理是消息必须存放在队列中
生产者:
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 12 13 // 消息内容 14 String message = "id=1001"; 15 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 16 System.out.println(" [x] Sent '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
消费者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_fanout_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [x] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
消费者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_fanout_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [x] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
ok,这样就是最简单的订阅demo
最后
以上就是淡定灯泡为你收集整理的RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用的全部内容,希望文章能够帮你解决RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复