我是靠谱客的博主 开心小懒猪,最近开发中收集的这篇文章主要介绍OneNET物联网平台10 SpringBoot项目接收并解析MQ消息存入数据库,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、将设备消息添加到MQ中

1、配置规则引擎

分别将设备生命周期事件设备数据点消息都连接上同一个topic

image-20220215173103268

image-20220215173457064

如图,两个规则,一个是设备生命周期事件,也就是设备创建、删除、上线、离线记录;另一个是设备数据点消息

image-20220215173523191

2、本地代码订阅消息

在设备连接的MQ消息topic中添加订阅yuyun-topic-data,用于本地工程订阅MQ发送的消息

image-20220215173833142

本地代码添加订阅:

image-20220215173917272

运行demo,如下如所示,一直接收数据就算订阅成功了

image-20220215174606891

二、数据消息格式

1、设备数据点消息

参数属性类型说明示例
sysPropertymessageTypestring消息类型:固定为deviceDatapoint
productIdstring产品ID90273
appPropertydeviceIdstring设备ID102839
dataTimestampint设备数据点生产时间戳,单位毫秒,设备上传时可自定义携带15980987429000
datastreamstring数据流名称weather
bodyobject/string/…详细的数据点消息内容见如下示例

本地接收到的数据实例:

{
    "sysProperty": {
        "messageType": "deviceDatapoint",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
        "datastream":"weather"
    },
    "body":{
        "temperature": 30,
        "humidity": "47%"
    }
}

数据中有一个body参数,里面的数据类型可以是:JSON、数值、字符串和二进制

"body":{
    "temperature": 30,
    "humidity": "47%"
}
"body": 10
"body": 11.55
// String类型
"body":"sunny with wind"
  • 数据格式为二进制数据时,body中数据为二进制数据的索引号 index,示例如下,用户可以通过该索引号通过API获取该数据
"body":{
    "index": "3491506_1475204886914_bin"
}

2、设备生命周期事件消息

参数属性类型说明示例
sysPropertymessageTypestring消息类型:固定为deviceLifeCycle
productIdstring产品ID90273
appPropertydeviceIdstring设备ID102839
dataTimestampint设备消息生产时间戳,单位毫秒15980987429000
bodyobject创建、删除、上线、离线 created/deleted/online/offline示例如下
{
    "sysProperty": {
        "messageType": "deviceLifeCycle",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
    },
    "body":{
        "event": "online"
    }
}

三、新建数据库表

新建两个表,一个存放设备生命周期事件,一个存放设备数据点消息

CREATE TABLE `device_data`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `product_id` varchar(20) CHARACTER  NULL DEFAULT NULL COMMENT '产品ID',
  `device_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '设备ID',
  `data_stream` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '数据流名称',
  `data_value` double(9, 2) NULL DEFAULT NULL COMMENT '数据',
  `date_time` datetime NULL DEFAULT NULL COMMENT '设备数据点产生时间',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `product_device_id`(`product_id`, `device_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '设备数据点消息' ROW_FORMAT = DYNAMIC;
CREATE TABLE `device_event`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `product_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '产品ID',
  `device_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '设备ID',
  `date_time` datetime NULL DEFAULT NULL COMMENT '时间',
  `event` varchar(10) CHARACTER NULL DEFAULT NULL COMMENT '生命周期事件',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '设备生命周期事件表' ROW_FORMAT = DYNAMIC;

四、创建SpringBoot+Mybatis-Plus项目

创建项目可以参照:新建一个Spring Boot+MyBatis-Plus项目

最终建成的项目结构如下:

image-20220216142955303

新建一个mq包,将配置好的demo中的东西搬过来,配置方法链接:https://yuyun.blog.csdn.net/article/details/122960344

搬过来之后就是这样:

image-20220216115125715

此时pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.yuyun</groupId>
    <artifactId>springboot-onenet</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-onenet</name>
    <description>springboot-onenet</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--生成实体成get set-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- mybatisPlus 核心库 -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
            <optional>true</optional>
        </dependency>

        <!-- MySQL驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--    onenet    -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>bouncycastle</groupId>
            <artifactId>bcprov-jdk15</artifactId>
            <version>140</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

五、运行项目

前面运行的demo中,运行的是MQClient.java文件中的main方法,而main方法中的内容为:

public static void main(String[] args) {
    MqClient mqClient = new MqClient();
    mqClient.connect();
}

demo运行的时候一直在接收onenet消息队列MQ生产的消息,SpringBoot要达到这种一直运行着的效果,将调用方法放入启动类最容易实现,代码如下:

package com.yuyun;

import com.yuyun.mq.MqClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author hyh
 */
@MapperScan("com.yuyun.mapper")
@SpringBootApplication
public class SpringbootOneNetApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootOneNetApplication.class, args);
        
        MqClient mqClient = new MqClient();
        mqClient.connect();
    }
}

启动SpringBoot项目,出现如下图所示的效果就算运行成功了

image-20220216120313691

此时运行linux平台的模拟设备,其自动向onenet平台发送消息,onenet平台又会通过MQ来发送消息,SpringBoot项目接收到消息:

image-20220216120943828

六、数据存入数据库

1、新建一个Service用来处理消息

接收到的消息是json格式的,解析这个格式的数据我用的是alibaba的fastjson。当messageType为deviceDatapoint是数据点消息,deviceLifeCycle是设备生命周期事件。

package com.yuyun.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.yuyun.dto.DeviceDataDTO;
import com.yuyun.dto.DeviceEventDTO;
import com.yuyun.service.DeviceDataService;
import com.yuyun.service.DeviceEventService;
import com.yuyun.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @author hyh
 */
@Slf4j
@Service
public class MessageProcessServiceImpl implements MessageProcessService {
    
