概述
第一部分:认识MQTT
先来一段百度文库的介绍
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
我的认识
再我看来MQTT就有一点类似于java设计模式中的观察者模式,进行注册订阅信息后,当有类似消息进入队列时就会监听到,大概我画了一个草图:手动绘图板,所以比较丑^_^
消息接收者(消费者)会注册一个(或多个)感兴趣的主题,然后当信息发布者(生产者)发布一个消息后,注册过该主题的消息接收者(消费者),就可以获取到这条消息,MQTT就是针对这种场景,对tcp进行了一层封装。
MQTT协议详解
https://mcxiaoke.gitbooks.io/mqtt-cn/content/
第二部分准备工作
- 首先要准备apollo作为消息队列
安装流程请参考:http://blog.csdn.net/whb3299065/article/details/79092095 准备jar:http://download.csdn.net/download/whb3299065/10211343为了方便,我把所有用到的东西发布了一份。
jarMaven地址 <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
第三部分开发
我初学的时候发现很多人喜欢将DOME写成客户端已服务器两个部分,对我理解造成了一些困扰,其实QMTT并没有单纯意义上的服务和客户,我们应该把它理解成平级对待,大家都可以向apollo发送消息,也都可以从apollo中订阅主题。(订阅/发布)
一、创建监听
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback{
//连接丢失:一般用与重连
public void connectionLost(Throwable throwable) {
System.out.println("丢失连接");
}
//消息到达:指收到消息
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
//(发布)publish后会执行到这里,发送状态
System.out.println("deliveryComplete---------"
+ token.isComplete());
}
}
二、接收消息(如果你不好理解,可以暂且把它理解成客户端):
import
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import top.whbweb.mqtt.callback.PushCallback;
import java.util.UUID;
public static void main(String[] args) {
try {
//apollo地址
String HOST = "tcp://127.0.0.1:61613";
//要订阅的主题
String TOPIC1="abc";
//指你Apollo中的用户名密码
String userName="admin";
String pwd="password";
String clientid =UUID.randomUUID().toString().replace("-","");
MqttClient client=new MqttClient(HOST,clientid,new MemoryPersistence());
// MQTT的连接对象
MqttConnectOptions options = new MqttConnectOptions();
//设置连接参数
//清除session回话
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
//超时设置
options.setConnectionTimeout(10);
//心跳保持时间
options.setKeepAliveInterval(20);
//遗嘱:当该客户端端口连接时,会向whb主题发布一条信息
options.setWill("whb","我挂了,你加油".getBytes(),1,true);
//监听对象:自己创建
client.setCallback(new PushCallback());
//打开连接
client.connect(options);
//设置消息级别
int[] Qos={1};
//订阅主题
String[] topics={TOPIC1};
client.subscribe(topics,Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
发送端
先建立连接,然后准备信息(主题,内容,级别),最后进行发送
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import top.whbweb.mqtt.callback.PushCallback;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class SendOut {
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
public static final String HOST = "tcp://127.0.0.1:61613";
//定义一个主题
public static final String TOPIC = "topic11";
//定义MQTT的ID,可以在MQTT服务配置中指定
private static final String clientid = "server11";
private MqttMessage message;
public static final String TOPIC1 = "topic1";
public static final String userName = "admin";
public static final String pwd = "password";
public MqttClient client;
private MqttTopic topic;
public SendOut() {
try {
client = new MqttClient(HOST, clientid, new MemoryPersistence());
connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
//发布消息
public void publish(MqttTopic topic, MqttMessage message) throws MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
//打印发送状态
System.out.println("message is published completely!" + token.isComplete());
}
//建立连接:参数与订阅端相似
private void connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
client.setCallback(new PushCallback());
client.connect(options);
}
public static void main(String[] args) throws MqttException, UnsupportedEncodingException {
SendOut service = new SendOut();
Scanner sc = new Scanner(System.in);
service.topic = service.client.getTopic(TOPIC);
service.message = new MqttMessage();
//确保被收到一次
service.message.setQos(1);
service.message.setPayload("如果世界漆黑,其实我很美".getBytes("UTF-8"));
service.publish(service.topic, service.message);
}
}
这样,订阅者就可以接收到我们发送的消息了。
最后
以上就是追寻外套为你收集整理的MQTT协议初识——简单收发第一部分:认识MQTT第二部分准备工作第三部分开发的全部内容,希望文章能够帮你解决MQTT协议初识——简单收发第一部分:认识MQTT第二部分准备工作第三部分开发所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复