我是靠谱客的博主 痴情期待,最近开发中收集的这篇文章主要介绍在 Spring Cloud 项目中安装和使用 RabbitMQ 的方法,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
一.安装rabbitMq
安装 RabbitMQ,并安装、启用延时消息插件 rabbitmq_delayed_message_exchange(下文配置类中 x-delayed-message 类型的交换机依赖该插件)。
二.rabbitMq 使用
1.导入maven依赖
<!-- Spring Boot AMQP starter; the org.springframework.amqp classes used in the Java code below come from it. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在application.yml配置文件中加入RabbitMq配置信息
# RabbitMQ connection settings. Annotations are kept as YAML comments so that
# they are not parsed as part of the values.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672            # broker port
    username: super       # user name
    password: Jmy2019.    # password
    virtual-host: ecosphere
    # NOTE(review): the ConfirmCallback/ReturnCallback and the manual basicAck used
    # later in this article normally also need publisher confirms/returns and manual
    # listener acknowledgement to be enabled here; the exact property names depend on
    # the Spring Boot version - confirm against its documentation.
3.rabbitMq配置类
package com.nuvole.merchant.conf.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology configuration: declares the delayed exchange, the
 * group-join queue and the binding between the two.
 */
@Configuration
public class AmqpConfig {

    /** Name of the delayed-message exchange. */
    public static final String DELAYED_EXCHANGE_KEY = "exchange.delayed";

    /** Name of the queue used for group-order status updates. */
    public static final String ORDER_GROUP_JOIN_QUEUE_KEY = "order.group.join.delayed";

    /** Routing key that binds the group-order queue to the delayed exchange. */
    public static final String ORDER_GROUP_JOIN_ROUTK = "order.group.join.delayed";

    /**
     * Declares the delayed exchange.
     *
     * @return a durable, non-auto-delete custom exchange of type
     *         {@code x-delayed-message} whose {@code x-delayed-type} argument is {@code direct}
     */
    @Bean
    public CustomExchange testExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;

        Map<String, Object> exchangeArguments = new HashMap<>();
        exchangeArguments.put("x-delayed-type", "direct");

        return new CustomExchange(
                DELAYED_EXCHANGE_KEY, "x-delayed-message", durable, autoDelete, exchangeArguments);
    }

    /**
     * Declares the durable queue that receives group-order status updates.
     *
     * @return the queue named {@link #ORDER_GROUP_JOIN_QUEUE_KEY}
     */
    @Bean
    public Queue orderGroupJoinQueue() {
        final boolean durable = true;
        return new Queue(ORDER_GROUP_JOIN_QUEUE_KEY, durable);
    }

    /**
     * Binds the group-order queue to the delayed exchange.
     *
     * @param delayedExchange     the exchange to bind to
     * @param orderGroupJoinQueue the queue being bound
     * @return the binding using {@link #ORDER_GROUP_JOIN_ROUTK} as routing key and no arguments
     */
    @Bean
    public Binding flashSalePushBinding(CustomExchange delayedExchange, Queue orderGroupJoinQueue) {
        return BindingBuilder
                .bind(orderGroupJoinQueue)
                .to(delayedExchange)
                .with(ORDER_GROUP_JOIN_ROUTK)
                .noargs();
    }
}
4.消息发送确认
package com.nuvole.shop.config.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Publisher-side delivery callbacks.
 * <p>
 * ConfirmCallback only confirms whether a message reached the exchange.
 * ReturnCallback is triggered when a message could not be routed to a queue;
 * it is not invoked when routing succeeds.
 * <p>
 * 1. Message did not reach the exchange: confirm callback with ack=false.
 * 2. Message reached the exchange: confirm callback with ack=true.
 * 3. Exchange-to-queue routing succeeded: no return callback.
 * 4. Exchange-to-queue routing failed: return callback is invoked.
 */
