概述
- Redis实现消息队列的缺点
- 多消费者场景下,如果某个消费者宕机,当他重连上时,宕机期间的消息会全部丢失
- Redis中PubSub的消息不会被持久化,所以Redis宕机后所有的消息都会被丢弃。
- Redis5.0后新增了Stream数据结构,带来了可持久化的消息队列,后续再补充。
- 使用list数据结构实现一个简单的消息队列
- list数据结构提供一种阻塞读取的方式blpop/rlpop,使用该接口可以再队列没有消息时先休眠,有消息过来时再立即苏醒,消息几乎无延迟。阻塞超过设定的timeout后会返回一个空值。
- 代码实现:
package com.xliu.chapter1; import redis.clients.jedis.Jedis; import java.sql.Time; import java.util.List; import java.util.concurrent.TimeUnit; /** * @author liuxin * @version 1.0 * @date 2020/4/23 10:41 */ public class MessageQueueTest { public static void main(String[] args) throws InterruptedException { Thread consumer = new Thread(){ @Override public void run() { MyConsumer consumer = new MyConsumer(new Jedis("192.168.198.128"),"queue"); while(true){ List<String> consume = consumer.consume(); if(consume == null || consume.size() == 0){ System.out.println("队列为空"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } }else{ System.out.println(consume.get(0)+ " "+ consume.get(1)); } } } }; Thread producer = new Thread(){ @Override public void run() { MyProducer producer = new MyProducer(new Jedis("192.168.198.128"),"queue"); for(int i=0;i<10;i++){ producer.produce("我是"+Thread.currentThread().getName() +"的第"+i+"条消息"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread producer2 = new Thread(){ @Override public void run() { MyProducer producer = new MyProducer(new Jedis("192.168.198.128"),"queue"); for(int i=0;i<10;i++){ producer.produce("我是"+Thread.currentThread().getName() +"的第"+i+"条消息"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; consumer.start(); TimeUnit.SECONDS.sleep(3); producer.start(); producer2.start(); consumer.join(); producer.join(); } private static class MyConsumer { private Jedis jedis; private String queueKey; public MyConsumer(Jedis jedis,String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public List<String> consume(){ return jedis.blpop(1,queueKey); } } private static class MyProducer { private Jedis jedis; private String queueKey; public MyProducer(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public long produce(String message){ return jedis.rpush(queueKey, message); } } }
- 运行结果:上述代码建立了两个生产者,一个消费者。
队列为空 队列为空 queue 我是Thread-2的第0条消息 queue 我是Thread-1的第0条消息 queue 我是Thread-2的第1条消息 queue 我是Thread-1的第1条消息 queue 我是Thread-1的第2条消息 queue 我是Thread-2的第2条消息 queue 我是Thread-2的第3条消息 queue 我是Thread-1的第3条消息 queue 我是Thread-1的第4条消息 queue 我是Thread-2的第4条消息 queue 我是Thread-2的第5条消息 queue 我是Thread-1的第5条消息 queue 我是Thread-1的第6条消息 queue 我是Thread-2的第6条消息 queue 我是Thread-1的第7条消息 queue 我是Thread-2的第7条消息 queue 我是Thread-1的第8条消息 queue 我是Thread-2的第8条消息 queue 我是Thread-1的第9条消息 queue 我是Thread-2的第9条消息
- 通过订阅功能PubSub实现一个简单的消息队列
- 客户端发起订阅命令后,Redis会返回消息通知,告知订阅结果。生产者发布的消息会发送给订阅了该模块的消费者。
- 代码实现:
package com.xliu.chapter1; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import java.util.concurrent.TimeUnit; /** * @author liuxin * @version 1.0 * @date 2020/4/23 11:27 */ public class PubSubTest { public static void main(String[] args) throws InterruptedException { Thread consumer = new Thread() { @Override public void run() { Jedis jedis = null; try { jedis = new Jedis("192.168.198.128"); jedis.subscribe(new MsgPubSubListener(), "codehole","codehole2"); } catch (Exception e) { e.printStackTrace(); } finally { if(jedis!=null){ jedis.disconnect(); } } } }; Thread producer = new Thread() { @Override public void run() { Jedis jedis = null; try { jedis = new Jedis("192.168.198.128"); jedis.publish("codehole","python"); jedis.publish("codehole","java"); jedis.publish("codehole2","python"); jedis.publish("codehole2","java"); TimeUnit.SECONDS.sleep(1); jedis.publish("codehole","python1"); jedis.publish("codehole","java1"); } catch (Exception e) { e.printStackTrace(); } finally { if(jedis!=null){ jedis.disconnect(); } } } }; consumer.start(); TimeUnit.SECONDS.sleep(1); producer.start(); consumer.join(); producer.join(); } static class MsgPubSubListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println(channel + " 发布了 " + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println(channel + "订阅了频道:" + subscribedChannels); } } }
- 实验结果:
codehole订阅了频道:1 codehole2订阅了频道:2 codehole 发布了 python codehole 发布了 java codehole2 发布了 python codehole2 发布了 java codehole 发布了 python1 codehole 发布了 java1
最后
以上就是含糊便当为你收集整理的使用Redis实现一个简单的消息队列的全部内容,希望文章能够帮你解决使用Redis实现一个简单的消息队列所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复