我是靠谱客的博主 刻苦可乐,这篇文章主要介绍自定义访问rabbitmq的框架(二),现在分享给大家,希望可以做个参考。

RabbitMQHelper这个类,用于声明交换机,声明队列、发送消息等操作。
dedicatedChannels 用于存放当前线程中的信道,声明交换机、队列等操作发生在多个方法中,为了共享信道,所以放在局部线程变量中。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
public class RabbitMQHelper { protected final Logger logger = LoggerFactory.getLogger(RabbitMQHelper.class); private final CacheConnectionFactory cacheConnectionFactory; private final AtomicInteger activeCallbacks = new AtomicInteger(); private final ThreadLocal<Channel> dedicatedChannels = new ThreadLocal(); public RabbitMQHelper(@Autowired CacheConnectionFactory cacheConnectionFactory) { this.cacheConnectionFactory = cacheConnectionFactory; } public void closeChannel() { Channel channel = (Channel)this.dedicatedChannels.get(); if(channel != null) { try { this.dedicatedChannels.set((Object)null); channel.close(); } catch (IOException var3) { ; } catch (TimeoutException var4) { ; } } } public void queueBinding(BindingMQ bindingMQ) { this.doExecute((channel) -> { this.queueBindings(channel, new BindingMQ[]{bindingMQ}); return null; }, this.cacheConnectionFactory); } public void queueBindings(Channel channel, BindingMQ... bindingMQs) { for(int i = 0; i < bindingMQs.length; ++i) { BindingMQ bindingMQ = bindingMQs[i]; if(this.logger.isDebugEnabled()) { this.logger.debug("绑定交换机和队列:" + bindingMQ.getDestionation() + "," + bindingMQ.getExchange()); } try { channel.queueBind(bindingMQ.getDestionation(), bindingMQ.getExchange(), bindingMQ.getRoutingKey(), bindingMQ.getArguments()); } catch (IOException var6) { throw new RuntimeException("绑定交换机和队列出错:" + bindingMQ.getDestionation() + "," + bindingMQ.getExchange()); } } } public void declareExchange(ExchangeMQ exchangeMQ) { this.doExecute((channel) -> { this.declareExchanges(channel, new ExchangeMQ[]{exchangeMQ}); return null; }, this.cacheConnectionFactory); } public void declareExchanges(Channel channel, ExchangeMQ... exchangeMQs) { try { for(int i = 0; i < exchangeMQs.length; ++i) { ExchangeMQ exchangeMQ = exchangeMQs[i]; if(this.logger.isDebugEnabled()) { this.logger.debug("声明交换机:" + exchangeMQ.getName()); } if(exchangeMQ.isDelayed()) { Map<String, Object> arguments0 = exchangeMQ.getArguments(); HashMap arguments; if(arguments0 == null) { arguments = new HashMap(); } else { arguments = new HashMap(arguments0); } arguments.put("x-deplayed-type", exchangeMQ.getType()); channel.exchangeDeclare(exchangeMQ.getName(), exchangeMQ.getType(), exchangeMQ.isDurable(), exchangeMQ.isAutoDelete(), exchangeMQ.isInternal(), arguments); } else { channel.exchangeDeclare(exchangeMQ.getName(), exchangeMQ.getType(), exchangeMQ.isDurable(), exchangeMQ.isAutoDelete(), exchangeMQ.isInternal(), exchangeMQ.getArguments()); } } } catch (IOException var7) { throw new RuntimeException("声明交换机出错:" + var7); } } public String declareQueue(QueueMQ queue) { return (String)this.doExecute((channel) -> { List<DeclareOk> declareOk = this.declareQueues(channel, new QueueMQ[]{queue}); return declareOk.size() > 0?((DeclareOk)declareOk.get(0)).getQueue():null; }, this.cacheConnectionFactory); } public List<DeclareOk> declareQueues(Channel channel, QueueMQ... queues) { List<DeclareOk> declareOks = new ArrayList(); for(int i = 0; i < queues.length; ++i) { QueueMQ queue = queues[i]; if(this.logger.isDebugEnabled()) { this.logger.debug("声明队列:" + queue.getName()); } try { DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments()); declareOks.add(declareOk); } catch (IOException var7) { throw new RuntimeException("声明队列的时候发生了异常:" + queue.getName()); } } return declareOks; } public Channel getChannel() { ChannelCachingConnectionProxy channelCachingConnectionProxy = null; Channel channel = null; try { channelCachingConnectionProxy = this.cacheConnectionFactory.createConnection(); } catch (Exception var5) { throw new RuntimeException("rabbitmqhelper:创建 rabbit 连接失败"); } try { channel = this.cacheConnectionFactory.createChannel(channelCachingConnectionProxy); return channel; } catch (Exception var4) { throw new RuntimeException("rabbitmqhelper:创建 rabbit 通道失败"); } } private <T> T doExecute(ChannelCallback<T> channelCallback, CacheConnectionFactory cacheConnectionFactory) { Assert.notNull(cacheConnectionFactory, "connection factory 不能为null"); Channel channel = null; channel = (Channel)this.dedicatedChannels.get(); if(channel == null || !channel.isOpen()) { if(channel != null) { this.closeChannel(); channel = null; } channel = this.getChannel(); this.dedicatedChannels.set(channel); } Object result = null; try { result = channelCallback.doInRabbit(channel); return result; } catch (Exception var6) { throw new RuntimeException("rabbitmqhelper:doInRabbit出错" + var6); } } public void send(String exchange, String destination, boolean durable, BasicProperties properties, String msg) { this.doExecute((channel) -> { channel.basicPublish(exchange, destination, durable, properties, msg.getBytes("utf-8")); return null; }, this.cacheConnectionFactory); }

最后

以上就是刻苦可乐最近收集整理的关于自定义访问rabbitmq的框架(二)的全部内容,更多相关自定义访问rabbitmq内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部