我是靠谱客的博主 动人航空,最近开发中收集的这篇文章主要介绍RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

最近soa项目要和官网系统对接,实现mq信息监听,保存等一些列操作。项目用的是Maven+SSM框架。然后学习和开发用了两天时间,算是搞定,趁加班时间做个总结。
对于Maven工程的ssm框架,整合RabbitMq首先就是

1.引入依赖:

<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>

依赖不多,就这一个就够用了。但是有个坑要注意,如果引入的位置不对,启动会出现莫名的启动报错,我试了好几次,最后放到所有引入依赖的最后面是没问题的~所以,明智的选择是引入成功后先启动项目看看会不会报错,这个时候才引入一个依赖,正常不会报错,如果报错,就是位置不对,及时调整吧。

2.配置文件:context-rabbitMq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans 	xmlns="http://www.springframework.org/schema/beans"  
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">  
<!-- 定义RabbitMQ的连接工厂 -->  
<rabbit:connection-factory id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
	<constructor-arg value="10.130.202.12"><!--服务器ip地址-->  
	<property name="username" value="tele-onlineYa">
	<property name="password" value="tele-onlineYa">
	<property name="port" value="5678">
	<property name="channelCacheSize" value="50">
	<property name="virtualHost" value="tele-onlineYa">
</bean>

    <!--通过指定admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成。 -->  
    <rabbit:admin connection-factory="connectionFactory" />  
  
    <!-- 队列queue,自动声明 -->  
    <rabbit:queue name="ZL.SALETRACE"  exclusive="false" auto-declare="false" durable="true" />  
	<!--自定义消息接受者-->
	<bean id="messageReceiver" class="com.zlf.cn.api.RabbitMqConsumerListener">  
	<!-- queue监听器,观察监听模式。当有消息到达本应用时会通知监听在对应队列queue上的监听对象 -->  
    <rabbit:listener-container connection-factory="connectionFactory">  
        <rabbit:listener queues="ZL.SALETRACE" ref="messageReceiver" />  
    </rabbit:listener-container>  
</beans>

3.在web.xml中引入配置的文件context-rabbitMq.xml

<context-param>
	<param-name>contextConfigLocation</param-name>
	<param-value>classpath:META-INF/app_config/context/context-*.xml</param-value>
</context-param>

4.自定义监听类:RabbitMqConsumerListener

自定义的监听类必须实现接口:ChannelAwareMessageListener或者MessageListener.这里最好用Channel…这个。

二者的区别可以从他们的抽象类的入参来说:一个是message(消息实体),一个是channel就是当前的通道。
ChannelAwareMessageListener的抽象类: 
	void onMessage(Message message,Channel channel) .
MessageListener接口的抽象类:  
	void onMessage(Message message).

多出的参数Channel可以很方便的提供监听之外的ack通知RabbitMq服务器功能。
由于MQ服务器对于消费端无反馈的,有重发机制,可能会有一条数据发送多次,并且一致保存在服务器中,久而久之就会由于数据量过大造成内存溢出的危险,而ack机制就是通过消费端发出通知给mq服务器,告诉服务器那条mq已经处理完毕,可以剔除。对于处理异常的,则可以重新回到mq服务器的队列中。而实现手动实现这个ack的关键点就是实现接口:ChannelAwareMessageListener.

如何实现手动ack?
手动ack就是在当前channel里面调用basic***的方法,并传入当前消息的tagId就可以了。

这里的ack通知分为三种情况:

a.消费端正常处理完成一条mq时,ack mq服务器可以移除此条mq的方法

//消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

b. 消费端处理完成一条mq时发生异常,ack 会将此条mq重新放到mq服务器队列queue中

//ack返回false,此条mq并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
参数解释:
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列

c.就是消费端主动拒绝mq服务器发送mq过来。

//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
参数解释:
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列

channel.basicNack 与 channel.basicReject
的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息

5.自定义类的具体实现:

import java.io.IOException
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
import com.zlf.cn.action.IRabbitMqConsumerTaskAction;
/**
*使用监听器接收消息
*@author itw_zhanglf02
/
public class RabbitMqConsumerListener implents ChannelAwareMessageListener{
private Logger logger=loggerFactory.getLogger(RabbitMqConsumerListener.class);

@Resource(name=IRabbitMqConsumerTaskAction.ACTION_ID)
private IRabbitMqConsumerTaskAction rabbitMqConsumerTaskAction;

@Override
public void onMessage(Message arg0,Channel channel){
	//业务处理,放到action层,并返回处理成功还是异常的flag
	boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0);
	//还有一个点就是如何获取mq消息的报文部分message?
	//String message=new String(arg0.getBody(),"UTF-8");
	if(mqFlag){
		basicACK(arg0,channle);//处理正常--ack
	}else{
		basicNACK(arg0,channle);//处理异常--nack
	}
}
//正常消费掉后通知mq服务器移除此条mq
private void basicACK(Message message,Channel channel){
try{							channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(IOException e){
	logger.error("通知服务器移除mq时异常,异常信息:"+e);
}
}
//处理异常,mq重回队列
private void basicNACK(Message message,Channel channel){
try{
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}catch(IOExeption e){
	logger.error("mq重新进入服务器时出现异常,异常信息:"+e);
}
}
}

这样就可以了。启动项目,有时候在项目启动完毕,监听程序开始的时候报错,报错信息大致:

org.springframework.amqpAmqpIOException:java.io.IOException....
Case by:com.rabbitmq.client.ShutdownSignalException:channel error;protocol method:#method<channel.close>(reply-code=403,reply-text=ACCESS_REFUSED - access to queue 'ZL.SALETRACE' in vhost 'tele-onlineYA' refused for user 'tel-online',class-id=50,method-id=10)......

这种情况网上给的大都是由于在配置对应权限上有问题,但我这里报的同样的错误,并不是他们说的那种情况,而是由于我们这里做的监听mq服务器名为’ZL.SALETRACE’的queue,如果服务器端现在没有mq,就会报这个错,估计是没有mq,服务器暂时没有分配队列,队列就没有生成,但这个不是啥配置问题,是初始化问题。如果现在服务器端发送mq,还是能迅速的接收到发送的mq的,然后一切就正常了。
所以这个问题归结于服务端没有mq,如果有mq,还是可以正常接收,没啥问题的。这种报错可以忽略。
好了,这就是本次迭代的总结。~

最后

以上就是动人航空为你收集整理的RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息的全部内容,希望文章能够帮你解决RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部