我是靠谱客的博主 飞快舞蹈,最近开发中收集的这篇文章主要介绍redis实现普通消息队列与延迟消息队列1.redis实现普通消息队列2.redis实现延迟消息队列,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
redis实现普通消息队列与延迟消息队列
- 1.redis实现普通消息队列
- 1.1 实现原理
- 1.2 pom.xml
- 1.3 JedisUtils工具类
- 1.4 消息类
- 1.4 消息队列类
- 1.5 消息入队测试
- 1.5 消息出队测试
- 2.redis实现延迟消息队列
- 2.1 实现原理
- 2.2 pom.xml
- 2.2 JedisUtils工具类
- 2.3 消息类
- 2.4 延迟消息队列类
- 2.5 消息入队测试
- 2.6 消息出队测试
1.redis实现普通消息队列
1.1 实现原理
利用lpush/rpush和lpop/blop(阻塞式)/rpop/brpop(阻塞式)来实现!
1.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>distribute-lock</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!--json依赖-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<!--jedis依赖-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
1.3 JedisUtils工具类
package com.yl;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class JedisUtils {
private static JedisPool jedisPool = null;
public static Jedis getJedisObject() {
if (jedisPool == null) {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
//最大空闲数
config.setMaxIdle(400);
//最大连接数
config.setMaxTotal(2000);
//连接最大等待时间,-1代表没有限制
config.setMaxWaitMillis(300000);
/**
* 配置连接池的地址,端口号,超时时间,密码
*/
jedisPool = new JedisPool(config,"192.168.244.129",6379,30000,"root123");
}
try {
//通过连接池获取jedis对象
Jedis jedis = jedisPool.getResource();
jedis.auth("root123");
return jedis;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
1.4 消息类
package com.yl;
/**
* 消息对象
*/
public class Message {
private String id;
private Object object;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Object getObject() {
return object;
}
public void setObject(Object object) {
this.object = object;
}
@Override
public String toString() {
return "Message{" +
"id='" + id + ''' +
", object=" + object +
'}';
}
}
1.4 消息队列类
package com.yl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import java.util.*;
/**
* 消息队列
*/
public class MessageQueue {
private Jedis jedis;
private String queue;
public MessageQueue(Jedis jedis, String queue) {
this.jedis = jedis;
this.queue = queue;
}
/**
* 消息入队
* @param list 要发送的消息集合
*/
public void queue(List<Message> list) {
List<String> strs = new ArrayList<>();
for (Message message : list) {
//构造消息对象
message.setId(UUID.randomUUID().toString());
//序列化
ObjectMapper objectMapper = new ObjectMapper();
try {
String str = objectMapper.writeValueAsString(message);
strs.add(str);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
String[] arrs = null;
if (!strs.isEmpty()) {
arrs = new String[strs.size()];
for (int i = 0; i < strs.size(); i++) {
arrs[i] = strs.get(i);
}
}
if (arrs != null) {
System.out.println("message start send..."+new Date());
//发送消息
jedis.lpush(queue,arrs);
}
}
/**
* 消息消费
*/
public void cosume() {
//如果当前线程没有被打断,就一直读
while (true) {
//会一直读取
//String s = jedis.lpop(queue);
//阻塞式弹出元素,如果没有读取到数据,会睡眠指定时间,如果达到了睡眠时间后,list中任然没有数据进来,则会直接抛出!
List<String> list = jedis.blpop(5, queue);
System.out.println("receive message"+list.toString());
}
}
}
1.5 消息入队测试
package com.yl;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
public class Test1 {
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedisObject();
MessageQueue queue = new MessageQueue(jedis,"myQueue");
//生产者
new Thread(){
@Override
public void run() {
List<Message> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setObject("yl===message"+i);
list.add(message);
}
queue.queue(list);
}
}.start();
}
}
结果
1.5 消息出队测试
package com.yl;
import redis.clients.jedis.Jedis;
public class Test2 {
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedisObject();
MessageQueue queue = new MessageQueue(jedis,"myQueue");
//消费者
new Thread(){
@Override
public void run() {
queue.cosume();
}
}.start();
}
}
结果
2.redis实现延迟消息队列
2.1 实现原理
通过zset,使用当前时间戳作为score可以实现延迟消息队列,读取数据的时候会延迟读取!
2.2 pom.xml
同上
2.2 JedisUtils工具类
同上
2.3 消息类
同上
2.4 延迟消息队列类
package com.yl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
/**
* 延迟消息队列
*/
public class DelayMessageQueue {
private Jedis jedis;
private String queue;
public DelayMessageQueue(Jedis jedis, String queue) {
this.jedis = jedis;
this.queue = queue;
}
/**
* 消息入队
* @param object 要发送的消息
*/
public void queue(Object object) {
//构造消息对象
Message message = (Message)object;
message.setId(UUID.randomUUID().toString());
//序列化
ObjectMapper objectMapper = new ObjectMapper();
try {
String str = objectMapper.writeValueAsString(message);
System.out.println("message start send..."+new Date());
//发送消息,score延迟五秒
jedis.zadd(queue,System.currentTimeMillis() + 5000,str);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 消息消费
*/
public void cosume() {
//如果当前线程没有被打断,就一直读
while (!Thread.interrupted()) {
//读取score在0到当前时间戳之间的消息,每次读取一条数据出来
Set<String> zset = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
if (zset.isEmpty()) {
try {
//如果消息是空的,则睡眠一段时间继续读取
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
continue;
}
//如果读取到了消息,直接显示出来
String next = zset.iterator().next();
//从zset中移除消息,并且打印出来
if (jedis.zrem(queue,next) > 0) {
//抢到了,就在下面处理业务
try {
ObjectMapper objectMapper = new ObjectMapper();
Message message = objectMapper.readValue(next, Message.class);
System.out.println("receive message:"+message + new Date());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
}
}
2.5 消息入队测试
package com.yl;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
public class Test3 {
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedisObject();
DelayMessageQueue queue = new DelayMessageQueue(jedis,"myDelayQueue");
//生产者
new Thread(){
@Override
public void run() {
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setObject("yl===message"+i);
queue.queue(message);
};
}
}.start();
}
}
结果
2.6 消息出队测试
package com.yl;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
public class Test4 {
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedisObject();
DelayMessageQueue queue = new DelayMessageQueue(jedis,"myDelayQueue");
//生产者
new Thread(){
@Override
public void run() {
queue.cosume();
}
}.start();
}
}
结果,5秒后才读取到消息
最后
以上就是飞快舞蹈为你收集整理的redis实现普通消息队列与延迟消息队列1.redis实现普通消息队列2.redis实现延迟消息队列的全部内容,希望文章能够帮你解决redis实现普通消息队列与延迟消息队列1.redis实现普通消息队列2.redis实现延迟消息队列所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复