我是靠谱客的博主 超帅硬币,最近开发中收集的这篇文章主要介绍StringBoot集成Rabbit Redis和ack机制双重保险,保障消息一定能够正确的消费转: StringBoot集成Rabbit,根据业务返回ACK,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
转: StringBoot集成Rabbit,根据业务返回ACK
为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费
首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)
上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,不太建议使用
@RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
@RabbitHandler
public void processUser(String message) {
threadPool.execute(new Runnable() {
@Override
public void run() {
logger.info("用户侧流水:{}",message);
}
});
}
根据源码分析,当然这里不分析源码,有兴趣的可以多失败几次就ok明白了
在配置类中定义监听器,监听这个序列(
AcknowledgeMode.MANUAL
是必须的哦)
/**
* 接受消息的监听,这个监听客户交易流水的消息
* 针对消费者配置
* @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queueMessage());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(transactionConsume);
return container;
}
这个 TransactionConsumeImpl
要继承ChannelAwareMessageListener
,主要说的手动返回ACK就是channel。调用
@Component
public class TransactionConsumeImpl implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
private static final Gson gson = new Gson();
@Autowired
JedisShardInfo jedisShardInfo;
@Autowired
ExecutorService threadPool;
@Autowired
BoluomeFlowService boluomeFlowService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
threadPool.execute(new Runnable() { //多线程处理
@Override
public void run() {
Jedis jedis = jedisShardInfo.createResource();
jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
String json = gson.toJson(flow);
if (boluomeFlowService.insert(flow)) { //当添加成功时候返回成功
logger.info("客户交易流水添加1条记录:{}", json);
jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
} catch (IOException ie) {
logger.error("消费成功回调成功,io操作异常");
}
} else {
logger.info("客户交易流水添加失败记录:{}", json);
}
}
});
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
- true 发送给下一个消费者
- false 谁都不接受,从队列中删除
Rabbitmq进阶
© 著作权归作者所有
import com.rabbitmq.client.Channel;
import config.callback.ConfirmCallBackListener;
import config.callback.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import po.Mail;
import rabbitMQ.listener.TransactionConsumerImpl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//连接rabbitMQ的基本配置
@Configuration
@ComponentScan(basePackages = {"rabbitMQ.listener","config.callback"})
@EnableRabbit
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private TransactionConsumerImpl transactionConsumer;
@Autowired
private ConfirmCallBackListener confirmCallBackListener;
@Autowired
private ReturnCallBackListener returnCallBackListener;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("mq.efunbox.cn");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setPort(5672);
//connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
/**
* 使用消息队列,必须要考虑的问题就是生产者消息发送失败和消费者消息处理失败,这两种情况怎么处理.
生产者发送消息,成功,则确认消息发送成功;失败,则返回消息发送失败信息,再做处理.
消费者处理消息,成功,则消息队列自动删除消息;失败,则消息重新返回队列,等待处理.
对于消费者处理失败的情况,如果仅仅只是让消息重新返回队列,等待处理,那么久有可能会出现很多消息一直无法处理的情况;
因此,是否让消息返回队列,还有待商榷.
**/
//消息确认监听器confirmCallBackListener:
rabbitTemplate.setConfirmCallback(confirmCallBackListener);
//消息发送失败返回监听器returnCallBackListener:
rabbitTemplate.setReturnCallback(returnCallBackListener);
// mandatory必须设置true,return callback才生效
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
public Queue payQueue() {
//第二个参数 durable=true 表示 持久化
Queue queue=new Queue("payQueue",true);
return queue;
}
/**
* 初始化线程池 多线程执行 消费任务
*/
@Bean
public ExecutorService threadPool(){
return Executors.newFixedThreadPool(20);
}
/**
* 对于消费端,我们可以只创建 SimpleRabbitListenerContainerFactory,
* 它能够帮我们生成 RabbitListenerContainer,然后我们再使用
* @RabbitListener 指定接收者收到信息时处理的方法。
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
//消费者配置 @RabbitListener 和 @Bean SimpleMessageListenerContainer 方式
/**
* 针对消费者配置
* 方式一 每一个Queue 对应一个SimpleMessageListenerContainer
* 指定消息接受监听器 MessageListener implements ChannelAwareMessageListener 接口 自己实现onMessage方法
*
* 作用: 接受消息的监听,这个监听指定Queue(payQueue)客户交易流水的消息
* @return
*/
/*
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(payQueue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
//设置ack方式为手动,增加对应队列的监听器。acknowledge="manual" 则开启ack机制
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(transactionConsumer);
return container;
}
*/
/**
* 方式二 @RabbitListener
* SimpleRabbitListenerContainerFactory 可以生成 RabbitListenerContainer
*/
//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 自动响应时 简单配置即可 不需要手动确认是否消费
@RabbitListener(queues = "payQueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("队列监听器1号收到消息"+mail.toString());
}
//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//需要手动确认消息是否消费时 这样处理
@RabbitListener(queues = "payQueue")
public void process(@Payload Mail mail, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws Exception{
logger.info("rabbit receiver message:{}",mail);
try {
channel.basicAck(deliveryTag, false);
}catch (Exception e){
logger.error("process message error: {}",e);
}
}
}
}
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import po.Mail;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@Service("transactionConsumerImpl")
public class TransactionConsumerImpl implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(TransactionConsumerImpl.class);
private SimpleMessageConverter converter = new SimpleMessageConverter();
@Autowired
ExecutorService threadPool;
// @Autowired
//BoluomeFlowService boluomeFlowService;
//只有在消息处理成功后发送ack确认,或失败后发送nack使信息重新投递
public void onMessage(final Message message, final Channel channel) throws Exception {
final String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
Object msg = converter.fromMessage(message);
// todo mail chuli
System.out.println(JSONObject.toJSONString(msg));
try {
//手工返回ACK,通知此消息已经争取消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e1) {
e1.printStackTrace();
System.out.println("消息已重复处理失败,拒绝再次接收 失败...");
}
/* threadPool.execute(new Runnable() { //多线程处理
public void run() {
Jedis jedis = jedisShardInfo.createResource();
jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
String json = gson.toJson(flow);
if (boluomeFlowService.insert(flow)) { //当添加成功时候返回成功
logger.info("客户交易流水添加1条记录:{}", json);
jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
} catch (IOException ie) {
logger.error("消费成功回调成功,io操作异常");
}
} else {
logger.info("客户交易流水添加失败记录:{}", json);
}
try
{
System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));
// deliveryTag是消息传送的次数,我这里是为了让消息队列的第一个消息到达的时候抛出异常,处理异常让消息重新回到队列,然后再次抛出异常,处理异常拒绝让消息重回队列
if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2)
{
throw new Exception();
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只确认当前一个消息收到,true确认所有consumer获得的消息
}
catch (Exception e){
e.printStackTrace();
if (message.getMessageProperties().getRedelivered())
{
System.out.println("消息已重复处理失败,拒绝再次接收...");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
} catch (IOException e1) {
e1.printStackTrace();
System.out.println("消息已重复处理失败,拒绝再次接收 失败...");
}
}
else
{
System.out.println("消息即将再次返回队列处理...");
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
} catch (IOException e1) {
e1.printStackTrace();
System.out.println("requeue为是否重新回到队列 失败...");
}
}
}
}
});*/
}
本地测试代码地址: xxx
最后
以上就是超帅硬币为你收集整理的StringBoot集成Rabbit Redis和ack机制双重保险,保障消息一定能够正确的消费转: StringBoot集成Rabbit,根据业务返回ACK的全部内容,希望文章能够帮你解决StringBoot集成Rabbit Redis和ack机制双重保险,保障消息一定能够正确的消费转: StringBoot集成Rabbit,根据业务返回ACK所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复