我是靠谱客的博主 苹果咖啡豆,最近开发中收集的这篇文章主要介绍RabbitMQ:发布确认模式1.基本介绍2.实现消息可靠传递的三个条件3.发布确认模式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

✨ RabbitMQ:发布确认模式

  • 1.基本介绍
  • 2.实现消息可靠传递的三个条件
    • 2.1队列持久化
    • 2.2消息持久化
    • 2.3发布确认
  • 3.发布确认模式
    • 3.1单个确认发布模式
    • 3.2批量确认发布模式
    • 3.3异步确认发布模式

????个人主页:不断前进的皮卡丘
????博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
????个人专栏:消息中间件

1.基本介绍

在这里插入图片描述

生产者把信道设置成为confirm(确认)模式,一旦信道进入confirm模式,所有在这个信道上面发布的消息都会被指定唯一的一个ID(ID从1开始).一旦消息被投递到所有匹配的队列以后,broker就会发送一个确认给生产者(包含ID),这样使得生产者知道消息已经正确到底目的队列了。如果消息和队列是可持久化的,那么确认消息就会在消息被写入磁盘以后发出,broker回传给生产者的确认消息中delivery-tag包含了确认消息的序列号。

  • 在这里插入图片描述

2.实现消息可靠传递的三个条件

2.1队列持久化

生产者发送消息到队列的时候,把durable参数设置为true(表示队列持久化)

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

2.2消息持久化

我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

//交换机名称,队列名称,消息持久化,消息
channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

2.3发布确认

  • 队列接收到生产者发送的数据以后,队列把消息保存在磁盘(为了实现持久化),队列会把最终的可靠性传递结果告诉给生产者,这就是发布确认。
  • 三种常用的发布确认策略:单个确认发布、批量确认发布、异步确认发布

3.发布确认模式

RabbitMQ的发布确认模式默认是没有开启的,我们可以通过调用channel.confirmSelect()方法来手动开启发布确认模式。

3.1单个确认发布模式

  • 单个确认发布模式是一种简单的同步确认发布的方式。也就是说发布一个消息以后,只要确认它被确认发布,才可以继续发布后续的消息。
  • waitForConfirms(long)这一个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认,就会抛出异常。
  • 缺点:速度慢,因为如果没有确认消息的话,后面的消息都会被阻塞
public class ConfirmMessage {
    //消息数量
    public static final int MSG_CNT=200;
    public static void main(String[] args) {
        //调用单个确认发布方法
        confirmSingleMessage();
    }

