概述
一、下载依赖
1.<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
2.配置文件
#MQTT-用户名
spring.mqtt.username=admin
#MQTT-密码
spring.mqtt.password=password
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://127.0.0.1:61613
#MQTT-连接服务器默认客户端ID,注意一个必须唯一,订阅端的clientId如果重复,订阅端只能启动一个
spring.mqtt.client.id=publishClient
#MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=topic
二、发布端代码
1.建立一个配置文件实现bean方法
@ConfigurationProperties("spring.mqtt")
@Configuration
public class MqttConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getHostUrl() {
return hostUrl;
}
public void setHostUrl(String hostUrl) {
this.hostUrl = hostUrl;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
@Bean
MqttClient mqttClient(){
try {
MemoryPersistence persistence = new MemoryPersistence();
// 创建客户端
MqttClient sampleClient = new MqttClient(hostUrl, clientId, persistence);
// 建立连接
sampleClient.connect(MqttConnectOptions());
return sampleClient;
} catch (Exception me) {
me.printStackTrace();
return null;
}
}
@Bean
MqttConnectOptions MqttConnectOptions(){
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(true);
// 设置连接的用户名
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
return connOpts;
}
}
2.实现service
@Service
public class MQTTService {
@Autowired
MqttClient mqttClient;
@Autowired
MqttConnectOptions mqttConnectOptions;
protected final Logger logger = LoggerFactory.getLogger(getClass());
public void sendMsg(Integer qos,String topic,String content){
try {
if ( !mqttClient.isConnected()){
synchronized(MQTTService.class) {
mqttClient.connect(mqttConnectOptions);
}
}
MqttMessage message = new MqttMessage(content.getBytes("utf-8"));
// 设置消息的服务质量
if(qos == null){
message.setQos(0);
}
message.setQos(qos);
mqttClient.publish(topic, message);
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
}
}
}
三、订阅端代码
1.config文件
@Configuration
public class MqttConfig {
@Bean
MqttClient mqttClient(){
try {
String path = System.getProperty("user.dir") + "/DBServer.ini";
String hostUrl = ConfigReader.readCfgValue(path, "dbsource", "hostUrl", "");
String clientId = "subClient";
MemoryPersistence persistence = new MemoryPersistence();
// 创建客户端
MqttClient sampleClient = new MqttClient(hostUrl, clientId, persistence);
// 建立连接
sampleClient.connect(MqttConnectOptions());
return sampleClient;
} catch (Exception me) {
me.printStackTrace();
return null;
}
}
@Bean
MqttConnectOptions MqttConnectOptions(){
try{
String path = System.getProperty("user.dir") + "/DBServer.ini";
String username = ConfigReader.readCfgValue(path, "dbsource", "mqttuserName", "");
String password = ConfigReader.readCfgValue(path, "dbsource", "mqttpassWord", "");
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(true);
// 设置连接的用户名
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
// 设置超时时间 单位为秒
connOpts.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
connOpts.setKeepAliveInterval(20);
return connOpts;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
}
2.service
下面是我写的一个传输数据的方法的一部分,主要是在messageArrived()接收数据,注意设置编码格式,不然在idea里可以正确运行,但是打成jar包运行时将乱码。
@Component
public
class MQTTService {
static long time = 0;
static List<TimerTask> timerTaskList = new ArrayList<>();
static Logger logger= Logger.getLogger(MQTTService.class);
public static JSONObject receiveMg(Integer qos, String topic, ClientInfo clientInfo, List<TimerTask> timerList, List<TimerTask> timerList2, List<Thread> threadList){
JSONObject json = new JSONObject();
try {
String path = System.getProperty("user.dir") + "/DBServer.ini";
String userName = ConfigReader.readCfgValue(path, "dbsource", "mqttuserName", "");
String passWord = ConfigReader.readCfgValue(path, "dbsource", "mqttpassWord", "");
String HOST = ConfigReader.readCfgValue(path, "dbsource", "hostUrl", "");
String clientid = "subClient_"+UUID.randomUUID().toString();
// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调函数
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String topic, MqttMessage message){
String clientId = clientInfo.getClientId();
String loginIp = clientInfo.getLoginIp();
String loginPort = clientInfo.getLoginPort();
String getdbUrl = clientInfo.getGetdbUrl();
String systemId = clientInfo.getSystemId();
String systemSecret = clientInfo.getSystemSecret();
String JDBC_DRIVER = clientInfo.getJdbcDriver();
String DB_URL = clientInfo.getDbUrl();
String USER = clientInfo.getUsername();
String PASS = clientInfo.getPassword();
String content = null;
try {
content = new String(message.getPayload(),"utf-8");
} catch (UnsupportedEncodingException e) {
logger.error(e.getMessage());
}
if((systemId+"_"+clientId+"/dataJoinUpdate").equals(topic)){
//设置缓冲区,10s内数据只读取一次
long curTime = System.currentTimeMillis();
if((curTime - time)>1000*10){
//执行方法并重置
time = curTime;
restart(loginIp, loginPort, getdbUrl, systemId, systemSecret, clientId, timerList, timerList2,threadList);
}else{
//清空定时器,防止多次new
if(timerTaskList.size()>0){
for (TimerTask t:timerTaskList) {
t.cancel();
}
timerTaskList.clear();
}
for (TimerTask t:timerList2) {
t.cancel();
}
//设置10s的定时器
TimerTask task = new TimerTask() {
@Override
public void run() {
restart(loginIp, loginPort, getdbUrl, systemId, systemSecret, clientId, timerList, timerList2,threadList);
}
};
Timer timer = new Timer();
timer.schedule(task,1000*10);
timerTaskList.add(task);
}
}else{
//同步数据
JSONObject json = JSONObject.parseObject(content);
String tableName = json.getString("tableName");
String primaryKey = json.getString("primaryKey");
json.remove("clientId");
json.remove("tableName");
json.remove("primaryKey");
json.remove("systemId");
Connection conn = null;
PreparedStatement stmt = null;
Map<String, Object> map =json;
try{
if((systemId+"_"+clientId+"/insert").equals(topic)){
logger.info("mqttInsert:"+content);
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL, USER, PASS);
List keyList = new LinkedList();
List valueList = new LinkedList();
String primaryValue = "";
for (Map.Entry<String, Object> entry : map.entrySet()) {
if(StringUtils.hasText(entry.getValue().toString())){
if(primaryKey.equals(entry.getKey())){
primaryValue = "'"+entry.getValue().toString()+"'";
}
keyList.add(entry.getKey());
valueList.add("'"+entry.getValue().toString()+"'");
}
}
String sql_count = "select count("+primaryKey+") from "+tableName+" where "+primaryKey+" = "+primaryValue;
stmt = conn.prepareStatement(sql_count);
ResultSet rs = stmt.executeQuery();
int count = 0;
while (rs.next()){
count = rs.getInt(1);
}
rs.close();
if(count == 0){
String keyStr = String.join(",",keyList);
String valueStr = String.join(",",valueList);
String insertSql = "INSERT INTO "+tableName+"("+keyStr+") VALUES ("+valueStr+")";
stmt = conn.prepareStatement(insertSql);
stmt.execute();
String deleteInsertSql = "delete from "+tableName+"copy where "+primaryKey+" = "+primaryValue+" and action = 'insert'";
stmt = conn.prepareStatement(deleteInsertSql);
stmt.execute();
}
}else if((systemId+"_"+clientId+"/update").equals(topic)){
logger.info("mqttUpdate:"+content);
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL, USER, PASS);
String primaryValue = "";
List list = new ArrayList();
for (Map.Entry<String, Object> entry : map.entrySet()) {
if(StringUtils.hasText(entry.getValue().toString())){
list.add(entry.getKey()+"="+"'"+entry.getValue().toString()+"'");
}
if(primaryKey.equals(entry.getKey())){
primaryValue = "'"+entry.getValue().toString()+"'";
}
}
String str = String.join(",",list);
String updateSql = "update "+tableName+"
set "+str+" where "+primaryKey+" = "+primaryValue;
stmt = conn.prepareStatement(updateSql);
stmt.execute();
String sql_update = "select * from "+tableName+" where "+primaryKey+" = "+primaryValue;
stmt = conn.prepareStatement(sql_update);
ResultSet r = stmt.executeQuery();
ResultSetMetaData data = r.getMetaData();
List updateList = new ArrayList();
while (r.next()){
for(int i = 1; i <= data.getColumnCount(); ++i) {
String name = data.getColumnName(i);
String columnValue = r.getString(i);
if(columnValue != null){
updateList.add(name+"="+"'"+columnValue+"'");
}
}
}
r.close();
String delStr = String.join(" and ",updateList);
String deleteUpdateSql = "delete from "+tableName+"copy where "+delStr+" and action = 'update'";
stmt = conn.prepareStatement(deleteUpdateSql);
stmt.execute();
}else if((systemId+"_"+clientId+"/delete").equals(topic)) {
logger.info("mqttDelete:"+content);
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL, USER, PASS);
String primaryValue = "'"+json.getString(primaryKey)+"'";
String deleteSql = "DELETE FROM "+tableName+" WHERE "+primaryKey+"="+primaryValue;
stmt = conn.prepareStatement(deleteSql);
stmt.execute();
String deleteDelSql = "delete from "+tableName+"copy where "+primaryKey+" = "+primaryValue+" and action = 'delete'";
stmt = conn.prepareStatement(deleteDelSql);
stmt.execute();
}
}catch (Exception e){
logger.error(e.getMessage());
}finally {
try{
stmt.close();
conn.close();
}catch (Exception e){
logger.error(e.getMessage());
}
}
}
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+ token.isComplete());
}
});
client.connect(options);
//订阅消息
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
json.put("code",1);
json.put("dara",e.getMessage());
}
return json;
}
四、Mqtt服务器搭建
怎么搭建不多赘述了,百度就有,需要注意的是订阅端多个的话,clientId必须不重复,下面贴上搭建服务器的链接。
https://jingyan.baidu.com/article/d45ad148b8efb769542b8066.html
最后
以上就是傲娇秀发为你收集整理的MQTT项目中的实践的全部内容,希望文章能够帮你解决MQTT项目中的实践所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复