我是靠谱客的博主 花痴棒球,这篇文章主要介绍SpringBoot+Redis+最简发布订阅,现在分享给大家,希望可以做个参考。

依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${spring-boot.version}</version>
</dependency>

redis配置

这里只提供必备的配置(基本的连接配置略)。序列化器如果没有配好,会出现 JSON 解析异常或乱码现象。

/**
 * Builds the JSON serializer bean used for Redis values.
 *
 * <p>The backing {@code ObjectMapper} omits {@code null} properties and has default typing
 * enabled for non-final types.
 *
 * @return a {@code Jackson2JsonRedisSerializer<Object>} backed by that mapper
 */
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
    // Configure the mapper first, then hand it to the serializer.
    // NOTE(review): enableDefaultTyping appears to be deprecated in newer Jackson releases in
    // favour of activateDefaultTyping -- confirm against the project's Jackson version.
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

    Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
    serializer.setObjectMapper(objectMapper);
    return serializer;
}
/**
 * Creates the primary {@code RedisTemplate}: keys and hash keys are serialized as plain strings,
 * values and hash values with the supplied JSON serializer.
 *
 * @param connectionFactory           connection factory the template is bound to
 * @param jackson2JsonRedisSerializer serializer applied to values and hash values
 * @return the template, after {@code afterPropertiesSet()} has been called on it
 */
@Bean
@Primary
public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory,
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
    RedisTemplate<String, Object> redisOps = new RedisTemplate<>();
    redisOps.setConnectionFactory(connectionFactory);

    // Keys and hash keys: plain strings.
    RedisSerializer<String> keySerializer = new StringRedisSerializer();
    redisOps.setKeySerializer(keySerializer);
    redisOps.setHashKeySerializer(keySerializer);

    // Values and hash values: JSON.
    redisOps.setValueSerializer(jackson2JsonRedisSerializer);
    redisOps.setHashValueSerializer(jackson2JsonRedisSerializer);

    redisOps.afterPropertiesSet();
    return redisOps;
}

绑定消息处理器:(这里示例是三个一对一的发布订阅,也可以一对多)

import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;
/**
 * Binds the message handler methods to their Redis pub/sub channels.
 *
 * @author sunziwen
 * @version 1.0
 * @date 2020/1/8 17:10
 **/
@Component
public class BindingContainer {

    /**
     * Creates the listener container and registers one adapter per channel, each delegating to a
     * method of {@code handleReceiver}.
     *
     * @param connectionFactory           connection factory the container listens on
     * @param jackson2JsonRedisSerializer serializer set on every adapter
     * @param handleReceiver              delegate whose methods handle the messages
     * @return the container with all three listeners registered
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer,
            HandleReceiver handleReceiver) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        // Exception-log channel
        register(listenerContainer, handleReceiver, "handleExceptionInfo", "channel:exception_info",
                jackson2JsonRedisSerializer);
        // Login-log channel
        register(listenerContainer, handleReceiver, "handleLoginInfo", "channel:login_info",
                jackson2JsonRedisSerializer);
        // Access-log channel
        register(listenerContainer, handleReceiver, "handleVisitor", "channel:visitor_info",
                jackson2JsonRedisSerializer);

        return listenerContainer;
    }

    /** Builds one adapter for {@code handlerMethod} and subscribes it to {@code channel}. */
    private static void register(RedisMessageListenerContainer listenerContainer, HandleReceiver delegate,
            String handlerMethod, String channel, Jackson2JsonRedisSerializer<Object> serializer) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(delegate, handlerMethod);
        adapter.setSerializer(serializer);
        adapter.afterPropertiesSet();
        listenerContainer.addMessageListener(adapter, new PatternTopic(channel));
    }
}

消息处理器

package com.rz.mq;
import com.rz.domain.SysExceptionInfo;
import com.rz.domain.SysLoginInfo;
import com.rz.domain.SysVisitorInfo;
import com.rz.service.SysExceptionInfoServer;
import com.rz.service.SysLoginInfoServer;
import com.rz.service.SysVisitorInfoServer;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Component;
/**
 * Message handler: each method stores the record it receives through the matching service.
 *
 * <p>NOTE(review): the second argument of every handler is unused here. When invoked through
 * Spring's {@code MessageListenerAdapter} it is presumably the channel/pattern name rather than
 * a message body -- confirm against the adapter documentation.
 *
 * @author sunziwen
 * @version 1.0
 * @date 2020/1/8 17:08
 **/
@Component
@AllArgsConstructor
public class HandleReceiver {

    // Field order is kept as-is: it determines the Lombok-generated constructor signature.
    private final SysLoginInfoServer sysLoginInfoServer;
    private final SysExceptionInfoServer sysExceptionInfoServer;
    private final SysVisitorInfoServer sysVisitorInfoServer;

