概述
一、将设备消息添加到MQ中
1、配置规则引擎
分别将设备生命周期事件
和设备数据点消息
都连接上同一个topic
如图,两个规则,一个是设备生命周期事件,也就是设备创建、删除、上线、离线记录;另一个是设备数据点消息
2、本地代码订阅消息
在设备连接的MQ消息topic中添加订阅yuyun-topic-data
,用于本地工程订阅MQ发送的消息
本地代码添加订阅:
运行demo,如下如所示,一直接收数据就算订阅成功了
二、数据消息格式
1、设备数据点消息
参数 | 属性 | 类型 | 说明 | 示例 |
---|---|---|---|---|
sysProperty | messageType | string | 消息类型:固定为deviceDatapoint | |
productId | string | 产品ID | 90273 | |
appProperty | deviceId | string | 设备ID | 102839 |
dataTimestamp | int | 设备数据点生产时间戳,单位毫秒,设备上传时可自定义携带 | 15980987429000 | |
datastream | string | 数据流名称 | weather | |
body | object/string/… | 详细的数据点消息内容 | 见如下示例 |
本地接收到的数据实例:
{
"sysProperty": {
"messageType": "deviceDatapoint",
"productId": "90273",
},
"appProperty":{
"deviceId": "102839",
"dataTimestamp": 15980987429000,
"datastream":"weather"
},
"body":{
"temperature": 30,
"humidity": "47%"
}
}
数据中有一个body
参数,里面的数据类型可以是:JSON、数值、字符串和二进制
"body":{
"temperature": 30,
"humidity": "47%"
}
"body": 10
"body": 11.55
// String类型
"body":"sunny with wind"
- 数据格式为二进制数据时,body中数据为二进制数据的索引号 index,示例如下,用户可以通过该索引号通过API获取该数据
"body":{
"index": "3491506_1475204886914_bin"
}
2、设备生命周期事件消息
参数 | 属性 | 类型 | 说明 | 示例 |
---|---|---|---|---|
sysProperty | messageType | string | 消息类型:固定为deviceLifeCycle | |
productId | string | 产品ID | 90273 | |
appProperty | deviceId | string | 设备ID | 102839 |
dataTimestamp | int | 设备消息生产时间戳,单位毫秒 | 15980987429000 | |
body | object | 创建、删除、上线、离线 created/deleted/online/offline | 示例如下 |
{
"sysProperty": {
"messageType": "deviceLifeCycle",
"productId": "90273",
},
"appProperty":{
"deviceId": "102839",
"dataTimestamp": 15980987429000,
},
"body":{
"event": "online"
}
}
三、新建数据库表
新建两个表,一个存放设备生命周期事件,一个存放设备数据点消息
CREATE TABLE `device_data` (
`id` bigint NOT NULL AUTO_INCREMENT,
`product_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '产品ID',
`device_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '设备ID',
`data_stream` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '数据流名称',
`data_value` double(9, 2) NULL DEFAULT NULL COMMENT '数据',
`date_time` datetime NULL DEFAULT NULL COMMENT '设备数据点产生时间',
PRIMARY KEY (`id`) USING BTREE,
INDEX `product_device_id`(`product_id`, `device_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '设备数据点消息' ROW_FORMAT = DYNAMIC;
CREATE TABLE `device_event` (
`id` bigint NOT NULL AUTO_INCREMENT,
`product_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '产品ID',
`device_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '设备ID',
`date_time` datetime NULL DEFAULT NULL COMMENT '时间',
`event` varchar(10) CHARACTER NULL DEFAULT NULL COMMENT '生命周期事件',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '设备生命周期事件表' ROW_FORMAT = DYNAMIC;
四、创建SpringBoot+Mybatis-Plus项目
创建项目可以参照:新建一个Spring Boot+MyBatis-Plus项目
最终建成的项目结构如下:
新建一个mq包,将配置好的demo中的东西搬过来,配置方法链接:https://yuyun.blog.csdn.net/article/details/122960344
搬过来之后就是这样:
此时pom.xml文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yuyun</groupId>
<artifactId>springboot-onenet</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-onenet</name>
<description>springboot-onenet</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--生成实体成get set-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- mybatisPlus 核心库 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.1</version>
<optional>true</optional>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- onenet -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>bouncycastle</groupId>
<artifactId>bcprov-jdk15</artifactId>
<version>140</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
五、运行项目
前面运行的demo中,运行的是MQClient.java文件中的main方法,而main方法中的内容为:
public static void main(String[] args) {
MqClient mqClient = new MqClient();
mqClient.connect();
}
demo运行的时候一直在接收onenet消息队列MQ生产的消息,SpringBoot要达到这种一直运行着的效果,将调用方法放入启动类最容易实现,代码如下:
package com.yuyun;
import com.yuyun.mq.MqClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author hyh
*/
@MapperScan("com.yuyun.mapper")
@SpringBootApplication
public class SpringbootOneNetApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootOneNetApplication.class, args);
MqClient mqClient = new MqClient();
mqClient.connect();
}
}
启动SpringBoot项目,出现如下图所示的效果就算运行成功了
此时运行linux平台的模拟设备,其自动向onenet平台发送消息,onenet平台又会通过MQ来发送消息,SpringBoot项目接收到消息:
六、数据存入数据库
1、新建一个Service用来处理消息
接收到的消息是json格式的,解析这个格式的数据我用的是alibaba的fastjson。当messageType为deviceDatapoint
是数据点消息,deviceLifeCycle
是设备生命周期事件。
package com.yuyun.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.yuyun.dto.DeviceDataDTO;
import com.yuyun.dto.DeviceEventDTO;
import com.yuyun.service.DeviceDataService;
import com.yuyun.service.DeviceEventService;
import com.yuyun.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* @author hyh
*/
@Slf4j
@Service
public class MessageProcessServiceImpl implements MessageProcessService {
@Autowired
private DeviceEventService deviceEventService;
@Autowired
private DeviceDataService deviceDataService;
@Override
public void messageProcess(String body) {
JSONObject obj = JSONObject.parseObject(body);
//产品信息
JSONObject product = obj.getJSONObject("sysProperty");
//产品id
String productId = product.getString("productId");
// 消息类型
String messageType = product.getString("messageType");
//数据体
JSONObject appProperty = obj.getJSONObject("appProperty");
//数据上传时间
Date time = new Date(appProperty.getLong("dataTimestamp"));
//设备id
String deviceId = appProperty.getString("deviceId");
// 生命周期事件
if ("deviceLifeCycle".equals(messageType)){
JSONObject bodyJson = obj.getJSONObject("body");
String event = bodyJson.getString("event");
DeviceEventDTO deviceEvent = new DeviceEventDTO();
deviceEvent.setProductId(productId);
deviceEvent.setDeviceId(deviceId);
deviceEvent.setDateTime(time);
deviceEvent.setEvent(event);
// 存储到数据库
deviceEventService.save(deviceEvent);
}
// 数据点消息
else if ("deviceDatapoint".equals(messageType)){
// 数据流名称
String dataStream = appProperty.getString("datastream");
Double dataValue = obj.getDouble("body");
DeviceDataDTO deviceDataDTO = new DeviceDataDTO();
deviceDataDTO.setProductId(productId);
deviceDataDTO.setDeviceId(deviceId);
deviceDataDTO.setDataStream(dataStream);
deviceDataDTO.setDataValue(dataValue);
deviceDataDTO.setDateTime(time);
// 存储到数据库
deviceDataService.save(deviceDataDTO);
} else {
log.error("未知消息类型的数据:" + body);
}
}
}
2、调用该service方法
demo中提供了一个消息消费方法,PushCallback
类中的messageArrived
方法:
就在这里调用处理消息的service就行
(1)在PushCallback
和MqClient
类上面加上注解@Component
(2)PushCallback
类中引入MessageProcessService
@Autowired
protected MessageProcessService messageProcessService;
private static PushCallback pushCallback;
(3)添加代码
/**
* 通过PostConstruct实现初始化bean之前进行的操作
*/
@PostConstruct
public void init() {
pushCallback = this;
pushCallback.messageProcessService = this.messageProcessService;
}
(4)调用service类的数据处理方法
// 消息处理,存储到数据库
pushCallback.messageProcessService.messageProcess(body);
最终代码如下:
package com.yuyun.mq;
import com.yuyun.service.MessageProcessService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.logging.Logger;
@Component
public class PushCallback implements MqttCallback {
@Autowired
protected MessageProcessService messageProcessService;
private static PushCallback pushCallback;
private IMqttAsyncClient Client;
private static final Logger logger = Logger.getLogger(PushCallback.class.getCanonicalName());
private MqClient mqClient;
private int reConnTimes = 0;
/**
* 通过PostConstruct实现初始化bean之前进行的操作
*/
@PostConstruct
public void init() {
pushCallback = this;
pushCallback.messageProcessService = this.messageProcessService;
}
public PushCallback(MqClient client) {
mqClient = client;
}
@Override
public void connectionLost(Throwable cause) {
logger.info("connect is losted,and try to reconnect,cause = " + cause.getMessage() );
cause.printStackTrace();
while(true){
if(mqClient.reConnect()){
break;
}
try {
//前20次每秒重连一次
if(reConnTimes++ > 20){
Thread.sleep(1000);
}else{//超过20次后没10s重连一次
Thread.sleep(10000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
/**
* 说明:paho 本身有个bug,即此函数接口抛出异常都会回调到connectionLost()的接口,故需要在此函数中用try catch包起来处理,
* 确保无异常抛出。
* */
public void messageArrived(String topic, MqttMessage message) {
try {
byte[] payload = message.getPayload();
OnenetMq.Msg obj = OnenetMq.Msg.parseFrom(payload);
long at = obj.getTimestamp();
long msgid = obj.getMsgid();
String body = new String(obj.getData().toByteArray());
SimpleDateFormat slf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = slf.format(at);
logger.info("time = " + time +
",msg id: " + msgid +
", body: " + body);
// 消息处理,存储到数据库
pushCallback.messageProcessService.messageProcess(body);
}catch(Exception e){
logger.info("messageArrived phrase exception");
}finally {
if(mqClient.getManualAcks()){
mqClient.messageArrivedComplete(message);
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Client = token.getClient();
}
}
3、运行项目
重新运行项目,MQ会把项目停止时没有接收的数据一股脑地传过来,打开数据库后就可以看到数据
此时因为没有收到设备生命周期事件,device_event表中的数据为空
将linux平台的模拟设备关闭运行几次,就收到了生命周期事件数据了:
4、打包成jar包运行
将springboot项目打包成jar包运行:
demo地址:https://gitee.com/hyh17808770899/spring-boot/tree/master/springboot-onenet
最后
以上就是开心小懒猪为你收集整理的OneNET物联网平台10 SpringBoot项目接收并解析MQ消息存入数据库的全部内容,希望文章能够帮你解决OneNET物联网平台10 SpringBoot项目接收并解析MQ消息存入数据库所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复