我是靠谱客的博主 甜蜜鞋子,最近开发中收集的这篇文章主要介绍开源一款针对IOT应用基于Springboot+Emqx+Tdengine的框架SET前言基本架构核心代码结论,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
前言
最近物联网相关技术还是很火的,正好有一个项目涉及到相关应用领域,研究一番发现EMQX以及TDengine都涉及物联网领域,而且都提供开源版本,但随后又发现只有Emqx的企业版才能支持自动向TDengine灌入数据的,但开源版这个功能被阉割了。
于是我们做了这个SET项目,一个基于Springboot搭建的java项目,主要作用是订阅Emqx消息并写入TDengine的框架,都是利用的开源版本的特性因此可以放心大胆的使用。
基本架构
下面是只有干货的架构图。
核心代码
代码结构
依赖项
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zew</groupId>
<artifactId>iot-emqx-tdengine</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-demo</name>
<description>Demo project for using TDEngine with Spring Boot And EMQX</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- taos-jdbc -->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.8</version>
</dependency>
<!-- druid连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.17</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!-- swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
<version>1.5.22</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-models</artifactId>
<version>1.5.22</version>
</dependency>
<!-- Slf4j -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
model层
package com.zew.demo.iot.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.sql.Timestamp;
/**
* @author Administrator
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Weather {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
private Timestamp ts;
private int temperature;
private float humidity;
}
Service层
EMQX的监听模块
package com.zew.demo.iot.service;
import com.alibaba.fastjson.JSONObject;
import com.zew.demo.iot.model.Weather;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Slf4j
@Service
public class MessageListenerService {
@Value("${broker.ip:10.0.4.214}")
private String brokerIp;
@Value("${subscribe.topic:/#}")
private String topic;
@Autowired
private WeatherService weatherService;
@PostConstruct
public void ini() {
try {
String host = InetAddress.getLocalHost().getHostAddress();
MqttClient client = new MqttClient("tcp://" + brokerIp + ":1883", host + System.currentTimeMillis(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setKeepAliveInterval(15);
options.setUserName("userName");
options.setPassword("userName".toCharArray());
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.warn("连接中断", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
log.debug("消息送达{},{}", topic, message);
byte[] payload = message.getPayload();
JSONObject jsonObject = JSONObject.parseObject(new String(payload));
Weather weather=new Weather();
weather.setTemperature(jsonObject.getInteger("temperature"));
weather.setHumidity(jsonObject.getFloat("humidity"));
weatherService.save(weather);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.debug("消息完成{}", token);
}
});
client.connect(options);
client.subscribe(topic,1);
} catch (UnknownHostException | MqttException e) {
log.error("mqtt连接broker错误", e);
}
}
}
TDengine的操作模块
package com.zew.demo.iot.service;
import com.zew.demo.iot.mapper.DatabaseMapper;
import com.zew.demo.iot.mapper.WeatherMapper;
import com.zew.demo.iot.model.Weather;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author Administrator
*/
@Slf4j
@Service
public class WeatherService {
private final DatabaseMapper databaseMapper;
private final WeatherMapper weatherMapper;
public WeatherService(DatabaseMapper databaseMapper, WeatherMapper weatherMapper) {
this.databaseMapper = databaseMapper;
this.weatherMapper = weatherMapper;
}
public boolean init() {
try {
// 删除
databaseMapper.dropDatabase("db");
// 创建 databaseMapper.createDatabase( "db")
Map<String, String> map = new HashMap<>(4);
map.put("dbName", "db");
map.put("keep", "36500");
map.put("days", "30");
map.put("blocks", "4");
databaseMapper.creatDatabaseWithParameters(map);
// 选择
databaseMapper.useDatabase("db");
// 创建表
weatherMapper.createTable("db", "weather");
return true;
} catch (Exception e) {
log.error("创建数据表出错", e);
}
return false;
}
public int save(Weather weather) {
return weatherMapper.insert(weather);
}
public int batchSave(List<Weather> weatherList) {
return weatherMapper.batchInsert(weatherList);
}
public List<Weather> query(Long limit, Long offset) {
return weatherMapper.select(limit, offset);
}
}
DAO层
package com.zew.demo.iot.mapper;
import com.zew.demo.iot.model.Weather;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author Administrator
*/
@Component
@Mapper
public interface WeatherMapper {
/**
* 创建数据表
*
* @param dbName 数据库名
* @param tableName 表名
*/
void createTable(String dbName, String tableName);
/**
* 单条插入数据
*
* @param weather 数据
* @return
*/
int insert(Weather weather);
/**
* 批量插入数据
*
* @param weatherList 数据列表
* @return
*/
int batchInsert(List<Weather> weatherList);
/**
* 查询数据
*
* @param limit 个数
* @param offset 偏移
* @return
*/
List<Weather> select(@Param("limit") Long limit, @Param("offset") Long offset);
}
TDengine可能涉及到频繁的操作表,所以把表操作也封装了一下
package com.zew.demo.iot.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author Administrator
*/
@Component
@Mapper
public interface DatabaseMapper {
/**
* 创建数据库
*
* @param dbName 数据库名
* @return
*/
int createDatabase(String dbName);
/**
* 创建数据库屏配置相关参数
*
* @param map 参数
* @return
*/
int creatDatabaseWithParameters(Map<String, String> map);
/**
* 删除数据库
*
* @param dbName 数据库名
* @return
*/
int dropDatabase(String dbName);
/**
* 选择数据库
*
* @param dbName 数据库名
* @return
*/
int useDatabase(String dbName);
}
mapper对应的xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.zew.demo.iot.mapper.DatabaseMapper">
<update id="createDatabase" parameterType="java.lang.String">
create database if not exists ${dbName}
</update>
<update id="creatDatabaseWithParameters" parameterType="map">
CREATE database if not EXISTS ${dbName}
<if test="keep != null">
KEEP ${keep}
</if>
<if test="days != null">
DAYS ${days}
</if>
<if test="replica != null">
REPLICA ${replica}
</if>
<if test="cache != null">
cache ${cache}
</if>
<if test="blocks != null">
blocks ${blocks}
</if>
<if test="minrows != null">
minrows ${minrows}
</if>
<if test="maxrows != null">
maxrows ${maxrows}
</if>
</update>
<update id="dropDatabase" parameterType="java.lang.String">
DROP database if exists ${dbName}
</update>
<update id="useDatabase" parameterType="java.lang.String">
use ${dbName}
</update>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.zew.demo.iot.mapper.WeatherMapper">
<resultMap id="BaseResultMap" type="com.zew.demo.iot.model.Weather">
<id column="ts" jdbcType="TIMESTAMP" property="ts"/>
<result column="temperature" jdbcType="INTEGER" property="temperature"/>
<result column="humidity" jdbcType="FLOAT" property="humidity"/>
</resultMap>
<update id="createTable">
create table if not exists ${dbName}.${tableName}(ts timestamp, temperature int, humidity float);
</update>
<update id="dropTable" parameterType="java.lang.String">
drop ${tableName}
</update>
<insert id="insert" parameterType="com.zew.demo.iot.model.Weather">
insert into db.weather (ts,temperature, humidity) values (now, #{temperature,jdbcType=INTEGER}, #{humidity,jdbcType=FLOAT})
</insert>
<insert id="batchInsert" parameterType="java.util.List">
insert into db.weather(ts,temperature, humidity) values
<foreach separator=" " collection="list" item="weather" index="index">
(now + #{index}a, #{weather.temperature}, #{weather.humidity})
</foreach>
</insert>
<sql id="Base_Column_List">
ts, temperature, humidity
</sql>
<select id="select" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from db.weather
order by ts desc
<if test="limit != null">
limit #{limit,jdbcType=BIGINT}
</if>
<if test="offset != null">
offset #{offset,jdbcType=BIGINT}
</if>
</select>
</mapper>
控制层
主要是测试用的
package com.zew.demo.iot.ctrl;
import com.zew.demo.iot.model.Weather;
import com.zew.demo.iot.service.WeatherService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author Administrator
*/
@RestController
@RequestMapping("/weather")
public class WeatherController {
private final WeatherService weatherService;
public WeatherController(WeatherService weatherService) {
this.weatherService = weatherService;
}
@ApiOperation(value = "初始数据库和数据表weather")
@GetMapping("/init")
public boolean init() {
return weatherService.init();
}
@ApiOperation(value = "插入单条数据")
@PostMapping
public int saveWeather(@RequestBody Weather weather) {
return weatherService.save(weather);
}
@ApiOperation(value = "插入多条数据")
@PostMapping("/batch")
public int batchSaveWeather(@RequestBody List<Weather> weatherList) {
return weatherService.batchSave(weatherList);
}
@ApiOperation(value = "查询数据")
@GetMapping("/{limit}/{offset}")
public List<Weather> queryWeather(@PathVariable Long limit, @PathVariable Long offset) {
return weatherService.query(limit, offset);
}
}
其他
入口类
package com.zew.demo.iot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Administrator
*/
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
swagger
package com.zew.demo.iot;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* @author zy
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2).select()
.apis(RequestHandlerSelectors.basePackage("com.zew.demo.iot"))
.paths(PathSelectors.any()).build().apiInfo(apiInfo());
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder().title("IOT-API").description("IOT-接口设计").build();
}
}
配置
server.port=8080
server.servlet.context-path=/iot/services
# 数据库配置
spring.datasource.driver-class-name=com.taosdata.jdbc.TSDBDriver
spring.datasource.url=jdbc:TAOS://127.0.0.1:6030/log
spring.datasource.username=root
spring.datasource.password=taosdata
# druid连接池配置
spring.datasource.druid.initial-size=5
spring.datasource.druid.min-idle=5
spring.datasource.druid.max-active=5
spring.datasource.druid.max-wait=60000
spring.datasource.druid.validation-query=select server_status();
spring.datasource.druid.validation-query-timeout=5000
spring.datasource.druid.test-on-borrow=false
spring.datasource.druid.test-on-return=false
spring.datasource.druid.test-while-idle=true
spring.datasource.druid.time-between-eviction-runs-millis=60000
spring.datasource.druid.min-evictable-idle-time-millis=600000
spring.datasource.druid.max-evictable-idle-time-millis=900000
# mybatis
mybatis.mapper-locations=classpath:mapper/*.xml
# log
logging.level.com.zew.demo.iot.mapper=debug
结论
- 开源地址如下:传送门在这里
- 所用到的EMQX版本为4.2.2,TDengine版本为2.0.6.对了,TDengine的集成还是踩了不少坑的尤其那个2.0新加的FQDN,这块问题下次找同事来说说。
- Windows下要跑项目,别忘了安装专门的客户端哦,链接在这
- 项目只是示例,确实能够跑通,优化和定制还需要自己努力哦
- 这个项目大量参考了tdengine的spirngdemo的代码哦
最后
以上就是甜蜜鞋子为你收集整理的开源一款针对IOT应用基于Springboot+Emqx+Tdengine的框架SET前言基本架构核心代码结论的全部内容,希望文章能够帮你解决开源一款针对IOT应用基于Springboot+Emqx+Tdengine的框架SET前言基本架构核心代码结论所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复