我是靠谱客的博主 彩色爆米花,最近开发中收集的这篇文章主要介绍SpringBoot整合 ActiveMQ快速入门 实现点对点推送,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

ActiveMQ是一个高性能的消息服务, 它已经实现JMS接口(Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口), 所以我们可以直接在 Java 中使用

使用场景: 多项目解耦合、分布式事务、流量控制等等

JMS里面有一些概念, 我们提前说明一下

  • JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
  • JMS生产者(Message Producer)
  • JMS消费者(Message Consumer)
  • JMS消息
  • JMS队列
  • JMS主题

JMS消息通常有两种类型:点对点(Point-to-Point)发布/订阅(Publish/Subscribe)

编程模型

  • ConnectionFactory :连接工厂,JMS 用它创建连接
  • Connection :JMS 客户端到JMS Provider 的连接
  • Session: 一个发送或接收消息的线程
  • Destination :消息的目的地;消息发送给谁.
  • MessageConsumer / MessageProducer: 消息接收者,消费者

快速安装

下载地址

cd bin/linux-x86-64/
./activemq start # 启动

启动之后可以看看图形界面版 localhots:8161

点击manager, 默认账号密码是 admin

SpringBoot配置activeMq

  • 增加依赖 pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--
连接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
  • 配置连接 application.properties
# 连接地址 默认连接端口为 61616
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# 开启连接池 最大连接数100
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
  • 启动文件配置 xxxApplication.java
package com.example.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.MultipartConfigFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.util.unit.DataSize;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.servlet.MultipartConfigElement;
@SpringBootApplication
@MapperScan("com.example.demo.mapper")
// 直至mapper
@EnableJms // 开启jms
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
// 配置默认队列
@Bean
public Queue queue(){
return new ActiveMQQueue("common.queue");
}
@Bean
public ConnectionFactory connectionFactory(){
System.out.println("创建连接");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
@Bean
public JmsTemplate genJmsTemplate(){
return new JmsTemplate(connectionFactory());
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate() {
return new JmsMessagingTemplate(connectionFactory());
}
  • 创建消息生产者接口 ProducerService.java
package com.example.demo.service;
import javax.jms.Destination;
/**
* 消息生产者
*/
public interface ProducerService {
/**
*指定消息队列 、消息
* @param destination
* @param message
*/
public void sendMessage(Destination destination, final String message);
/**
* 使用默认消息队列发送消息
* @param message
*/
public void sendMessage(final String message);
}
  • 创建消息生产者实现 ProducerServiceImpl.java
package com.example.demo.service.impl;
import com.example.demo.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Queue;
@Service
public class ProducerServiceImpl implements ProducerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 发送消息 destination是发送到队列 message是待发送的消息
@Override
public void sendMessage(javax.jms.Destination destination, String message) {
jmsMessagingTemplate.convertAndSend(destination, message);
}
@Override
public void sendMessage(String message) {
jmsMessagingTemplate.convertAndSend(this.queue, message);
}
}
  • 消息消费者 OrderConsumer.java
package com.example.demo.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
@JmsListener(destination = "order.queue")
public void receiveQueue(String text) {
System.out.println("接受到的消息 ====" + text);
}
}
  • 控制器 OrderController.java
package com.example.demo.controller;
import com.example.demo.bean.JsonUtils;
import com.example.demo.service.ProducerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
@RestController
@RequestMapping("/api/v1/mq")
public class OrderController {
@Autowired
private ProducerService producerService;
@GetMapping("order")
public Object order(String msg) {
Destination destination = new ActiveMQQueue("order.queue");
producerService.sendMessage(destination, msg);
return JsonUtils.buildSuccess("ok");
}
@GetMapping("common")
public Object common(String msg) {
producerService.sendMessage(msg);
return JsonUtils.buildSuccess("ok");
}
}
  • 消息产生

由于common队列我们并没有去创建消费者, 所以消息会堆积,但是order对列是有消费者的, 来让我们看看情况

正常项目肯定生产者和消费者不在同一个项目里面, 这里只是简单演示一下。

本文为作者原创,手码不易,允许转载,转载后请以链接形式说明文章出处。

最后

以上就是彩色爆米花为你收集整理的SpringBoot整合 ActiveMQ快速入门 实现点对点推送的全部内容,希望文章能够帮你解决SpringBoot整合 ActiveMQ快速入门 实现点对点推送所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部