我是靠谱客的博主 无辜小馒头,最近开发中收集的这篇文章主要介绍activeMq部署安装及在springboot中实现消息的发布订阅,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、linux环境部署

1、下载安装包(地址戳这里)apache-activemq-5.16.3-bin.tar.gz并上传到云服务器。

2、解压:tar -zvxf apache-activemq-5.16.3-bin.tar.gz

3、相关配置文件

activemq.xml可以修改各协议连接的ip地址:

jetty.xml可以修改管理后台的ip及端口

jetty-realm.properties可查看账户名及密码

4、进入bin目录执行./activemq start命令启动activeMq,其他命令如下:

 5、访问http://xxxx:xxxx/8161/admin进入管理页面

注:如不能访问请开启防火墙端口,如是云服务器,只要在入方向开放对应端口就可以,更多安装及启动方式请 戳这里

二、springboot集成activemq

1、添加依赖包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.8.0</version>
</dependency>

 2、发布者

 ActiveMqttClient(单例模式连接类)

package com.mq.server.busi.config;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

/**
 * ActiveMqttClient
 *
 * @author admin
 * @date 2021-11-22 17:16
 */
public class ActiveMqttClient {

    private static volatile ActiveMqttClient instance = null;
    private static TopicConnection connection = null;
    /**
     * 默认用户名
     */
    private static final String USERNAME = env("ACTIVEMQ_USER", "admin");
    /**
     * 默认密码
     */
    private static final String PASSWORD = env("ACTIVEMQ_PASSWORD", "password");
    /**
     * 默认连接地址
     */
    private static final String HOST = env("ACTIVEMQ_HOST", "118.31.168.121");
    /**
     * 端口
     */
    private static final int PORT = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));

    /**
     * 构造函数
     */
    private ActiveMqttClient() {
        try {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://" + HOST + ":" + PORT);
            //创建连接
            connection = connectionFactory.createTopicConnection(USERNAME, PASSWORD);
            //开启连接
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static synchronized ActiveMqttClient getInstance() {
        if (instance == null) {
            instance = new ActiveMqttClient();
        }
        return instance;
    }

    /**
     * 获取会话
     * @return
     */
    public TopicSession getSession() {
        try {
            //创建会话,不需要事务则传入false,注释session.commit(),如果是需要事务则传入true,放开session.commit()
            return connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null ) {
            return defaultValue;
        }
        return rc;
    }
}
MqServerBusiServiceImpl(发送消息)
package com.mq.server.busi.service.impl;

import com.mq.server.busi.config.ActiveMqttClient;
import com.mq.server.busi.service.IMqServerBusiService;
import org.springframework.stereotype.Service;

import javax.jms.MapMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

/**
 * MqServerBusiServiceImpl
 *
 * @author admin
 * @date 2021-11-22 17:39
 */
@Service
public class MqServerBusiServiceImpl implements IMqServerBusiService {

    private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();

    @Override
    public void sendMessage(String msg) {
        try {
            TopicSession session = activeMqttClient.getSession();
            //创建topic
            Topic topic = session.createTopic("topic-test");
            TopicPublisher publisher = session.createPublisher(topic);
            String message1 = "发送消息:测试activemq用mqtt协议以Topic主题发布和订阅方式发送消息";
            MapMessage message = session.createMapMessage();
            message.setString("text", message1);
            publisher.send(message);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、消费者

package com.sys.server.busi.config;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.util.Map;

/**
 * ActiveMqttConsume
 *
 * @author admin
 * @date 2021-11-23 17:19
 */
@Component
public class ActiveMqttConsume implements ApplicationListener<ContextRefreshedEvent> {

    private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();


    public void reveiveMessage() {
        try {
            TopicSession session = activeMqttClient.getSession();
            //创建topic
            Topic topic = session.createTopic("topic-test");
            MessageConsumer consumer = session.createConsumer(topic);
            do {
                Message msg = consumer.receive();
                if (msg instanceof TextMessage) {
                    String body = ((TextMessage) msg).getText();
                    System.out.println(body);
                } if (msg instanceof ActiveMQMapMessage) {
                    Map body = ((ActiveMQMapMessage) msg).getContentMap();
                    System.out.println(body);
                }  if (msg instanceof ActiveMQBytesMessage) {
                    String body = ((ActiveMQBytesMessage) msg).readUTF();
                    System.out.println(body);
                } else {
                    System.out.println("Unexpected message type: " + msg.getClass());
                }
            } while (true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        this.reveiveMessage();
    }
}

启动服务验证

PS:大家可以参考部署那一块examples下面的demo,里面有各个协议各种语言的demo;另外,activeMq也可以换成apollo,apollo 是 ActiveMQ的子工程,是 ActiveMQ的下一代消息代理,是一个更快、更可靠、更容易维护的消息代理。

apollo安装部署

1、下载安装包apache-apollo-1.7.1-unix-distro.tar.gz并上传到云服务器。

2、解压安装包 tar zvxf apache-apollo-1.7.1-unix-distro.tar.gz

3、在 /apache-apollo-1.7.1/bin下执行 ./apollo create mqttbroker

4、进入上面红框所示的目录下执行./apollo-broker run &启动apollo

5、进入mqttbroker目录下的etc目录里面,修改apollo.xml配置文件里面的ip地址,重启apollo即可

6、开放端口访问http://xxxx:61680/

其他概念:

消息组件:activemq、rocketmq、rabbitmq、kafka等,类比于mqsql、Oracle

通信协议:tcp、amqp、stomp、mqtt等属于通信协议

消息标准:JMS(面向消息中间件(MOM)的API),可类比于JDBC

activemq默认使用tcp协议,及配置文件里面的openwire

最后

以上就是无辜小馒头为你收集整理的activeMq部署安装及在springboot中实现消息的发布订阅的全部内容,希望文章能够帮你解决activeMq部署安装及在springboot中实现消息的发布订阅所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(79)

评论列表共有 0 条评论

立即
投稿
返回
顶部