概述
xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
<!-- Session缓存数量 -->
</bean>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="queue" ref="consumerMessageListener"/>
</jms:listener-container>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<bean id="consumerMessageListener" class="com.ycst.craft4j.system.listener.ConsumerMessageListener">
<property name="destination" ref="queueDestination"/>
</bean>
</beans>
consumer
package com.ycst.craft4j.system.listener;
import java.sql.Timestamp;
import java.util.List;
import javax.annotation.Resource;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ycst.craft4j.constants.Constants;
import com.ycst.craft4j.core.entity.LocationCity;
import com.ycst.craft4j.core.entity.OrderSupplier;
import com.ycst.craft4j.core.entity.Suppliers;
import com.ycst.craft4j.core.entity.TransportOrder;
import com.ycst.craft4j.core.mapper.CodeMstMapper;
import com.ycst.craft4j.core.mapper.OrderDistributeMapper;
import com.ycst.craft4j.core.mapper.OrderSupplierMapper;
import com.ycst.craft4j.core.mapper.SupplierLinesMapper;
import com.ycst.craft4j.core.mapper.SuppliersMapper;
import com.ycst.craft4j.core.mapper.TransportOrderMapper;
import com.ycst.craft4j.system.util.DateUtil;
public class ConsumerMessageListener implements MessageListener{
@Resource
@Qualifier("queueDestination")
private Destination destination;
@Resource
private JmsTemplate jmsTemplate;
@Resource
private TransportOrderMapper orderDao;
@Resource
private SupplierLinesMapper linesDao;
@Resource
private OrderDistributeMapper orderDistributeDao;
@Resource
private CodeMstMapper codeMstDao;
@Resource
private SuppliersMapper supplierDao;
@Resource
private OrderSupplierMapper orderSupplierDao;
@Override
@Async
@Transactional
public void onMessage(Message message){
Thread thread = new Thread();
try {
Thread.sleep(10*60*1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}finally {
thread.interrupt();
}
//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
String data = null;
try {
data = (String)message.getObjectProperty("data");
} catch (JMSException e) {
e.printStackTrace();
}
JSONObject object = JSON.parseObject(data);
//接收到消息,判断订单供应商是否接单,返回回复消息,再进行判断是否需要重新派车
int orderId = object.getIntValue("orderId");
TransportOrder order = orderDao.queryOrderById(orderId);
int count = orderSupplierDao.countDistributeNum(orderId);
int receiveOrder = orderSupplierDao.getOrderDistributeFlg(orderId);
//失效,重新分配
LocationCity fromCity = order.getFromCity();
LocationCity toCity = order.getToCity();
List<Integer> supplierIdList = linesDao.getFitSupplierList(fromCity == null ? 0 : fromCity.getId()
,toCity == null ? 0 : toCity.getId());
//获取已经失效的供应商
List<Integer> distributingSupplierList = orderSupplierDao.getDistributingSupplier(orderId);
List<Integer> invalidSupplierList = orderSupplierDao.getInvalidSupplierList(orderId
,Constants.SUPPLY_ORDER_STATUS.INVALID
,Constants.SUPPLY_ORDER_STATUS.CANCLE);
//把无效的订单更改为已失效
for(Integer orderSupplierId : distributingSupplierList){
orderSupplierDao.updateOrderSupplier(orderId, orderSupplierId, Constants.SUPPLY_ORDER_STATUS.INVALID);
}
supplierIdList.removeAll(invalidSupplierList);
supplierIdList.removeAll(distributingSupplierList);
//派单超过3或者已经派单了
if(count >= 3 || receiveOrder != 0 || supplierIdList.size() == 0){
thread.interrupt();
return;
}else{
//继续派单
if((supplierIdList != null)&&(supplierIdList.size() != 0)){
final Suppliers suppliers = supplierDao.getFitSupplier(supplierIdList);
if(suppliers == null){
thread.interrupt();
return;
}
//派单,发送消息队列
final OrderSupplier supplier = new OrderSupplier();
supplier.setTransportOrder(order);
//CodeMst codeMst = codeMstDao.selectByCode(Constants.SUPPLY_ORDER_STATUS.NEW_ORDER);
supplier.setUstatus(null);
supplier.setSupplier(suppliers);
Timestamp now = DateUtil.getNowTimestamp();
Timestamp after = new Timestamp(now.getTime()+10*60*1000);
supplier.setExpireTime(after);
int ret = orderSupplierDao.addOrderSuppliers(supplier);
final String orderNo = order.getOrderNo();
final int oId = order.getId();
if(ret == 1){
thread.interrupt();
//发送消息队列
jmsTemplate.send(destination, new MessageCreator(){
@Override
public Message createMessage(Session session) throws JMSException {
MessageProducer producer = session.createProducer(destination);
String data = "{orderNo:"+orderNo+",orderId:"+oId+",supplierId:"+supplier.getId()+"}";
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObjectProperty("data", data);
long time = 10*60*1000;
//objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
//producer.send(objectMessage);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return objectMessage;
}
});
}
}
}
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
producer,自动派车
//自动匹配
@Transactional
@Override
public void autoDistributeSupplier(int orderId){
TransportOrder order = orderDao.queryOrderById(orderId);
if(order != null){
//获取订单的出发地和目的地
LocationCity fromCity = order.getFromCity();
LocationCity toCity = order.getToCity();
//找到匹配的供应商
List<Integer> supplierIdList = supplierLinesDao.getFitSupplierList(fromCity == null ? 0 : fromCity.getId()
,toCity == null ? 0 : toCity.getId());
//获取已经失效的供应商
List<Integer> invalidSupplierList = orderSupplierDao.getInvalidSupplierList(orderId
,Constants.SUPPLY_ORDER_STATUS.INVALID
,Constants.SUPPLY_ORDER_STATUS.CANCLE);
supplierIdList.removeAll(invalidSupplierList);
if((supplierIdList != null)&&(supplierIdList.size() != 0)){
final Suppliers suppliers = supplierDao.getFitSupplier(supplierIdList);
if(suppliers == null){
return;
}
//派单,发送消息队列
OrderSupplier supplier = new OrderSupplier();
supplier.setTransportOrder(order);
supplier.setUstatus(null);
supplier.setSupplier(suppliers);
Timestamp now = DateUtil.getNowTimestamp();
Timestamp after = new Timestamp(now.getTime()+10*60*1000);
supplier.setExpireTime(after);
int ret = orderSupplierDao.addOrderSuppliers(supplier);
final String orderNo = order.getOrderNo();
final int oId = order.getId();
if(ret == 1){
OrderDetailModel model = getOrderDetailAddressMsg(orderId);
//发送短信给供应商
String message = "您有一个新的订单:"+order.getOrderNo()+",路线:"+model.getFromProvince()+model.getFromCity()+"-"+model.getToProvince()+model.getToCity()+",货品:"+model.getGoodsName()+model.getGoodsNum()+model.getUnit()+",请登录App查看,订单5分钟内有效,请及时接单。";
try {
SMSUtil.sendMessage(message,suppliers.getMobile());
} catch (IOException e) {
e.printStackTrace();
}
//推送给供应商
PushLogicModel pm = new PushLogicModel();
pm.setSupplierId(suppliers.getId());
pm.setMessage(message);
pm.setMessageCode(Constants.PUSH_MESSAGE_CODE.AUTO_DISTRICT_ORDER);
pm.setUserType(Constants.USER_TYPE.USER_TYPE_SUPPLIER);
pm.setOrderNo(orderNo);
pm.setOrderId(order.getId());
pushLogic.pushMessageLogic(pm);
//发送消息队列
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MessageProducer producer = session.createProducer(destination);
String data = "{orderNo:"+orderNo+",orderId:"+oId+",supplierId:"+suppliers.getId()+"}";
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObjectProperty("data", data);
long time = 10*60 * 1000;
//objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
//producer.send(objectMessage);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return objectMessage;
}
});
}
}
}
}
- 什么是activemq
activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
- activemq的作用以及原理
Activemq 的作用就是系统之间进行通信。 当然可以使用其他方式进行系统间通信, 如果使用 Activemq 的话可以对系统之间的调用进行解耦, 实现系统间的异步通信。 原理就是生产者生产消息, 把消息发送给activemq。 Activemq 接收到消息, 然后查看有多少个消费者, 然后把消息转发给消费者, 此过程中生产者无需参与。 消费者接收到消息后做相应的处理和生产者没有任何关系
- activemq的几种通信方式
3.1publish(发布)-subscribe(订阅)(发布-订阅方式)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法
3.2 p2p(point-to-point)(点对点)
p2p的过程则理解起来比较简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路
相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
- publish(发布)-subscribe(订阅)方式的处理
发布订阅模式的通信方式, 默认情况下只通知一次, 如果接收不到此消息就没有了。 这种场景只适用于对消息送达率要求不高的情况。 如果要求消息必须送达不可以丢失的话, 需要配置持久订阅。 每个订阅端定义一个 id, <property name="clientId" 在订阅是向 activemq 注册。 发布消息 <property name="subscriptionDurable" value="true"/>和接收消息时需要配置发送模式为持久化template.setDeliveryMode(DeliveryMode.PERSISTENT);。 此时如果客户端接收不到消息, 消息会持久化到服务端(就是硬盘上), 直到客户端正常接收后为止。
- 4.2p - p(点对点)方式的处理
点对点模式的话, 如果消息发送不成功此消息默认会保存到 activemq 服务端直到有消费者将其消费, 所以此时消息是不会丢失的。
- 如何解决消息重复问题
所谓消息重复,就是消费者接收到了重复的消息,一般来说我们对于这个问题的处理要把握下面几点,
①.消息不丢失(上面已经处理了)
②.消息不重复执行
一般来说我们可以在业务段加一张表,用来存放消息是否执行成功,每次业务事物commit之后,告知服务端,已经处理过该消息,
这样即使你消息重发了,也不会导致重复处理
大致流程如下:
业务端的表记录已经处理消息的id,每次一个消息进来之前先判断该消息是否执行过,如果执行过就放弃,如果没有执行就开始执行消息,消息执行完成之后存入这个消息的id
最后
以上就是糊涂乐曲为你收集整理的springmvc结合activemq的全部内容,希望文章能够帮你解决springmvc结合activemq所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复