我是靠谱客的博主 兴奋石头,最近开发中收集的这篇文章主要介绍Mqtt连接管理工具(Maven),需Mysql。多MQTT连接多Topic,在线连接,在线重启,断线重连······MQTT连接操作步骤,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
MQTT连接操作步骤
前提
当你还在为连接MQTT的繁琐配置而烦恼?当你还在为添加或者删除MQTT连接/主题而一遍遍修改代码发布?快使用如下连接管理工具吧!~
一.环境准备
- 数据库准备,库名:dlg_mqtt_manage (库/表已有请忽略)
CREATE TABLE `dmm_mqtt_server_data` ( `id` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '唯一标识 唯一标识', `server_host` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '服务器ip', `server_port` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '端口', `username` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户名', `password` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '密码', `client_id` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '客户端id,自动添加三位随机数', `client_topic` varchar(3072) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '客户端订阅的主题(以','隔开)', `client_clean_session` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '客户端每次重连是否清除session, 0:否 1:是', `remark` varchar(1024) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '备注', `data_from` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '数据来源', `app_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '应用名称 (配置文件中的spring.application.name)', `pub_code` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '发布标识符,需要以此标识获取需要发送的消息的服务', `owner` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '负责人', `org_id` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '机构ID', `created_by` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建人', `created_on` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', `modify_by` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '更新人', `modify_on` datetime(0) NULL DEFAULT NULL COMMENT '更新时间', `is_valid` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '0' COMMENT '是否有效 0有效;1无效', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = ' ' ROW_FORMAT = Dynamic;
- 根据提供的MQTT服务器信息填入数据库表中,需要注意的是 is_valid填入有效 0,app_name填你的项目的applicationName,data_from可填入信息来源eg:THDS,client_topic填入需要订阅的MQTT_TOPIC,多个以逗号(,)分割。其他根据表中内容填写
- 在需要接入MQTT的项目(已能连通数据库)的POM中添加如下maven包(若阿里云源没有可切换至maven自带源)
<dependency> <groupId>com.dlg-tec</groupId> <artifactId>dlg-mqtt-spring-boot-starter</artifactId> <version>(Latest-Version)</version> </dependency>
二.使用
-
**【消息接收】**创建回调函数类接收mqtt信息 代码如下
注意点1:该类需注册成bean,继承MyMqttClientCallback,并实现messageArrived方法
注意点2:!!!若是需要注入使用其他Spring管理的Bean,请带上@Need2Inject注解,beanName中填入此Bean的BeanName!!! (ps:如果你是用 xxxservice的形式注入,若是没有指定名字你的BeanName就是xxxserviceImpl)
注意点3:重要的事情说三遍,错误请try_catch,不要抛出!!! 错误请try_catch,不要抛出!!! 错误请try_catch,不要抛出!!!
(不然会导致连接断开,触发重连,然后继续断开,重连,断开,重连~)
// 该类需注册成bean,继承MyMqttClientCallback,并实现messageArrived方法 @Component @Slf4j public class MyCallBack extends MyMqttClientCallback { @Need2Inject(beanName = "testService") @Resource public TestService testService; /** * 数据接收处 * @param topic 订阅的topic * @param message 订阅到的消息内容 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { try { // 该类为该topic对应mqtt服务器信息 DmmMqttServerData mqttClient = getMqttClient(); log.info("sss收到来自【 " + mqttClient.getDataFrom() + "】 topic:【 " + topic + "】 的消息:{}", new String(message.getPayload())); } catch (Exception e) { log.info("错误请自行处理,不要抛出"); } } }
-
【消息发送】 注入MqttPubUtil,并调用相关方法
@Component public class TestMqttPub { @Resource private MqttPubUtil mqttPubUtil; /** * * @param pubCode 发布标识,需通过此标识找到对应服务器进行发送。建议以enum形式保存 * @param topic 发布主题 * @param message 发布内容 * @throws Exception 错误信息 */ public void pub(String pubCode,String topic,String message) throws Exception { mqttPubUtil.pubMessage(pubCode,topic,message); } }
三.拓展
若需要调用接口/指定连接等其他形式的订阅重新加载 注入后调用实现
/**
* <p>
* 服务类
* </p>
*
* @author 格兰德法则·祝
* @since 2021-12-20
*/
public interface DmmMqttServerDataService {
/**
* 全部连接刷新配置并重新连接
* @throws Exception 错误信息
*/
void reLinkAll() throws Exception;
/**
* 刷新指定配置通过ID
* @param id 数据库中保存的ID
* @throws Exception 错误信息
*/
void reLinkById(String id) throws Exception;
/**
* 刷新指定配置通过Code
* @param pubCode
* @throws Exception 错误信息
*/
void reLinkByPubCode(String pubCode) throws Exception;
}
四.版本
【可用】版本 | 描述(请使用最新版) |
---|---|
【1.0.5】 | 更新以前版本存在问题,去除不必要maven依赖,增加断线重连功能,增加消息回调注入功能 |
【1.0.6】 | 更新业务(刷新所有连接方法,刷新指定ID/pubCode连接) |
【1.0.6】 | 更新业务(刷新所有连接方法,刷新指定ID/pubCode连接) |
【1.0.7】 | 修改断线重连时长等问题 |
【1.0.8】 | 由于未知原因,还是会出现断线重连失败的可能,开放继承断线重连接口,在callBack中可自行实现断线重连(重写),断线后重连失败可添加发送消息等通知手段 |
【1.0.9】 | 增加线程连接,由MQTT服务端导致的服务拒绝连接重连无法成功 |
最后
以上就是兴奋石头为你收集整理的Mqtt连接管理工具(Maven),需Mysql。多MQTT连接多Topic,在线连接,在线重启,断线重连······MQTT连接操作步骤的全部内容,希望文章能够帮你解决Mqtt连接管理工具(Maven),需Mysql。多MQTT连接多Topic,在线连接,在线重启,断线重连······MQTT连接操作步骤所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复