    /**
     * Stores an exception-log record.
     *
     * @param sysExceptionInfo record passed to {@code sysExceptionInfoServer.save}
     * @param message          not used by this method
     */
    public void handleExceptionInfo(SysExceptionInfo sysExceptionInfo, String message) {
        this.sysExceptionInfoServer.save(sysExceptionInfo);
    }

    /**
     * Stores a login-log record.
     *
     * @param sysLoginInfo record passed to {@code sysLoginInfoServer.save}
     * @param message      not used by this method
     */
    public void handleLoginInfo(SysLoginInfo sysLoginInfo, String message) {
        this.sysLoginInfoServer.save(sysLoginInfo);
    }

    /**
     * Stores an access-log record.
     *
     * @param sysVisitorInfo record passed to {@code sysVisitorInfoServer.save}
     * @param message        not used by this method
     */
    public void handleVisitor(SysVisitorInfo sysVisitorInfo, String message) {
        this.sysVisitorInfoServer.save(sysVisitorInfo);
    }
}

数据模型类:忽略...

消息发布:这里给出其中一个消息发布示例

import java.util.Date;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.rz.constants.ServiceNames;
import com.rz.domain.SysVisitorInfo;
import com.rz.dto.AppResDto;
import com.rz.enums.ResEnums;
import com.rz.service.SysVisitorInfoServer;
import com.rz.util.application.ApplicationUtils;
import com.rz.utils.IdWorker;
import com.rz.utils.RedisUtil;
import lombok.AllArgsConstructor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.context.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
/**
 * Access-log aspect: wraps public methods annotated with {@code com.rz.exceptions.RzLog}, builds a
 * {@code SysVisitorInfo} for the call and publishes it on the {@code channel:visitor_info} Redis
 * channel.
 *
 * <p>Publishing is best-effort: a failure to build the result text or to publish the record is
 * logged and never changes the outcome of the intercepted call.
 *
 * @author sunziwen
 * @version 1.0
 * @date 2019/12/13 8:34
 **/
@Aspect
@Configuration
@Slf4j
@AllArgsConstructor
public class VisitorLog {

    /** Redis channel the access-log records are published on. */
    private static final String VISITOR_CHANNEL = "channel:visitor_info";

    private RedisUtil redisUtil;

    /**
     * Runs the intercepted method and publishes an access-log record for it.
     *
     * <p>On normal return the record's result is the {@code AppResDto} message when the method
     * returned an {@code AppResDto}; otherwise the result is left unset. When the method throws
     * a {@code BaseException} the result is taken from its {@code ResEnums} (if present) and the
     * exception is rethrown unchanged. Other throwables propagate without a record, as before.
     *
     * @param pjp the intercepted join point
     * @return whatever the intercepted method returned
     * @throws Throwable whatever the intercepted method threw
     */
    @Around("execution(public * *(..))&&@annotation(com.rz.exceptions.RzLog)")
    public Object interceptor02(ProceedingJoinPoint pjp) throws Throwable {
        SysVisitorInfo visitorInfo = SysVisitorInfo.builder()
                .id(IdWorker.getId())
                .userId(ApplicationUtils.currentUid())
                .createTime(new Date())
                .uri(ApplicationUtils.getRequest().getRequestURI())
                .params(JSON.toJSONString(ApplicationUtils.getRequest().getParameterMap()))
                .serverName(ServiceNames.rz_api)
                .build();
        try {
            Object proceed = pjp.proceed();
            // Guard the cast: a null or non-AppResDto return must not turn a successful
            // business call into a NullPointerException / ClassCastException.
            if (proceed instanceof AppResDto) {
                visitorInfo.setResult(((AppResDto) proceed).getMsg());
            }
            publish(visitorInfo);
            return proceed;
        } catch (BaseException e) {
            ResEnums resEnums = e.getResEnums();
            // A missing enum must not replace the original exception with an NPE.
            if (resEnums != null) {
                visitorInfo.setResult(resEnums.getMsg());
            }
            publish(visitorInfo);
            throw e;
        }
    }

    /** Publishes the record; failures are logged so they cannot mask the business outcome. */
    private void publish(SysVisitorInfo visitorInfo) {
        try {
            redisUtil.convertAndSend(VISITOR_CHANNEL, visitorInfo);
        } catch (Exception ex) {
            log.warn("Failed to publish visitor log to {}", VISITOR_CHANNEL, ex);
        }
    }
}

注意:Redis 的发布订阅是广播模式,每一个订阅了该信道的实例都会收到同一条消息。如果把消息订阅和消息发布放在同一个服务中,该服务集群部署时每个实例都会处理一次消息,出现重复消费的情况;因此消息订阅应该单独开一个(单实例的)服务来接收处理。

最后

以上就是花痴棒球最近收集整理的关于SpringBoot+Redis+最简发布订阅的全部内容,更多相关SpringBoot+Redis+最简发布订阅内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部