概述
最近soa项目要和官网系统对接,实现mq信息监听,保存等一些列操作。项目用的是Maven+SSM框架。然后学习和开发用了两天时间,算是搞定,趁加班时间做个总结。
对于Maven工程的ssm框架,整合RabbitMq首先就是
1.引入依赖:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
依赖不多,就这一个就够用了。但是有个坑要注意,如果引入的位置不对,启动会出现莫名的启动报错,我试了好几次,最后放到所有引入依赖的最后面是没问题的~所以,明智的选择是引入成功后先启动项目看看会不会报错,这个时候才引入一个依赖,正常不会报错,如果报错,就是位置不对,及时调整吧。
2.配置文件:context-rabbitMq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="10.130.202.12"><!--服务器ip地址-->
<property name="username" value="tele-onlineYa">
<property name="password" value="tele-onlineYa">
<property name="port" value="5678">
<property name="channelCacheSize" value="50">
<property name="virtualHost" value="tele-onlineYa">
</bean>
<!--通过指定admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成。 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 队列queue,自动声明 -->
<rabbit:queue name="ZL.SALETRACE" exclusive="false" auto-declare="false" durable="true" />
<!--自定义消息接受者-->
<bean id="messageReceiver" class="com.zlf.cn.api.RabbitMqConsumerListener">
<!-- queue监听器,观察监听模式。当有消息到达本应用时会通知监听在对应队列queue上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="ZL.SALETRACE" ref="messageReceiver" />
</rabbit:listener-container>
</beans>
3.在web.xml中引入配置的文件context-rabbitMq.xml
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:META-INF/app_config/context/context-*.xml</param-value>
</context-param>
4.自定义监听类:RabbitMqConsumerListener
自定义的监听类必须实现接口:ChannelAwareMessageListener或者MessageListener.这里最好用Channel…这个。
二者的区别可以从他们的抽象类的入参来说:一个是message(消息实体),一个是channel就是当前的通道。
ChannelAwareMessageListener的抽象类:
void onMessage(Message message,Channel channel) .
MessageListener接口的抽象类:
void onMessage(Message message).
多出的参数Channel可以很方便的提供监听之外的ack通知RabbitMq服务器功能。
由于MQ服务器对于消费端无反馈的,有重发机制,可能会有一条数据发送多次,并且一致保存在服务器中,久而久之就会由于数据量过大造成内存溢出的危险,而ack机制就是通过消费端发出通知给mq服务器,告诉服务器那条mq已经处理完毕,可以剔除。对于处理异常的,则可以重新回到mq服务器的队列中。而实现手动实现这个ack的关键点就是实现接口:ChannelAwareMessageListener.
如何实现手动ack?
手动ack就是在当前channel里面调用basic***的方法,并传入当前消息的tagId就可以了。
这里的ack通知分为三种情况:
a.消费端正常处理完成一条mq时,ack mq服务器可以移除此条mq的方法
//消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
b. 消费端处理完成一条mq时发生异常,ack 会将此条mq重新放到mq服务器队列queue中
//ack返回false,此条mq并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
c.就是消费端主动拒绝mq服务器发送mq过来。
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
参数解释:
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject
的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
5.自定义类的具体实现:
import java.io.IOException
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
import com.zlf.cn.action.IRabbitMqConsumerTaskAction;
/**
*使用监听器接收消息
*@author itw_zhanglf02
/
public class RabbitMqConsumerListener implents ChannelAwareMessageListener{
private Logger logger=loggerFactory.getLogger(RabbitMqConsumerListener.class);
@Resource(name=IRabbitMqConsumerTaskAction.ACTION_ID)
private IRabbitMqConsumerTaskAction rabbitMqConsumerTaskAction;
@Override
public void onMessage(Message arg0,Channel channel){
//业务处理,放到action层,并返回处理成功还是异常的flag
boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0);
//还有一个点就是如何获取mq消息的报文部分message?
//String message=new String(arg0.getBody(),"UTF-8");
if(mqFlag){
basicACK(arg0,channle);//处理正常--ack
}else{
basicNACK(arg0,channle);//处理异常--nack
}
}
//正常消费掉后通知mq服务器移除此条mq
private void basicACK(Message message,Channel channel){
try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(IOException e){
logger.error("通知服务器移除mq时异常,异常信息:"+e);
}
}
//处理异常,mq重回队列
private void basicNACK(Message message,Channel channel){
try{
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}catch(IOExeption e){
logger.error("mq重新进入服务器时出现异常,异常信息:"+e);
}
}
}
这样就可以了。启动项目,有时候在项目启动完毕,监听程序开始的时候报错,报错信息大致:
org.springframework.amqpAmqpIOException:java.io.IOException....
Case by:com.rabbitmq.client.ShutdownSignalException:channel error;protocol method:#method<channel.close>(reply-code=403,reply-text=ACCESS_REFUSED - access to queue 'ZL.SALETRACE' in vhost 'tele-onlineYA' refused for user 'tel-online',class-id=50,method-id=10)......
这种情况网上给的大都是由于在配置对应权限上有问题,但我这里报的同样的错误,并不是他们说的那种情况,而是由于我们这里做的监听mq服务器名为’ZL.SALETRACE’的queue,如果服务器端现在没有mq,就会报这个错,估计是没有mq,服务器暂时没有分配队列,队列就没有生成,但这个不是啥配置问题,是初始化问题。如果现在服务器端发送mq,还是能迅速的接收到发送的mq的,然后一切就正常了。
所以这个问题归结于服务端没有mq,如果有mq,还是可以正常接收,没啥问题的。这种报错可以忽略。
好了,这就是本次迭代的总结。~
最后
以上就是动人航空为你收集整理的RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息的全部内容,希望文章能够帮你解决RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复