我是靠谱客的博主 善良火,最近开发中收集的这篇文章主要介绍springBoot2.x快速集成RabbitMQ一、公共部分-MQ配置集成二、生产者三、消费者,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
文章目录
- 一、公共部分-MQ配置集成
- 二、生产者
- 1、生产者-公共父类
- 2、自定义队列1的子类
- 三、消费者
- 1、监听队列
主要分为三大部分:MQ配置集成,生产者,消费者。
一、公共部分-MQ配置集成
申明队列、绑定队列的操作,无论是在消费者端和生产者端写都可以的,只要写了这部分操作的代码所在的服务先启动,就会去MQ服务端创建了。
建议不知道哪个服务先启动,就两个服务都写也没有问题,幂等的,不会得复创建。
package com.epay.fsc.ucms.rbhs.provider.configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.*;
/**
* MQ配置
*
* @author Jiang xiaopeng
* @date 2021/11/18
*/
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {
@Value("${rbhs.rabbitMq.address}")
private String address;
@Value("${rbhs.rabbitMq.userName}")
private String userName;
@Value("${rbhs.rabbitMq.password}")
private String password;
@Value("${rbhs.rabbitMq.vhost}")
private String vhost;
//处理交换器
@Value("${rbhs.rabbitMq.exchange:fsc-ucms-x_piq}")
public String exchangeXPiq;
//入金queue-rbhs推向Piq
@Value("${rbhs.rabbitMq.queue.piqRbhs:fsc-ucms-q_piq_rbhs}")
public String queuePiqRbhs;
@Value("${rbhs.rabbitMq.routingKey.piqRbhs:fsc-ucms-key_piq_rbhs}")
public String routingKeyPiqRbhs;
//入金queue-Piq响应rbhs
@Value("${rbhs.rabbitMq.queue.piqRbhsAck:fsc-ucms-q_piq_rbhs_ack}")
public String queuePiqRbhsAck;
@Value("${rbhs.rabbitMq.routingKey.piqRbhsAck:fsc-ucms-key_piq_rbhs_ack}")
public String routingKeyPiqRbhsAck;
/**
* @Description: 如果是singleton,回调类就只能有一个,就是最后设置的那个;所以如果想要设置不同的回调类,就要设置为prototype
*/
@Bean("rabbitTemplate")
@Scope(value =ConfigurableBeanFactory.SCOPE_PROTOTYPE,proxyMode = ScopedProxyMode.TARGET_CLASS)
public RabbitTemplate sendRbhsAckRabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(rbhsConnectionFactory());
return template;
}
/**
* 自定义ConnectionFactory,直接开启发送确认机制
*/
@Bean(name = "rbhsConnectionFactory")
public ConnectionFactory rbhsConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(address);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
//与rbhsRabbitTemplate.setConfirmCallback(this)组合使用开启发送确认机制-判断是否成功到达交换机
connectionFactory.setPublisherConfirms(true);
//与rbhsRabbitTemplate.setReturnCallback(this)组合使用开启发送确认机制-判断是否成功到达队列
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* 定义SimpleRabbitListenerContainerFactory。选填;
*/
@Bean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rbhsListenerContainer(@Qualifier("rbhsConnectionFactory") ConnectionFactory firstConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(firstConnectionFactory);
//手动确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//默认消费者数量
factory.setConcurrentConsumers(3);
//最大消费者数量
factory.setMaxConcurrentConsumers(3);
//消息预取:每次给消费者发送的消息数量(默认会全量一次发消费者)
factory.setPrefetchCount(1);
factory.setDefaultRequeueRejected(true);
return factory;
}
/**
* 设置交换机
* 1. 选定交换机类型DirectExchange
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange exchangeRbhs() {
return new DirectExchange(exchangeXPiq);
}
/**
* 声明队列-入金queue-rbhs推向Piq
*/
@Bean
public Queue queueRbhs1() {
return new Queue(queuePiqRbhs, true);
}
/**
* 将队列绑定到交换机
*/
@Bean
public Binding bindingQueueRbhs1() {
return BindingBuilder.bind(queueRbhs1()).to(exchangeRbhs()).with(routingKeyPiqRbhs);
}
/**
* 声明队列-入金queue-Piq响应rbhs
*/
@Bean
public Queue queueRbhs2() {
return new Queue(queuePiqRbhsAck, true);
}
/**
* 将队列绑定到交换机
*/
@Bean
public Binding bindingQueueRbhs2() {
return BindingBuilder.bind(queueRbhs2()).to(exchangeRbhs()).with(routingKeyPiqRbhsAck);
}
}
二、生产者
1、生产者-公共父类
package com.epay.fsc.ucms.rbhs.provider.util.mq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
/**
* 消息发送
*
* @author Jiang xiaopeng
* @date 2021/11/19
*/
public abstract class RabbitMqSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//初始配置为prototype(若rabbitTemplate为单例的话,那setConfirmCallback、setReturnCallback就会覆盖)
@Autowired
public RabbitTemplate rabbitTemplate;
//初始设置回调类
@PostConstruct
public void init() {
//指定 判断是否成功到达交换机
rabbitTemplate.setConfirmCallback(this);
//指定 判断是否成功到达队列
rabbitTemplate.setReturnCallback(this);
}
/**
* 是否成功发送至交换机,都会回调此方法
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回调id:" + correlationData);
if (ack) {
logger.info("消息发送至交换机-成功");
} else {
//todo 重推/告警
logger.error("消息发送至交换机-失败:原因={}", cause);
}
}
/**
* 交换机发送至队列失败,才会回调此方法
*
* @param message
消息主体
* @param replyCode
消息主体
* @param replyText
描述
* @param exchange
消息使用的交换器
* @param routingKey 消息使用的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//todo 重推/告警
logger.error("消息发送至队列失败-失败:原因={}", message);
}
/**
* 消息发送Ack
*
* @param id
幂等ID
* @param content 内容
*/
public abstract void convertAndSend(String id, String content);
}
2、自定义队列1的子类
package com.epay.fsc.ucms.rbhs.provider.util.mq.rbhs;
import com.epay.fsc.ucms.rbhs.provider.configuration.RabbitMqConfiguration;
import com.epay.fsc.ucms.rbhs.provider.util.mq.RabbitMqSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* 消息发送Rbhs
*
* @author Jiang xiaopeng
* @date 2021/11/19
*/
@Component("rabbitMqSenderRbhs")
public class RabbitMqSenderRbhs extends RabbitMqSender {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
RabbitMqConfiguration rabbitMqConfiguration;
@Override
public void convertAndSend(String id, String content) {
CorrelationData correlationId = new CorrelationData(id);
//把消息放入routingKey对应的队列当中
rabbitTemplate.convertAndSend(rabbitMqConfiguration.exchangeXPiq, rabbitMqConfiguration.routingKeyPiqRbhs, content, correlationId);
}
}
使用的时候,调用rabbitMqSenderRbhs.convertAndSend方法就行了
三、消费者
1、监听队列
package com.epay.fsc.ucms.piq.provider.util.mq.rbhs;
import com.alibaba.fastjson.JSON;
import com.epay.fsc.ucms.piq.provider.constant.AlarmConstant;
import com.epay.fsc.ucms.piq.provider.entity.FmsPiq;
import com.epay.fsc.ucms.piq.provider.model.*;
import com.epay.fsc.ucms.piq.provider.service.MqAckService;
import com.epay.fsc.ucms.piq.provider.service.MqConsumerService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.UUID;
/**
* 消费者Rbhs
*
* @author Jiang xiaopeng
* @date 2021/11/19
*/
@Component
public class RabbitMqReceiverRbhs {
protected final String TAG = "[消费响应操作-Rbhs]";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private MqAckService mqAckService;
@Autowired
MqConsumerService mqConsumerRbhsService;
@Autowired
public AlarmConstant alarmConstant;
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${piq.rabbitMq.queue.piqRbhs:fsc-ucms-q_piq_rbhs}")
@RabbitHandler
public void process(Channel channel, String content, Message message) {
String requestId = UUID.randomUUID().toString();
logger.info("{}接收处理队列当中的消息: requestId={},content={} ", TAG, requestId, content);
//消息消费
MqConsumerReq mqConsumerReq = MqConsumerReq.builder().requestId(requestId).content(content).build();
BaseResponse<FmsPiq> result = mqConsumerRbhsService.process(mqConsumerReq);
if (!result.isOk()) {
//失败则重试一次
result = mqConsumerRbhsService.process(mqConsumerReq);
}
if (!result.isOk()) {
//告警
alarmConstant.alarmSys(requestId, new StringBuilder(TAG).append("消息消费异常:content=")
.append(mqConsumerReq.getContent()).toString());
}
try {
//进行ack处理,手动ack应答,参数含义:标识哪条信息,是否批量签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
logger.info("{}消息签收:requestId={}", TAG, requestId);
} catch (IOException e) {
e.printStackTrace();
logger.error("{}消息签收异常:requestId={},content={},e={}", TAG, requestId, content, e);
}
//逆向发送响应队列
if (result.isOk()) {
logger.info("{}逆向发送响应队列-start: requestId={},id={}", TAG, requestId, result.getData().getId());
MqAckReq mqAckReq = MqAckReq.builder().requestId(requestId).content(result.getData().getId()).build();
BaseResponse<String> resultAck = mqAckService.process(mqAckReq);
if (!resultAck.isOk()) {
//失败则重试一次
resultAck =
mqAckService.process(mqAckReq);
}
if (!resultAck.isOk()) {
//告警
alarmConstant.alarmSys(requestId, new StringBuilder(TAG).append("逆向发送队列异常:content=")
.append(content).toString());
}
logger.info("{}逆向发送响应队列-end: requestId={},id={},resultAck={} ", TAG, requestId, result.getData().getId(), JSON.toJSONString(resultAck));
}
}
}
这里加了一部分是采用逆向推送应答队列的方式来保证一致性。
你也可以改成死信队列,监控告警
最后
以上就是善良火为你收集整理的springBoot2.x快速集成RabbitMQ一、公共部分-MQ配置集成二、生产者三、消费者的全部内容,希望文章能够帮你解决springBoot2.x快速集成RabbitMQ一、公共部分-MQ配置集成二、生产者三、消费者所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复