概述
一、linux环境部署
1、下载安装包(地址戳这里)apache-activemq-5.16.3-bin.tar.gz并上传到云服务器。
2、解压:tar -zvxf apache-activemq-5.16.3-bin.tar.gz
3、相关配置文件
activemq.xml可以修改各协议连接的ip地址:
jetty.xml可以修改管理后台的ip及端口
jetty-realm.properties可查看账户名及密码
4、进入bin目录执行./activemq start命令启动activeMq,其他命令如下:
5、访问http://xxxx:xxxx/8161/admin进入管理页面
注:如不能访问请开启防火墙端口,如是云服务器,只要在入方向开放对应端口就可以,更多安装及启动方式请 戳这里
二、springboot集成activemq
1、添加依赖包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
2、发布者
ActiveMqttClient(单例模式连接类)
package com.mq.server.busi.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
/**
* ActiveMqttClient
*
* @author admin
* @date 2021-11-22 17:16
*/
public class ActiveMqttClient {
private static volatile ActiveMqttClient instance = null;
private static TopicConnection connection = null;
/**
* 默认用户名
*/
private static final String USERNAME = env("ACTIVEMQ_USER", "admin");
/**
* 默认密码
*/
private static final String PASSWORD = env("ACTIVEMQ_PASSWORD", "password");
/**
* 默认连接地址
*/
private static final String HOST = env("ACTIVEMQ_HOST", "118.31.168.121");
/**
* 端口
*/
private static final int PORT = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
/**
* 构造函数
*/
private ActiveMqttClient() {
try {
//创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://" + HOST + ":" + PORT);
//创建连接
connection = connectionFactory.createTopicConnection(USERNAME, PASSWORD);
//开启连接
connection.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public static synchronized ActiveMqttClient getInstance() {
if (instance == null) {
instance = new ActiveMqttClient();
}
return instance;
}
/**
* 获取会话
* @return
*/
public TopicSession getSession() {
try {
//创建会话,不需要事务则传入false,注释session.commit(),如果是需要事务则传入true,放开session.commit()
return connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null ) {
return defaultValue;
}
return rc;
}
}
MqServerBusiServiceImpl(发送消息)
package com.mq.server.busi.service.impl;
import com.mq.server.busi.config.ActiveMqttClient;
import com.mq.server.busi.service.IMqServerBusiService;
import org.springframework.stereotype.Service;
import javax.jms.MapMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
/**
* MqServerBusiServiceImpl
*
* @author admin
* @date 2021-11-22 17:39
*/
@Service
public class MqServerBusiServiceImpl implements IMqServerBusiService {
private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();
@Override
public void sendMessage(String msg) {
try {
TopicSession session = activeMqttClient.getSession();
//创建topic
Topic topic = session.createTopic("topic-test");
TopicPublisher publisher = session.createPublisher(topic);
String message1 = "发送消息:测试activemq用mqtt协议以Topic主题发布和订阅方式发送消息";
MapMessage message = session.createMapMessage();
message.setString("text", message1);
publisher.send(message);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、消费者
package com.sys.server.busi.config;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.util.Map;
/**
* ActiveMqttConsume
*
* @author admin
* @date 2021-11-23 17:19
*/
@Component
public class ActiveMqttConsume implements ApplicationListener<ContextRefreshedEvent> {
private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();
public void reveiveMessage() {
try {
TopicSession session = activeMqttClient.getSession();
//创建topic
Topic topic = session.createTopic("topic-test");
MessageConsumer consumer = session.createConsumer(topic);
do {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
System.out.println(body);
} if (msg instanceof ActiveMQMapMessage) {
Map body = ((ActiveMQMapMessage) msg).getContentMap();
System.out.println(body);
} if (msg instanceof ActiveMQBytesMessage) {
String body = ((ActiveMQBytesMessage) msg).readUTF();
System.out.println(body);
} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
} while (true);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
this.reveiveMessage();
}
}
启动服务验证
PS:大家可以参考部署那一块examples下面的demo,里面有各个协议各种语言的demo;另外,activeMq也可以换成apollo,apollo 是 ActiveMQ的子工程,是 ActiveMQ的下一代消息代理,是一个更快、更可靠、更容易维护的消息代理。
apollo安装部署
1、下载安装包apache-apollo-1.7.1-unix-distro.tar.gz并上传到云服务器。
2、解压安装包 tar zvxf apache-apollo-1.7.1-unix-distro.tar.gz
3、在 /apache-apollo-1.7.1/bin下执行 ./apollo create mqttbroker
4、进入上面红框所示的目录下执行./apollo-broker run &启动apollo
5、进入mqttbroker目录下的etc目录里面,修改apollo.xml配置文件里面的ip地址,重启apollo即可
6、开放端口访问http://xxxx:61680/
其他概念:
消息组件:activemq、rocketmq、rabbitmq、kafka等,类比于mqsql、Oracle
通信协议:tcp、amqp、stomp、mqtt等属于通信协议
消息标准:JMS(面向消息中间件(MOM)的API),可类比于JDBC
activemq默认使用tcp协议,及配置文件里面的openwire
最后
以上就是无辜小馒头为你收集整理的activeMq部署安装及在springboot中实现消息的发布订阅的全部内容,希望文章能够帮你解决activeMq部署安装及在springboot中实现消息的发布订阅所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复