SET: an open-source framework for IoT applications based on Spring Boot + EMQX + TDengine

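Overview

Preface

IoT technology has been popular lately, and I happened to have a project in this area. After some research I found that both EMQX and TDengine target the IoT space and both offer open-source editions. It then turned out that only the EMQX Enterprise edition can write data into TDengine automatically; the open-source edition does not include this feature.
So we built the SET project: a Java project based on Spring Boot whose main job is to subscribe to EMQX messages and write them into TDengine. It relies only on features of the open-source editions, so it can be used freely.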

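Basic architecture

The architecture diagram below keeps only the essentials.
[Figure: basic flow]

Core code

Code structure

[Figure: code structure]
Dependencies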

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zew</groupId>
    <artifactId>iot-emqx-tdengine</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>spring-boot-demo</name>
    <description>Demo project for using TDEngine with Spring Boot And EMQX</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- taos-jdbc -->
        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>2.0.8</version>
        </dependency>

        <!-- Druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.17</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-annotations</artifactId>
            <version>1.5.22</version>
        </dependency>
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-models</artifactId>
            <version>1.5.22</version>
        </dependency>

        <!-- Lombok (provides @Slf4j) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
            </resource>

        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Model layer

package com.zew.demo.iot.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.sql.Timestamp;

/**
 * @author Administrator
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Weather {

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
    private Timestamp ts;

    private int temperature;

    private float humidity;

}

Service layer

EMQX listener module

package com.zew.demo.iot.service;

import com.alibaba.fastjson.JSONObject;
import com.zew.demo.iot.model.Weather;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;

@Slf4j
@Service
public class MessageListenerService {
    @Value("${broker.ip:10.0.4.214}")
    private String brokerIp;
    @Value("${subscribe.topic:/#}")
    private String topic;
    @Autowired
    private WeatherService weatherService;

    @PostConstruct
    public void ini() {
        try {
            String host = InetAddress.getLocalHost().getHostAddress();
            MqttClient client = new MqttClient("tcp://" + brokerIp + ":1883", host + System.currentTimeMillis(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(15);
            options.setUserName("userName");
            options.setPassword("userName".toCharArray());
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    log.warn("Connection lost", cause);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    log.debug("Message arrived: {}, {}", topic, message);
                    try {
                        // Decode as UTF-8 explicitly instead of relying on the platform default charset
                        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                        JSONObject jsonObject = JSONObject.parseObject(payload);
                        Weather weather = new Weather();
                        weather.setTemperature(jsonObject.getInteger("temperature"));
                        weather.setHumidity(jsonObject.getFloat("humidity"));
                        weatherService.save(weather);
                    } catch (RuntimeException e) {
                        // Paho shuts the client down if this callback throws, so log and drop bad messages
                        log.error("Failed to process message on topic {}", topic, e);
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    log.debug("Delivery complete: {}", token);
                }
            });

            client.connect(options);
            client.subscribe(topic, 1);
        } catch (UnknownHostException | MqttException e) {
            log.error("Failed to connect to the MQTT broker", e);
        }

    }
}
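
The listener's default subscription is `/#`. In MQTT, `#` matches any number of trailing topic levels and `+` matches exactly one level, so `/#` only matches topics whose first level is empty, that is, topics that start with `/`. The following is a minimal sketch of that matching rule. The class `TopicFilters` is hypothetical (it is not part of Paho or of this project), and the sketch ignores the special handling of topics beginning with `$`.

```java
final class TopicFilters {
    private TopicFilters() {}

    /** Returns true if the topic name matches the MQTT topic filter (simplified rules). */
    static boolean matches(String filter, String topic) {
        // Keep empty levels, e.g. "/a" splits into ["", "a"]
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: valid only as the last level, matches the parent and everything below
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }
}
```

Under these rules a device publishing to `weather/room1` would not reach a listener subscribed to `/#`; either publish to `/weather/room1` or set `subscribe.topic` to `#` or `weather/#`.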

TDengine operations module

package com.zew.demo.iot.service;

import com.zew.demo.iot.mapper.DatabaseMapper;
import com.zew.demo.iot.mapper.WeatherMapper;
import com.zew.demo.iot.model.Weather;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author Administrator
 */
@Slf4j
@Service
public class WeatherService {

    private final DatabaseMapper databaseMapper;
    private final WeatherMapper weatherMapper;

    public WeatherService(DatabaseMapper databaseMapper, WeatherMapper weatherMapper) {
        this.databaseMapper = databaseMapper;
        this.weatherMapper = weatherMapper;
    }

    public boolean init() {
        try {
            // Drop the database
            databaseMapper.dropDatabase("db");
            // Create the database (alternative without parameters: databaseMapper.createDatabase("db"))
            Map<String, String> map = new HashMap<>(4);
            map.put("dbName", "db");
            map.put("keep", "36500");
            map.put("days", "30");
            map.put("blocks", "4");
            databaseMapper.creatDatabaseWithParameters(map);
            // Select the database
            databaseMapper.useDatabase("db");
            // Create the table
            weatherMapper.createTable("db", "weather");
            return true;
        } catch (Exception e) {
            log.error("Failed to create the database or table", e);
        }
        return false;
    }

    public int save(Weather weather) {
        return weatherMapper.insert(weather);
    }

    public int batchSave(List<Weather> weatherList) {
        return weatherMapper.batchInsert(weatherList);
    }

    public List<Weather> query(Long limit, Long offset) {
        return weatherMapper.select(limit, offset);
    }
}
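
The listener calls `save` once per message, while `WeatherService` also exposes `batchSave`. One possible way to connect the two is a small buffer that collects items and hands them to a sink in batches. This is only a sketch under stated assumptions: the class `BatchBuffer` and its `batchSize` parameter are hypothetical and not part of the project, and there is no time-based flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Adds one item and flushes automatically once batchSize items are pending. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Hands all pending items to the sink; does nothing if the buffer is empty. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>();
        sink.accept(batch);
    }
}
```

In this project the sink could be `weatherService::batchSave`, with `add` called from `messageArrived`. A scheduled call to `flush` would also be needed so that a partially filled batch is not held back indefinitely when traffic is low.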

DAO layer

package com.zew.demo.iot.mapper;

import com.zew.demo.iot.model.Weather;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author Administrator
 */
@Component
@Mapper
public interface WeatherMapper {

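    /**
     * Create the data table.
     *
     * @param dbName    database name
     * @param tableName table name
     */
    void createTable(@Param("dbName") String dbName, @Param("tableName") String tableName);

    /**
     * Insert a single record.
     *
     * @param weather the record
     * @return number of affected rows
     */
    int insert(Weather weather);

    /**
     * Insert records in batch.
     *
     * @param weatherList list of records
     * @return number of affected rows
     */
    int batchInsert(List<Weather> weatherList);

    /**
     * Query records.
     *
     * @param limit  maximum number of rows
     * @param offset row offset
     * @return matching records, newest first
     */
    List<Weather> select(@Param("limit") Long limit, @Param("offset") Long offset);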
}

Working with TDengine can involve frequent database and table management, so those operations are wrapped in a mapper as well.

package com.zew.demo.iot.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author Administrator
 */
@Component
@Mapper
public interface DatabaseMapper {

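    /**
     * Create a database.
     *
     * @param dbName database name
     * @return the update count reported by the driver
     */
    int createDatabase(String dbName);

    /**
     * Create a database and set related parameters.
     *
     * @param map parameters
     * @return the update count reported by the driver
     */
    int creatDatabaseWithParameters(Map<String, String> map);

    /**
     * Drop a database.
     *
     * @param dbName database name
     * @return the update count reported by the driver
     */
    int dropDatabase(String dbName);

    /**
     * Select a database.
     *
     * @param dbName database name
     * @return the update count reported by the driver
     */
    int useDatabase(String dbName);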

}

Mapper XML files

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.zew.demo.iot.mapper.DatabaseMapper">

    <update id="createDatabase" parameterType="java.lang.String">
        create database if not exists ${dbName}
    </update>

    <update id="creatDatabaseWithParameters" parameterType="map">
        CREATE database if not EXISTS ${dbName}
        <if test="keep != null">
            KEEP ${keep}
        </if>
        <if test="days != null">
            DAYS ${days}
        </if>
        <if test="replica != null">
            REPLICA ${replica}
        </if>
        <if test="cache != null">
            cache ${cache}
        </if>
        <if test="blocks != null">
            blocks ${blocks}
        </if>
        <if test="minrows != null">
            minrows ${minrows}
        </if>
        <if test="maxrows != null">
            maxrows ${maxrows}
        </if>
    </update>

    <update id="dropDatabase" parameterType="java.lang.String">
        DROP database if exists ${dbName}
    </update>

    <update id="useDatabase" parameterType="java.lang.String">
        use ${dbName}
    </update>

</mapper>
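
The `${...}` placeholders in the DatabaseMapper XML above are substituted into the SQL text as-is by MyBatis, unlike `#{...}`, which is bound as a statement parameter. If a database or table name could ever come from outside the application, it is worth validating it first. The sketch below is one way to do that; the class `SqlIdentifiers` and the allowed character set are assumptions for illustration, not part of the project.

```java
import java.util.regex.Pattern;

final class SqlIdentifiers {
    // Assumption: names are limited to letters, digits and underscores and do not start with a digit
    private static final Pattern SAFE = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private SqlIdentifiers() {}

    /** Returns the name unchanged if it is a plain identifier, otherwise throws. */
    static String requireSafe(String name) {
        if (name == null || !SAFE.matcher(name).matches()) {
            throw new IllegalArgumentException("Unsafe SQL identifier: " + name);
        }
        return name;
    }
}
```

A caller would then write, for example, `databaseMapper.dropDatabase(SqlIdentifiers.requireSafe(dbName))`.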

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.zew.demo.iot.mapper.WeatherMapper">

    <resultMap id="BaseResultMap" type="com.zew.demo.iot.model.Weather">
        <id column="ts" jdbcType="TIMESTAMP" property="ts"/>
        <result column="temperature" jdbcType="INTEGER" property="temperature"/>
        <result column="humidity" jdbcType="FLOAT" property="humidity"/>
    </resultMap>

    <update id="createTable">
        create table if not exists ${dbName}.${tableName}(ts timestamp, temperature int, humidity float);
    </update>

    <update id="dropTable" parameterType="java.lang.String">
        drop table if exists ${tableName}
    </update>

    <insert id="insert" parameterType="com.zew.demo.iot.model.Weather">
        insert into db.weather (ts,temperature, humidity) values (now, #{temperature,jdbcType=INTEGER}, #{humidity,jdbcType=FLOAT})
    </insert>

    <insert id="batchInsert" parameterType="java.util.List">
        insert into db.weather(ts,temperature, humidity) values
        <foreach separator=" " collection="list" item="weather" index="index">
            (now + #{index}a, #{weather.temperature}, #{weather.humidity})
        </foreach>
    </insert>

    <sql id="Base_Column_List">
        ts, temperature, humidity
    </sql>

    <select id="select" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from db.weather
        order by ts desc
        <if test="limit != null">
            limit #{limit,jdbcType=BIGINT}
        </if>
        <if test="offset != null">
            offset #{offset,jdbcType=BIGINT}
        </if>
    </select>
</mapper>
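
The `batchInsert` statement above gives each row the timestamp `now + <index>a`. In TDengine SQL the suffix `a` denotes milliseconds, so the rows of one batch get distinct timestamps instead of all sharing the same `now` value in the timestamp column, which is the table's primary key. The sketch below renders the statement text the `foreach` would produce for a given list, to make that shape visible. The class `BatchInsertSql` is hypothetical, and it inlines the values only for illustration; the real mapper binds them through `#{...}`.

```java
import java.util.List;

final class BatchInsertSql {
    /** Minimal stand-in for the temperature and humidity fields of Weather. */
    static final class Row {
        final int temperature;
        final float humidity;

        Row(int temperature, float humidity) {
            this.temperature = temperature;
            this.humidity = humidity;
        }
    }

    private BatchInsertSql() {}

    /** Builds the statement text, offsetting each row's timestamp by its index in milliseconds. */
    static String render(List<Row> rows) {
        StringBuilder sql = new StringBuilder("insert into db.weather(ts,temperature, humidity) values");
        for (int index = 0; index < rows.size(); index++) {
            Row row = rows.get(index);
            sql.append(" (now + ").append(index).append("a, ")
               .append(row.temperature).append(", ")
               .append(row.humidity).append(")");
        }
        return sql.toString();
    }
}
```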

Controller layer

Mainly used for testing.

package com.zew.demo.iot.ctrl;

import com.zew.demo.iot.model.Weather;
import com.zew.demo.iot.service.WeatherService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * @author Administrator
 */
@RestController
@RequestMapping("/weather")
public class WeatherController {

    private final WeatherService weatherService;

    public WeatherController(WeatherService weatherService) {
        this.weatherService = weatherService;
    }

    @ApiOperation(value = "Initialize the database and the weather table")
    @GetMapping("/init")
    public boolean init() {
        return weatherService.init();
    }

    @ApiOperation(value = "Insert a single record")
    @PostMapping
    public int saveWeather(@RequestBody Weather weather) {
        return weatherService.save(weather);
    }

    @ApiOperation(value = "Insert multiple records")
    @PostMapping("/batch")
    public int batchSaveWeather(@RequestBody List<Weather> weatherList) {
        return weatherService.batchSave(weatherList);
    }

    @ApiOperation(value = "Query records")
    @GetMapping("/{limit}/{offset}")
    public List<Weather> queryWeather(@PathVariable Long limit, @PathVariable Long offset) {
        return weatherService.query(limit, offset);
    }

}

Miscellaneous

Entry class

package com.zew.demo.iot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Administrator
 */

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

Swagger configuration

package com.zew.demo.iot;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * @author zy
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2).select()
                .apis(RequestHandlerSelectors.basePackage("com.zew.demo.iot"))
                .paths(PathSelectors.any()).build().apiInfo(apiInfo());
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder().title("IOT-API").description("IOT API design").build();
    }
}

Configuration

server.port=8080
server.servlet.context-path=/iot/services
# Database configuration
spring.datasource.driver-class-name=com.taosdata.jdbc.TSDBDriver
spring.datasource.url=jdbc:TAOS://127.0.0.1:6030/log
spring.datasource.username=root
spring.datasource.password=taosdata
# Druid connection pool configuration
spring.datasource.druid.initial-size=5
spring.datasource.druid.min-idle=5
spring.datasource.druid.max-active=5
spring.datasource.druid.max-wait=60000
spring.datasource.druid.validation-query=select server_status();
spring.datasource.druid.validation-query-timeout=5000
spring.datasource.druid.test-on-borrow=false
spring.datasource.druid.test-on-return=false
spring.datasource.druid.test-while-idle=true
spring.datasource.druid.time-between-eviction-runs-millis=60000
spring.datasource.druid.min-evictable-idle-time-millis=600000
spring.datasource.druid.max-evictable-idle-time-millis=900000
# mybatis
mybatis.mapper-locations=classpath:mapper/*.xml
# log
logging.level.com.zew.demo.iot.mapper=debug
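
With `server.port=8080`, `server.servlet.context-path=/iot/services` and the controller's `@RequestMapping("/weather")`, the test endpoints resolve to URLs under `/iot/services/weather`. The sketch below simply composes those URLs; the class `WeatherEndpoints` is hypothetical, and the host name `localhost` used in the usage note is an assumption about where the application runs.

```java
final class WeatherEndpoints {
    private final String base;

    WeatherEndpoints(String host, int port, String contextPath) {
        // contextPath is expected to start with "/" and not end with one, e.g. "/iot/services"
        this.base = "http://" + host + ":" + port + contextPath + "/weather";
    }

    /** GET: drops and recreates the database and the weather table. */
    String init() {
        return base + "/init";
    }

    /** POST with a JSON Weather body: inserts a single record. */
    String save() {
        return base;
    }

    /** POST with a JSON array of Weather objects: inserts multiple records. */
    String batchSave() {
        return base + "/batch";
    }

    /** GET: returns up to limit records starting at offset. */
    String query(long limit, long offset) {
        return base + "/" + limit + "/" + offset;
    }
}
```

For a local run, `new WeatherEndpoints("localhost", 8080, "/iot/services").query(10, 0)` gives `http://localhost:8080/iot/services/weather/10/0`.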

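Conclusion

  1. The open-source repository is available at: link here
  2. The EMQX version used is 4.2.2 and the TDengine version is 2.0.6. Incidentally, integrating TDengine involved quite a few pitfalls, especially the FQDN setting introduced in 2.0; I will ask a colleague to cover that next time.
  3. To run the project on Windows, don't forget to install the dedicated client; the link is here
  4. The project is only an example. It does run end to end, but optimization and customization are up to you.
  5. This project borrows heavily from the code of TDengine's Spring demo.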
