我是靠谱客的博主 自觉含羞草,最近开发中收集的这篇文章主要介绍RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
Java Client
package com.fang.java_client.fanout_3;
import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* fanout 扇出 也称 广播
*
* @Author Mr. Sun.
* @Date 2020-11-12 18:27
*/
public class Provider {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 将通道声明指定交换机
// exchange 交换机名称
// fanout 代表广播类型
channel.exchangeDeclare("test_fanout", "fanout");
// 发送消息
channel.basicPublish("test_fanout", "", null, "hello fanout".getBytes());
RabbitMQUtils.closeConnectAndChannel(channel,connection);
}
}
package com.fang.java_client.fanout_3;
import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import java.io.IOException;
/**
* @Author Mr. Sun.
* @Date 2020-11-12 17:35
*/
public class Consumer_1 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("test_fanout", "fanout");
// 通道绑定队列 [ 临时队列 ]
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "test_fanout", "");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
------------------------------------------
package com.fang.java_client.fanout_3;
import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import java.io.IOException;
/**
* @Author Mr. Sun.
* @Date 2020-11-12 17:35
*/
public class Consumer_2 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("test_fanout", "fanout");
// 通道绑定队列 [ 临时队列 ]
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "test_fanout", "");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
SpringBoot
@Test// fanout 发布订阅 广播 模型
void test_fanout() {
rabbitTemplate.convertAndSend("logs", null, "fanout 模型生成的消息");
}
// fanout 模型==================================
// fanout 临时队列不需要声明队列了
@RabbitListener(
bindings = @QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "logs",type = "fanout")// 指定要绑定的交换机
)
)
private void receiveMsgFanout1(String msg){
System.out.println("fanout_1:"+msg);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "logs",type = "fanout")// 指定要绑定的交换机
)
)
private void receiveMsgFanout2(String msg){
System.out.println("fanout_2:"+msg);
}
最后
以上就是自觉含羞草为你收集整理的RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe的全部内容,希望文章能够帮你解决RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复