概述
RabbitMQHelper这个类,用于声明交换机,声明队列、发送消息等操作。
dedicatedChannels 用于存放当前线程中的信道,声明交换机、队列等操作发生在多个方法中,为了共享信道,所以放在局部线程变量中。
public class RabbitMQHelper {
protected final Logger logger = LoggerFactory.getLogger(RabbitMQHelper.class);
private final CacheConnectionFactory cacheConnectionFactory;
private final AtomicInteger activeCallbacks = new AtomicInteger();
private final ThreadLocal<Channel> dedicatedChannels = new ThreadLocal();
public RabbitMQHelper(@Autowired CacheConnectionFactory cacheConnectionFactory) {
this.cacheConnectionFactory = cacheConnectionFactory;
}
public void closeChannel() {
Channel channel = (Channel)this.dedicatedChannels.get();
if(channel != null) {
try {
this.dedicatedChannels.set((Object)null);
channel.close();
} catch (IOException var3) {
;
} catch (TimeoutException var4) {
;
}
}
}
public void queueBinding(BindingMQ bindingMQ) {
this.doExecute((channel) -> {
this.queueBindings(channel, new BindingMQ[]{bindingMQ});
return null;
}, this.cacheConnectionFactory);
}
public void queueBindings(Channel channel, BindingMQ... bindingMQs) {
for(int i = 0; i < bindingMQs.length; ++i) {
BindingMQ bindingMQ = bindingMQs[i];
if(this.logger.isDebugEnabled()) {
this.logger.debug("绑定交换机和队列:" + bindingMQ.getDestionation() + "," + bindingMQ.getExchange());
}
try {
channel.queueBind(bindingMQ.getDestionation(), bindingMQ.getExchange(), bindingMQ.getRoutingKey(), bindingMQ.getArguments());
} catch (IOException var6) {
throw new RuntimeException("绑定交换机和队列出错:" + bindingMQ.getDestionation() + "," + bindingMQ.getExchange());
}
}
}
public void declareExchange(ExchangeMQ exchangeMQ) {
this.doExecute((channel) -> {
this.declareExchanges(channel, new ExchangeMQ[]{exchangeMQ});
return null;
}, this.cacheConnectionFactory);
}
public void declareExchanges(Channel channel, ExchangeMQ... exchangeMQs) {
try {
for(int i = 0; i < exchangeMQs.length; ++i) {
ExchangeMQ exchangeMQ = exchangeMQs[i];
if(this.logger.isDebugEnabled()) {
this.logger.debug("声明交换机:" + exchangeMQ.getName());
}
if(exchangeMQ.isDelayed()) {
Map<String, Object> arguments0 = exchangeMQ.getArguments();
HashMap arguments;
if(arguments0 == null) {
arguments = new HashMap();
} else {
arguments = new HashMap(arguments0);
}
arguments.put("x-deplayed-type", exchangeMQ.getType());
channel.exchangeDeclare(exchangeMQ.getName(), exchangeMQ.getType(), exchangeMQ.isDurable(), exchangeMQ.isAutoDelete(), exchangeMQ.isInternal(), arguments);
} else {
channel.exchangeDeclare(exchangeMQ.getName(), exchangeMQ.getType(), exchangeMQ.isDurable(), exchangeMQ.isAutoDelete(), exchangeMQ.isInternal(), exchangeMQ.getArguments());
}
}
} catch (IOException var7) {
throw new RuntimeException("声明交换机出错:" + var7);
}
}
public String declareQueue(QueueMQ queue) {
return (String)this.doExecute((channel) -> {
List<DeclareOk> declareOk = this.declareQueues(channel, new QueueMQ[]{queue});
return declareOk.size() > 0?((DeclareOk)declareOk.get(0)).getQueue():null;
}, this.cacheConnectionFactory);
}
public List<DeclareOk> declareQueues(Channel channel, QueueMQ... queues) {
List<DeclareOk> declareOks = new ArrayList();
for(int i = 0; i < queues.length; ++i) {
QueueMQ queue = queues[i];
if(this.logger.isDebugEnabled()) {
this.logger.debug("声明队列:" + queue.getName());
}
try {
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
declareOks.add(declareOk);
} catch (IOException var7) {
throw new RuntimeException("声明队列的时候发生了异常:" + queue.getName());
}
}
return declareOks;
}
public Channel getChannel() {
ChannelCachingConnectionProxy channelCachingConnectionProxy = null;
Channel channel = null;
try {
channelCachingConnectionProxy = this.cacheConnectionFactory.createConnection();
} catch (Exception var5) {
throw new RuntimeException("rabbitmqhelper:创建 rabbit 连接失败");
}
try {
channel = this.cacheConnectionFactory.createChannel(channelCachingConnectionProxy);
return channel;
} catch (Exception var4) {
throw new RuntimeException("rabbitmqhelper:创建 rabbit 通道失败");
}
}
private <T> T doExecute(ChannelCallback<T> channelCallback, CacheConnectionFactory cacheConnectionFactory) {
Assert.notNull(cacheConnectionFactory, "connection factory 不能为null");
Channel channel = null;
channel = (Channel)this.dedicatedChannels.get();
if(channel == null || !channel.isOpen()) {
if(channel != null) {
this.closeChannel();
channel = null;
}
channel = this.getChannel();
this.dedicatedChannels.set(channel);
}
Object result = null;
try {
result = channelCallback.doInRabbit(channel);
return result;
} catch (Exception var6) {
throw new RuntimeException("rabbitmqhelper:doInRabbit出错" + var6);
}
}
public void send(String exchange, String destination, boolean durable, BasicProperties properties, String msg) {
this.doExecute((channel) -> {
channel.basicPublish(exchange, destination, durable, properties, msg.getBytes("utf-8"));
return null;
}, this.cacheConnectionFactory);
}
最后
以上就是刻苦可乐为你收集整理的自定义访问rabbitmq的框架(二)的全部内容,希望文章能够帮你解决自定义访问rabbitmq的框架(二)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复