    public static void confirmSingleMessage() {
        try {
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //开启确认发布
            channel.confirmSelect();
            //声明队列
            String queue = UUID.randomUUID().toString();
            //队列持久化
            channel.queueDeclare(queue, true, false, false, null);
            //发送消息
            long start= System.currentTimeMillis();
            for (int i = 0; i < MSG_CNT; i++) {
                String msg="消息:"+i;
                //发送消息,消息需要持久化
                channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
                //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                boolean flag=channel.waitForConfirms();
                if (flag){
                    System.out.println("————————第"+(i+1)+"条消息发送成功————————");
                }else {
                    System.out.println("========第"+(i+1)+"条消息发送失败=========");
                }

            }
            //记录结束时间
            long end=System.currentTimeMillis();
            System.out.println("发布:"+MSG_CNT+"个单独确认消息,耗时:"+(end-start)+"毫秒");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


}

在这里插入图片描述

3.2批量确认发布模式

  • 先发布一批信息然后一起确认可以大大提高吞吐量
  • 缺点:当故障发生的时候,我们不知道是哪一个消息出现了问题,我们需要把整个批处理保存在内存中,记录重要的信息后重新发布消息
  • 这种方案仍然是同步的方式,会阻塞消息的发布
public class ConfirmMessage {
    //消息数量
    public static final int MSG_CNT = 200;

    public static void main(String[] args) {
        //调用单个确认发布方法
        //confirmSingleMessage();//发布:200个单独确认消息,耗时:192毫秒
        confirmBatchMessage();
    }

    public static void confirmSingleMessage() {
        try {
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //开启确认发布
            channel.confirmSelect();
            //声明队列
            String queue = UUID.randomUUID().toString();
            //队列持久化
            channel.queueDeclare(queue, true, false, false, null);
            //发送消息
            long start = System.currentTimeMillis();
            for (int i = 0; i < MSG_CNT; i++) {
                String msg = "消息:" + i;
                //发送消息,消息需要持久化
                channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("————————第" + (i + 1) + "条消息发送成功————————");
                } else {
                    System.out.println("========第" + (i + 1) + "条消息发送失败=========");
                }

            }
            //记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布:" + MSG_CNT + "个单独确认消息,耗时:" + (end - start) + "毫秒");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void confirmBatchMessage() {
        try {
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //开启确认发布
            channel.confirmSelect();
            //批量确认消息数量
            int batchSize=20;
            //未确认消息数量
            int nackMessageCount=0;
            //声明队列
            String queue = UUID.randomUUID().toString();
            //队列持久化
            channel.queueDeclare(queue, true, false, false, null);
            //发送消息
            long start = System.currentTimeMillis();
            for (int i = 0; i < MSG_CNT; i++) {
                String msg = "消息:" + i;
                //发送消息,消息需要持久化
                channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                //累加未确认的发布数量
                nackMessageCount++;
                //判断的未确认消息数量和批量确认消息的数量是否一致
                if (nackMessageCount==batchSize){
                    //服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息
                    boolean flag = channel.waitForConfirms();
                    if (flag) {
                        System.out.println("————————第" + (i + 1) + "条消息发送成功————————");
                    } else {
                        System.out.println("========第" + (i + 1) + "条消息发送失败=========");
                    }
                    //清空未确认发布消息个数
                    nackMessageCount=0;
                }
            }
            //为了确认剩下的是没有确认的消息,所以要再次进行确认
            if (nackMessageCount>0){
                //再次重新确认
                channel.waitForConfirms();
            }
            //记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布:" + MSG_CNT + "个单独确认消息,耗时:" + (end - start) + "毫秒");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


}

在这里插入图片描述

3.3异步确认发布模式

在这里插入图片描述

 //异步消息发布确认
    public static void publishMessageAsync() throws Exception {
        Channel channel = ConnectUtil.getChannel();
        //声明队列,此处使用UUID作为队列的名字
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        //开启发布确认模式
        channel.confirmSelect();
        //创建ConcurrentSkipListMap集合(跳表集合)
        ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
        //确认收到消息回调函数
        ConfirmCallback ackCallBack = new ConfirmCallback() {

            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                //判断是否批量异步确认
                if (multiple) {
                    //把集合中没有被确认的消息添加到该集合中
                    ConcurrentNavigableMap<Long, String> confirmed = concurrentSkipListMap.headMap(deliveryTag, true);
                    //清除该部分没有被确认的消息
                    confirmed.clear();
                } else {
                    //只清除当前序列胡的消息
                    concurrentSkipListMap.remove(deliveryTag);

                }
                System.out.println("确认的消息序列序号:" + deliveryTag);
            }
        };

        //未被确认消息的回调函数
        ConfirmCallback nackCallBack = new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                //获取没有被确认的消息
                String msg = concurrentSkipListMap.get(deliveryTag);
                System.out.println("发布的消息:" + msg + "未被确认,该消息序列号:" + deliveryTag);
            }
        };
        //添加异步确认监听器
        channel.addConfirmListener(ackCallBack, nackCallBack);
        //记录开始时间
        long start = System.currentTimeMillis();
        //循环发送消息
        for (int i = 0; i < MSG_CNT; i++) {
            //消息内容
            String message = "消息:" + i;
            //把未确认的消息放到集合中,通过序列号和消息进行关联
//            channel.getNextPublishSeqNo(); 获取下一个消息的序列号
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
            //发送消息
            channel.basicPublish("", queueName, null, message.getBytes());

        }
        //记录结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MSG_CNT+"个批量确认消息,一共耗时:"+(end-start)+"毫秒");


    }

在这里插入图片描述

最后

以上就是苹果咖啡豆为你收集整理的RabbitMQ:发布确认模式1.基本介绍2.实现消息可靠传递的三个条件3.发布确认模式的全部内容,希望文章能够帮你解决RabbitMQ:发布确认模式1.基本介绍2.实现消息可靠传递的三个条件3.发布确认模式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部