Java Client
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99package com.fang.java_client.fanout_3; import com.fang.java_client.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * fanout 扇出 也称 广播 * * @Author Mr. Sun. * @Date 2020-11-12 18:27 */ public class Provider { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 将通道声明指定交换机 // exchange 交换机名称 // fanout 代表广播类型 channel.exchangeDeclare("test_fanout", "fanout"); // 发送消息 channel.basicPublish("test_fanout", "", null, "hello fanout".getBytes()); RabbitMQUtils.closeConnectAndChannel(channel,connection); } } package com.fang.java_client.fanout_3; import com.fang.java_client.utils.RabbitMQUtils; import com.rabbitmq.client.*; import lombok.SneakyThrows; import java.io.IOException; /** * @Author Mr. Sun. * @Date 2020-11-12 17:35 */ public class Consumer_1 { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("test_fanout", "fanout"); // 通道绑定队列 [ 临时队列 ] String queue = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queue, "test_fanout", ""); channel.basicConsume(queue, true, new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); } }); } } ------------------------------------------ package com.fang.java_client.fanout_3; import com.fang.java_client.utils.RabbitMQUtils; import com.rabbitmq.client.*; import lombok.SneakyThrows; import java.io.IOException; /** * @Author Mr. Sun. * @Date 2020-11-12 17:35 */ public class Consumer_2 { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("test_fanout", "fanout"); // 通道绑定队列 [ 临时队列 ] String queue = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queue, "test_fanout", ""); channel.basicConsume(queue, true, new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); } }); } }
SpringBoot
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28@Test// fanout 发布订阅 广播 模型 void test_fanout() { rabbitTemplate.convertAndSend("logs", null, "fanout 模型生成的消息"); } // fanout 模型================================== // fanout 临时队列不需要声明队列了 @RabbitListener( bindings = @QueueBinding( value = @Queue,//创建临时队列 exchange = @Exchange(value = "logs",type = "fanout")// 指定要绑定的交换机 ) ) private void receiveMsgFanout1(String msg){ System.out.println("fanout_1:"+msg); } @RabbitListener( bindings = @QueueBinding( value = @Queue,//创建临时队列 exchange = @Exchange(value = "logs",type = "fanout")// 指定要绑定的交换机 ) ) private void receiveMsgFanout2(String msg){ System.out.println("fanout_2:"+msg); }
最后
以上就是自觉含羞草最近收集整理的关于RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe的全部内容,更多相关RabbitMQ(三)JavaClient内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复