我是靠谱客的博主 飞快舞蹈,这篇文章主要介绍redis实现普通消息队列与延迟消息队列1.redis实现普通消息队列2.redis实现延迟消息队列,现在分享给大家,希望可以做个参考。

redis实现普通消息队列与延迟消息队列

  • 1.redis实现普通消息队列
    • 1.1 实现原理
    • 1.2 pom.xml
    • 1.3 JedisUtils工具类
    • 1.4 消息类
    • 1.4 消息队列类
    • 1.5 消息入队测试
    • 1.5 消息出队测试
  • 2.redis实现延迟消息队列
    • 2.1 实现原理
    • 2.2 pom.xml
    • 2.2 JedisUtils工具类
    • 2.3 消息类
    • 2.4 延迟消息队列类
    • 2.5 消息入队测试
    • 2.6 消息出队测试

1.redis实现普通消息队列

1.1 实现原理

利用lpush/rpush和lpop/blop(阻塞式)/rpop/brpop(阻塞式)来实现!

1.2 pom.xml

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>distribute-lock</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!--json依赖--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.12.3</version> </dependency> <!--jedis依赖--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> <type>jar</type> <scope>compile</scope> </dependency> </dependencies> </project>

1.3 JedisUtils工具类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.yl; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class JedisUtils { private static JedisPool jedisPool = null; public static Jedis getJedisObject() { if (jedisPool == null) { GenericObjectPoolConfig config = new GenericObjectPoolConfig(); //最大空闲数 config.setMaxIdle(400); //最大连接数 config.setMaxTotal(2000); //连接最大等待时间,-1代表没有限制 config.setMaxWaitMillis(300000); /** * 配置连接池的地址,端口号,超时时间,密码 */ jedisPool = new JedisPool(config,"192.168.244.129",6379,30000,"root123"); } try { //通过连接池获取jedis对象 Jedis jedis = jedisPool.getResource(); jedis.auth("root123"); return jedis; } catch (Exception e) { e.printStackTrace(); return null; } } }

1.4 消息类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.yl; /** * 消息对象 */ public class Message { private String id; private Object object; public String getId() { return id; } public void setId(String id) { this.id = id; } public Object getObject() { return object; } public void setObject(Object object) { this.object = object; } @Override public String toString() { return "Message{" + "id='" + id + ''' + ", object=" + object + '}'; } }

1.4 消息队列类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.yl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import redis.clients.jedis.Jedis; import java.util.*; /** * 消息队列 */ public class MessageQueue { private Jedis jedis; private String queue; public MessageQueue(Jedis jedis, String queue) { this.jedis = jedis; this.queue = queue; } /** * 消息入队 * @param list 要发送的消息集合 */ public void queue(List<Message> list) { List<String> strs = new ArrayList<>(); for (Message message : list) { //构造消息对象 message.setId(UUID.randomUUID().toString()); //序列化 ObjectMapper objectMapper = new ObjectMapper(); try { String str = objectMapper.writeValueAsString(message); strs.add(str); } catch (JsonProcessingException e) { e.printStackTrace(); } } String[] arrs = null; if (!strs.isEmpty()) { arrs = new String[strs.size()]; for (int i = 0; i < strs.size(); i++) { arrs[i] = strs.get(i); } } if (arrs != null) { System.out.println("message start send..."+new Date()); //发送消息 jedis.lpush(queue,arrs); } } /** * 消息消费 */ public void cosume() { //如果当前线程没有被打断,就一直读 while (true) { //会一直读取 //String s = jedis.lpop(queue); //阻塞式弹出元素,如果没有读取到数据,会睡眠指定时间,如果达到了睡眠时间后,list中任然没有数据进来,则会直接抛出! List<String> list = jedis.blpop(5, queue); System.out.println("receive message"+list.toString()); } } }

1.5 消息入队测试

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.yl; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.List; public class Test1 { public static void main(String[] args) { Jedis jedis = JedisUtils.getJedisObject(); MessageQueue queue = new MessageQueue(jedis,"myQueue"); //生产者 new Thread(){ @Override public void run() { List<Message> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { Message message = new Message(); message.setObject("yl===message"+i); list.add(message); } queue.queue(list); } }.start(); } }

结果
在这里插入图片描述

1.5 消息出队测试

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.yl; import redis.clients.jedis.Jedis; public class Test2 { public static void main(String[] args) { Jedis jedis = JedisUtils.getJedisObject(); MessageQueue queue = new MessageQueue(jedis,"myQueue"); //消费者 new Thread(){ @Override public void run() { queue.cosume(); } }.start(); } }

结果
在这里插入图片描述

2.redis实现延迟消息队列

2.1 实现原理

通过zset,使用当前时间戳作为score可以实现延迟消息队列,读取数据的时候会延迟读取!

2.2 pom.xml

同上

2.2 JedisUtils工具类

同上

2.3 消息类

同上

2.4 延迟消息队列类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.yl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import redis.clients.jedis.Jedis; import java.util.Date; import java.util.Set; import java.util.UUID; /** * 延迟消息队列 */ public class DelayMessageQueue { private Jedis jedis; private String queue; public DelayMessageQueue(Jedis jedis, String queue) { this.jedis = jedis; this.queue = queue; } /** * 消息入队 * @param object 要发送的消息 */ public void queue(Object object) { //构造消息对象 Message message = (Message)object; message.setId(UUID.randomUUID().toString()); //序列化 ObjectMapper objectMapper = new ObjectMapper(); try { String str = objectMapper.writeValueAsString(message); System.out.println("message start send..."+new Date()); //发送消息,score延迟五秒 jedis.zadd(queue,System.currentTimeMillis() + 5000,str); } catch (JsonProcessingException e) { e.printStackTrace(); } } /** * 消息消费 */ public void cosume() { //如果当前线程没有被打断,就一直读 while (!Thread.interrupted()) { //读取score在0到当前时间戳之间的消息,每次读取一条数据出来 Set<String> zset = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1); if (zset.isEmpty()) { try { //如果消息是空的,则睡眠一段时间继续读取 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); break; } continue; } //如果读取到了消息,直接显示出来 String next = zset.iterator().next(); //从zset中移除消息,并且打印出来 if (jedis.zrem(queue,next) > 0) { //抢到了,就在下面处理业务 try { ObjectMapper objectMapper = new ObjectMapper(); Message message = objectMapper.readValue(next, Message.class); System.out.println("receive message:"+message + new Date()); } catch (JsonProcessingException e) { e.printStackTrace(); } } } } }

2.5 消息入队测试

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.yl; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.List; public class Test3 { public static void main(String[] args) { Jedis jedis = JedisUtils.getJedisObject(); DelayMessageQueue queue = new DelayMessageQueue(jedis,"myDelayQueue"); //生产者 new Thread(){ @Override public void run() { for (int i = 0; i < 10; i++) { Message message = new Message(); message.setObject("yl===message"+i); queue.queue(message); }; } }.start(); } }

结果
在这里插入图片描述

2.6 消息出队测试

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.yl; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.List; public class Test4 { public static void main(String[] args) { Jedis jedis = JedisUtils.getJedisObject(); DelayMessageQueue queue = new DelayMessageQueue(jedis,"myDelayQueue"); //生产者 new Thread(){ @Override public void run() { queue.cosume(); } }.start(); } }

结果,5秒后才读取到消息
在这里插入图片描述

最后

以上就是飞快舞蹈最近收集整理的关于redis实现普通消息队列与延迟消息队列1.redis实现普通消息队列2.redis实现延迟消息队列的全部内容,更多相关redis实现普通消息队列与延迟消息队列1内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部