我是靠谱客的博主 自觉含羞草,最近开发中收集的这篇文章主要介绍RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Java Client

package com.fang.java_client.fanout_3;

import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * fanout  扇出 也称 广播
 *
 * @Author Mr. Sun.
 * @Date 2020-11-12 18:27
 */
public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 将通道声明指定交换机
        // exchange 交换机名称
        // fanout 代表广播类型
        channel.exchangeDeclare("test_fanout", "fanout");
        // 发送消息
        channel.basicPublish("test_fanout", "", null, "hello fanout".getBytes());

        RabbitMQUtils.closeConnectAndChannel(channel,connection);
    }
}


package com.fang.java_client.fanout_3;

import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;

/**
 * @Author Mr. Sun.
 * @Date 2020-11-12 17:35
 */
public class Consumer_1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare("test_fanout", "fanout");
        // 通道绑定队列 [ 临时队列 ]
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue, "test_fanout", "");

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

------------------------------------------
package com.fang.java_client.fanout_3;

import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;

/**
 * @Author Mr. Sun.
 * @Date 2020-11-12 17:35
 */
public class Consumer_2 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare("test_fanout", "fanout");
        // 通道绑定队列 [ 临时队列 ]
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue, "test_fanout", "");

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });
    }
}


SpringBoot

 	@Test// fanout 发布订阅 广播 模型
    void test_fanout() {
        rabbitTemplate.convertAndSend("logs", null, "fanout 模型生成的消息");
    }


	// fanout 模型==================================
    // fanout 临时队列不需要声明队列了
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout")// 指定要绑定的交换机
            )
    )
    private void receiveMsgFanout1(String msg){
        System.out.println("fanout_1:"+msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout")// 指定要绑定的交换机
            )
    )
    private void receiveMsgFanout2(String msg){
        System.out.println("fanout_2:"+msg);
    }

最后

以上就是自觉含羞草为你收集整理的RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe的全部内容,希望文章能够帮你解决RabbitMQ(三)JavaClient SpringBoot集成 Publish/Subscribe所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部