    @Autowired
    private DeviceEventService deviceEventService;
    @Autowired
    private DeviceDataService deviceDataService;

    @Override
    public void messageProcess(String body) {
        JSONObject obj = JSONObject.parseObject(body);
        //产品信息
        JSONObject product = obj.getJSONObject("sysProperty");
        //产品id
        String productId = product.getString("productId");
        // 消息类型
        String messageType = product.getString("messageType");
        //数据体
        JSONObject appProperty = obj.getJSONObject("appProperty");

        //数据上传时间
        Date time = new Date(appProperty.getLong("dataTimestamp"));
        //设备id
        String deviceId = appProperty.getString("deviceId");

        // 生命周期事件
        if ("deviceLifeCycle".equals(messageType)){
            JSONObject bodyJson = obj.getJSONObject("body");
            String event = bodyJson.getString("event");

            DeviceEventDTO deviceEvent = new DeviceEventDTO();
            deviceEvent.setProductId(productId);
            deviceEvent.setDeviceId(deviceId);
            deviceEvent.setDateTime(time);
            deviceEvent.setEvent(event);
            // 存储到数据库
            deviceEventService.save(deviceEvent);

        }
        // 数据点消息
        else if ("deviceDatapoint".equals(messageType)){
            // 数据流名称
            String dataStream = appProperty.getString("datastream");
            Double dataValue = obj.getDouble("body");

            DeviceDataDTO deviceDataDTO = new DeviceDataDTO();
            deviceDataDTO.setProductId(productId);
            deviceDataDTO.setDeviceId(deviceId);
            deviceDataDTO.setDataStream(dataStream);
            deviceDataDTO.setDataValue(dataValue);
            deviceDataDTO.setDateTime(time);

            // 存储到数据库
            deviceDataService.save(deviceDataDTO);

        } else {
            log.error("未知消息类型的数据:" + body);
        }
    }
}

2、调用该service方法

demo中提供了一个消息消费方法,PushCallback类中的messageArrived方法:

image-20220216144035448

就在这里调用处理消息的service就行

(1)在PushCallbackMqClient类上面加上注解@Component

(2)PushCallback类中引入MessageProcessService

@Autowired
protected MessageProcessService messageProcessService;
private static PushCallback pushCallback;

(3)添加代码

/**
 * 通过PostConstruct实现初始化bean之前进行的操作
 */
@PostConstruct
public void init() {
    pushCallback = this;
    pushCallback.messageProcessService = this.messageProcessService;
}

(4)调用service类的数据处理方法

// 消息处理,存储到数据库
pushCallback.messageProcessService.messageProcess(body);

最终代码如下:

package com.yuyun.mq;

import com.yuyun.service.MessageProcessService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.logging.Logger;

@Component
public class PushCallback implements MqttCallback {

    @Autowired
    protected MessageProcessService messageProcessService;
    private static PushCallback pushCallback;

    private IMqttAsyncClient Client;
    private static final Logger logger = Logger.getLogger(PushCallback.class.getCanonicalName());
    private MqClient mqClient;
    private int reConnTimes = 0;

    /**
     * 通过PostConstruct实现初始化bean之前进行的操作
     */
    @PostConstruct
    public void init() {
        pushCallback = this;
        pushCallback.messageProcessService = this.messageProcessService;
    }

    public PushCallback(MqClient client) {
        mqClient = client;
    }

    @Override
    public void connectionLost(Throwable cause) {
        logger.info("connect is losted,and try to reconnect,cause = " + cause.getMessage() );
        cause.printStackTrace();
        while(true){
            if(mqClient.reConnect()){
                break;
            }
            try {
                //前20次每秒重连一次
                if(reConnTimes++ > 20){
                    Thread.sleep(1000);
                }else{//超过20次后没10s重连一次
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    /**
     * 说明:paho 本身有个bug,即此函数接口抛出异常都会回调到connectionLost()的接口,故需要在此函数中用try catch包起来处理,
     *      确保无异常抛出。
     * */
    public void messageArrived(String topic, MqttMessage message)  {
       try {
           byte[] payload = message.getPayload();
           OnenetMq.Msg obj = OnenetMq.Msg.parseFrom(payload);
           long at = obj.getTimestamp();
           long msgid = obj.getMsgid();
           String body = new String(obj.getData().toByteArray());

           SimpleDateFormat slf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           String time = slf.format(at);

           logger.info("time = " + time +
                   ",msg id: " + msgid +
                   ", body: " + body);

           // 消息处理,存储到数据库
           pushCallback.messageProcessService.messageProcess(body);

       }catch(Exception e){
           logger.info("messageArrived phrase exception");
       }finally {
           if(mqClient.getManualAcks()){
               mqClient.messageArrivedComplete(message);
           }
       }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Client = token.getClient();
    }

}

3、运行项目

重新运行项目,MQ会把项目停止时没有接收的数据一股脑地传过来,打开数据库后就可以看到数据

image-20220216145419160

此时因为没有收到设备生命周期事件,device_event表中的数据为空

image-20220216145516812

将linux平台的模拟设备关闭运行几次,就收到了生命周期事件数据了:

image-20220216151740844

4、打包成jar包运行

将springboot项目打包成jar包运行:

image-20220216155408367

demo地址:https://gitee.com/hyh17808770899/spring-boot/tree/master/springboot-onenet

最后

以上就是开心小懒猪为你收集整理的OneNET物联网平台10 SpringBoot项目接收并解析MQ消息存入数据库的全部内容,希望文章能够帮你解决OneNET物联网平台10 SpringBoot项目接收并解析MQ消息存入数据库所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(96)

评论列表共有 0 条评论

立即
投稿
返回
顶部