@Component
public class AmqpAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Registers this bean as both the confirm and the return callback of the template. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Prints the details of a message that was returned by the broker.
     *
     * @param message    the returned message
     * @param replyCode  broker reply code
     * @param replyText  broker reply text
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8: the no-charset String constructor uses the
        // platform default, which can garble non-ASCII payloads.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("return--message:" + body + ",replyCode:" + replyCode
                + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }

    /**
     * Prints the failure cause when the broker did not acknowledge a publish;
     * successful confirms are ignored.
     *
     * @param correlationData correlation data supplied when the message was sent (unused here)
     * @param ack             {@code true} if the broker acknowledged the message
     * @param cause           failure cause reported for a negative acknowledgement
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("消息发送确认失败:" + cause);
        }
    }
}
5.发送队列消息的service
package com.nuvole.merchant.service.mq;
/**
* Service for publishing messages to a message-queue exchange.
*/
public interface QueueMessageService {
/**
* Sends a normal (non-delayed) queue message.
*
* @param exchangeKey name of the exchange to publish to
* @param routingKey routing key used to route the message
* @param message payload to send
*/
void send(String exchangeKey, String routingKey, Object message);
/**
* Sends a delayed queue message.
*
* @param exchangeKey name of the exchange to publish to
* @param routingKey routing key used to route the message
* @param message payload to send
* @param msec delay before delivery; presumably milliseconds, as the name suggests - confirm against the implementation
*/
void delayedSend(String exchangeKey, String routingKey, Object message, int msec);
}
6.发送队列消息的service实现类
package com.nuvole.merchant.service.mq;
import com.nuvole.util.IdGenerator;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * {@link QueueMessageService} implementation backed by a {@link RabbitTemplate}.
 * Every published message carries a freshly generated correlation id.
 */
@Service
public class QueueMessageServiceImpl implements QueueMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts and publishes a message without delay.
     *
     * @param exchangeKey name of the exchange to publish to
     * @param routingKey  routing key for the message
     * @param message     payload to convert and send
     */
    @Override
    public void send(String exchangeKey, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, message, newCorrelationData());
    }

    /**
     * Converts and publishes a message with a delay set on its properties.
     *
     * @param exchangeKey name of the exchange to publish to
     * @param routingKey  routing key for the message
     * @param msg         payload to convert and send
     * @param xdelay      delay value stored on the message properties
     */
    @Override
    public void delayedSend(String exchangeKey, String routingKey, Object msg, final int xdelay) {
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, msg, message -> {
            // Set the delay on the outgoing message.
            message.getMessageProperties().setDelay(xdelay);
            return message;
        }, newCorrelationData()); // same correlation handling as send()
    }

    /** Creates correlation data with a new UUID for one publish operation. */
    private CorrelationData newCorrelationData() {
        return new CorrelationData(IdGenerator.getUUID());
    }
}
7.调用队列
// msec: time from now until the queue should be triggered (milliseconds);
// paramExtend: parameters passed to the queue as a JSON string.
// Math.toIntExact fails loudly if the interval does not fit in an int.
int msec = Math.toIntExact(DateUtil.betweenMs(new Date(), outTime));
HashMap<String, Object> paramExtend = new HashMap<>();
paramExtend.put("id", "123456");
// Publish with the routing-key constant (not the queue-name constant) so the call
// keeps working if the two values ever diverge.
queueMessageService.delayedSend(AmqpConfig.DELAYED_EXCHANGE_KEY, AmqpConfig.ORDER_GROUP_JOIN_ROUTK, JSON.toJSONString(paramExtend), msec);
8.监听队列方法
package com.nuvole.shop.config.mq;
import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.JSON;
import com.nuvole.shop.domain.StoreGroupGoodsJoin;
import com.nuvole.shop.domain.extend.WechatStoreGroupJoinGoods;
import com.nuvole.shop.mapper.extend.ExtendStoreGroupGoodsJoinMapper;
import com.nuvole.shop.scheduler.StorePushScheduler;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 监听队列消息
*
*/
@Slf4j
@Component
@EnableRabbit
public class TestReceiver {
@RabbitListener(queues = AmqpConfig.ORDER_GROUP_JOIN_QUEUE_KEY)
public void process(String msg, Channel channel, Message message) {
log.info("======================延时队列开始执行。。。。。。。。。。。。。。");
log.info(new Date().toString() + ",延时收到了信息 message = " + msg);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
Map p = JSON.parseObject(msg, HashMap.class);
} catch (IOException e) {
e.printStackTrace();
}
}
}
最后
以上就是痴情期待为你收集整理的在 Spring Cloud 项目中安装和使用 RabbitMQ 的全部内容,希望文章能够帮你解决使用 RabbitMQ 时所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复