概述
springboot整合mqtt客户端订阅接收信息
应用场景:从通讯管理集中订阅终端设备数据信息
1、引入jar包
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、properties中配置mqtt的基本连接信息
#mqtt配置 - start
#用户名
spring.mqtt.username = anTest
#密码
spring.mqtt.password = an123456
#服务器连接地址
spring.mqtt.url = tcp://locahost:61613
#默认的消息推送主题 topIc
spring.mqtt.default.topic = test/#
#连接超时
spring.mqtt.completionTimeout=3000
#mqtt配置 - end
3、编写mqtt消息接收处理类MQTTSubsribe
@Component
public class MQTTSubsribe {
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.url}")
private String url;
//配置中的topic
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
// 连接超时时间
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout ;
private String[] topics;
private MqttClient client;
private MqttConnectOptions mqttConnectOptions;
@Autowired
private PushCallback pushCallback;
private ScheduledExecutorService scheduled;
public void startReconnect(){
this.scheduled = Executors.newSingleThreadScheduledExecutor();
// 定时任务——重新连接mqtt服务器
this.scheduled.scheduleAtFixedRate(new Runnable() {
public void run() {
if (!MQTTSubsribe.this.client.isConnected()) {
try {
MQTTSubsribe.this.client.connect(MQTTSubsribe.this.mqttConnectOptions);
System.out.println("---mqtt已经重新连接上---");
int[] Qos = new int[]{1};
MQTTSubsribe.this.client.subscribe(MQTTSubsribe.this.topics, Qos);
} catch (MqttSecurityException var2) {
var2.printStackTrace();
} catch (MqttException var3) {
var3.printStackTrace();
}
}
}
},5000L,10000L, TimeUnit.MILLISECONDS);
}
// 对mqttConnectOptions对象的常规设置
public MqttConnectOptions getMqttConnectOptions(){
this.mqttConnectOptions = new MqttConnectOptions();
this.mqttConnectOptions.setCleanSession(true);
this.mqttConnectOptions.setUserName(username);
this.mqttConnectOptions.setPassword(password.toCharArray());
this.mqttConnectOptions.setServerURIs(new String[]{url});
this.mqttConnectOptions.setConnectionTimeout(completionTimeout);
this.mqttConnectOptions.setKeepAliveInterval(2000);
return mqttConnectOptions;
}
// 连接mqtt服务器订阅信息方法
// topic也可作为参数传入
public void start(String topic) {
try {
this.client = new MqttClient(url, getClientId(), new MemoryPersistence());
this.getMqttConnectOptions();
this.client.setCallback(this.pushCallback);
// MqttTopic topicMq = client.getTopic(defaultTopic);
this.client.connect(this.mqttConnectOptions);
// topics = topic.toString().split(",");
//订阅消息
int[] Qos = {1};
// 可将订阅的一个或多个topic都存入数组中,同时订阅
// String[] topic1 = {defaultTopic};
topics = new String[]{defaultTopic};
this.client.subscribe(topics, Qos);
boolean connected = this.client.isConnected();
System.out.println("连接状态为:"+connected);
String flag = connected ? "成功" : "失败";
} catch (MqttException e) {
e.printStackTrace();
}
}
// 随机生成唯一client.id方法
public String getClientId()
{
String nums = "";
String[] codeChars = { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" };
for (int i = 0; i < 23; i++)
{
int charNum = (int)Math.floor(Math.random() * codeChars.length);
nums = nums + codeChars[charNum];
}
return nums;
}
4、mqtt回调类PushCallback
@Configuration
public class PushCallback implements MqttCallback {
@Autowired
private MQTTSubsribe mqttSubsribe;
@Autowired
private RedisTemplate redisTemplate;
// 定时任务——定时缓存查询的数据
private ScheduledExecutorService scheduled;
@Override
public void connectionLost(Throwable throwable) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
this.mqttSubsribe.startReconnect();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
String[] topics = topic.split("/");
this.scheduled = Executors.newSingleThreadScheduledExecutor();
//周期定时方法,可以在里面进行定时数据存储操作,我测试时是先将数据存储到了redis中,可做实时数据来用
this.scheduled.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
String key = topics[0] + topics[1];
String value = new String(message.getPayload());
if (redisTemplate.hasKey(key)){
redisTemplate.delete(key);
}
redisTemplate.opsForValue().set(key,value);
System.out.println("redis缓存数据"+value);
//下面是我对数据的一些处理,仅供参考
JSONObject jsonObject = JSONObject.parseObject(value);
Map map = jsonObject;
List<Map<String,Object>> list = (List<Map<String, Object>>) ((Map) map.get("lines")).get("data");
System.out.println("-----------------------------"+list);
}
//此处 120为每120秒执行一次
},0,120, TimeUnit.SECONDS);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
测试调用:
我是在controller类中写了一个方法测试,实际项目中,可以在监听中调用MQTTSubsribe的start方法,每次程序启动时调用一次
@RequestMapping(value = "/insertByMq",method = RequestMethod.GET)
@ApiOperation(value = "订阅mqtt---测试")
public Result insertByMq(){
mqtt.start("anjia");
return new Result(ResultCodeEnum.SUCCESS);
}
以上亲测,欢迎交流
最后
以上就是坚定人生为你收集整理的springboot整合mqtt客户端订阅信息的全部内容,希望文章能够帮你解决springboot整合mqtt客户端订阅信息所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复