概述
MQTT使用笔记(结合类库源代码)
一、类库中用到的主要的几个线程:
以用到的类名来命名
1、ConnectBG线程:TCP连接,发送连接服务器的消息,下面的大部分的线程的启动也是在这里调用的
2、CommsReceiver线程:接收消息
3、CommsSender线程:发送消息
4、CommsCallback线程:消息接收到之后,后续处理的消费者线程
5、TimerPingSender线程:维持心跳
二、主要的几个类:
ClientState
该类库中的一个比较核心的一个类,用来唤醒发送消息线程发送数据、存储发送的消息、清除收到的消息、恢复持久类里面的数据、消息发送和收到之后发送通知,其中的成员变量HashTable类型的inUseMsgIds,用来维护已经使用的Msg Id等。上面提到了好几个线程,这几个线程之间的协作,也主要通过该类来实现。其中有通过多线程的协作,实现了生产者-消费者的功能模式。
CommsReceiver
实现发送消息进程的功能
CommsSender
实现接收消息进程的功能
CommsCallback
通过维护一个线程来处理,接收消息线程发过来的消息和Token
TCPNetworkModule
Mqtt在应用层下层使用的是TCP协议,该类通过Socket与服务器实现连接
ClientComms
该类是封装的一个类,刚开始的时候,我们通常会拿到一个MqttClient实例(MqttClient是MqttAsyncClient的代理),然后通过MqttClient实例再调用MqttAsyncClient,接着就到ClientComms类,在通过ClientComms去执行具体的功能操作
Token
使线程阻塞,直到得到结果,继续执行;该类还能在消息发布之后通过回调接口,返回成功失败结果,里面还能存放服务器应答和发送的消息实例。并不是直接使用Token,而是封装成MqttToken来使用。
CommsTokenStore
用来存储Token,其实直接存储的是MqttToken对象,存储形式也是key-value,其中的key是消息的key,value是MqttToken对象。连接服务器的消息(MqttConnect)的key是固定的"Con",断开连接服务器的消息(MqttDisconnect)的key是固定的"Disc",心跳消息(MqttPingReq)的key是固定的"Ping",其他大部分的消息的key都是整形的id转换成的字符串。
三、结合代码理解
开始的时候需要生成一个MqttClient实例和MqttConnectOptions实例,其中MqttClient上面也讲到了是MqttAsyncClient的代理类,MqttConnectOptions是连接时候的参数
使用的代码如下:
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(SERVER_HOST, clientid, new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(USERNAME);
// 设置连接的密码
options.setPassword(PASSWORD.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(30);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(30);
// 设置回调
// MqttTopic topic = client.getTopic(TOPIC);
// setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
// options.setWill(topic, "close".getBytes(), 2, true);
// SSLSocketFactory sslSocketFactory = null;
/*
* try { sslSocketFactory =
* sslContextFromStream(mContext.getAssets().open("server.pem")).
* getSocketFactory(); } catch (Exception e) { e.printStackTrace();
* } options.setSocketFactory(sslSocketFactory);
*/
client.setCallback(new MqttCallback());
client.connect(options);
Log.e(TAG, "ClientId=" + client.getClientId());
} catch (MqttException e) {
e.printStackTrace();
Log.e(TAG, "connect: " + e);
}
3.1 MqttClient初始化
在初始化MqttAsyncClient还初始化了心跳检测对象TimerPingSender,
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
this(serverURI,clientId, persistence, new TimerPingSender());
}
待连接成功之后,就会启动该心跳线程。
下面来看下ClientComms的初始化,代码如下:
/**
* Creates a new ClientComms object, using the specified module to handle
* the network calls.
*/
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
this.conState = DISCONNECTED;
this.client = client;
this.persistence = persistence;
this.pingSender = pingSender;
this.pingSender.init(this);
this.tokenStore = new CommsTokenStore(getClient().getClientId());
this.callback = new CommsCallback(this);
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);
callback.setClientState(clientState);
log.setResourceName(getClient().getClientId());
}
在ClientComms类初始化的时候,会先初始化心跳线程类,生成CommsTokenStore对象,生成CommsCallback对象,生成ClientState对象,并且将ClientState对象设置为CommsCallback对象的内部引用。
CommsTokenStore类内部是用Hashtable类实现的Token的存储,key为对应Message的Key值,Value为对应的MqttToken
CommsCallback类实现了Runnable接口,作为接收到消息之后的处理的线程的功能就是通过这个类实现的。
ClientState对象的初始化参数包括MqttClientPersistence persistence(消息的持久化管理),CommsTokenStore tokenStore(存放Token), CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender(心跳线程),这些对象都作为Client内部的引用。看下ClientState初始化的代码:
protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore,
CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException {
log.setResourceName(clientComms.getClient().getClientId());
log.finer(CLASS_NAME, "<Init>", "" );
inUseMsgIds = new Hashtable();
pendingMessages = new Vector(this.maxInflight);
pendingFlows = new Vector();
outboundQoS2 = new Hashtable();
outboundQoS1 = new Hashtable();
inboundQoS2 = new Hashtable();
pingCommand = new MqttPingReq();
inFlightPubRels = 0;
actualInFlight = 0;
this.persistence = persistence;
this.callback = callback;
this.tokenStore = tokenStore;
this.clientComms = clientComms;
this.pingSender = pingSender;
restoreState();
}
inUseMsgIds(Hashtable结构)通过名字就可以知道,这里面放的就是已经使用的Msg Id。pendingMessages(Vector结构)存放客户端发送的Msg(MqttPublish结构),pendingFlows (Vector结构)存放除了发送的Msg以外的需要存储的Msg。outboundQoS2,outboundQoS1,inboundQoS2都是Hashtable结构,也是用来存放message,其中outboundQoS2,outboundQoS1用来存放发送的消息,inboundQoS2用来存放收到的qos=2的消息。inFlightPubRels是正在发送的PUBREL消息的数量,actualInFlight是正在发送的PUBLISH消息的数量。
最后调用了restoreState()方法,该方法是从消息的持久化管理对象里恢复出来发送的消息和收到的消息。如果之前客户端断开了与服务端的连接,等再次重连的时候,就可以将之前存储在持久化管理中的消息先恢复,然后再接着发送。
ClientState.java
/**
* Restores the state information from persistence.
*/
protected void restoreState() throws MqttException {
final String methodName = "restoreState";
Enumeration messageKeys = persistence.keys();
MqttPersistable persistable;
String key;
int highestMsgId = nextMsgId;
Vector orphanedPubRels = new Vector();
//@TRACE 600=>
log.fine(CLASS_NAME, methodName, "600");
while (messageKeys.hasMoreElements()) {
key = (String) messageKeys.nextElement();
persistable = persistence.get(key);
MqttWireMessage message = restoreMessage(key, persistable);
if (message != null) {
if (key.startsWith(PERSISTENCE_RECEIVED_PREFIX)) {
//@TRACE 604=inbound QoS 2 publish key={0} message={1}
log.fine(CLASS_NAME,methodName,"604", new Object[]{key,message});
// The inbound messages that we have persisted will be QoS 2
inboundQoS2.put(new Integer(message.getMessageId()),message);
} else if (key.startsWith(PERSISTENCE_SENT_PREFIX)) {
MqttPublish sendMessage = (MqttPublish) message;
highestMsgId = Math.max(sendMessage.getMessageId(), highestMsgId);
if (persistence.containsKey(getSendConfirmPersistenceKey(sendMessage))) {
MqttPersistable persistedConfirm = persistence.get(getSendConfirmPersistenceKey(sendMessage));
// QoS 2, and CONFIRM has already been sent...
// NO DUP flag is allowed for 3.1.1 spec while it's not clear for 3.1 spec
// So we just remove DUP
MqttPubRel confirmMessage = (MqttPubRel) restoreMessage(key, persistedConfirm);
if (confirmMessage != null) {
// confirmMessage.setDuplicate(true); // REMOVED
//@TRACE 605=outbound QoS 2 pubrel key={0} message={1}
log.fine(CLASS_NAME,methodName, "605", new Object[]{key,message});
outboundQoS2.put(new Integer(confirmMessage.getMessageId()), confirmMessage);
} else {
//@TRACE 606=outbound QoS 2 completed key={0} message={1}
log.fine(CLASS_NAME,methodName, "606", new Object[]{key,message});
}
} else {
// QoS 1 or 2, with no CONFIRM sent...
// Put the SEND to the list of pending messages, ensuring message ID ordering...
sendMessage.setDuplicate(true);
if (sendMessage.getMessage().getQos() == 2) {
//@TRACE 607=outbound QoS 2 publish key={0} message={1}
log.fine(CLASS_NAME,methodName, "607", new Object[]{key,message});
outboundQoS2.put(new Integer(sendMessage.getMessageId()),sendMessage);
} else {
//@TRACE 608=outbound QoS 1 publish key={0} message={1}
log.fine(CLASS_NAME,methodName, "608", new Object[]{key,message});
outboundQoS1.put(new Integer(sendMessage.getMessageId()),sendMessage);
}
}
MqttDeliveryToken tok = tokenStore.restoreToken(sendMessage);
tok.internalTok.setClient(clientComms.getClient());
inUseMsgIds.put(new Integer(sendMessage.getMessageId()),new Integer(sendMessage.getMessageId()));
}
else if (key.startsWith(PERSISTENCE_CONFIRMED_PREFIX)) {
MqttPubRel pubRelMessage = (MqttPubRel) message;
if (!persistence.containsKey(getSendPersistenceKey(pubRelMessage))) {
orphanedPubRels.addElement(key);
}
}
}
}
messageKeys = orphanedPubRels.elements();
while(messageKeys.hasMoreElements()) {
key = (String) messageKeys.nextElement();
//@TRACE 609=removing orphaned pubrel key={0}
log.fine(CLASS_NAME,methodName, "609", new Object[]{key});
persistence.remove(key);
}
nextMsgId = highestMsgId;
}
消息的持久化存储也是以Key-Value的方式存储的,其中客户端发送消息的Key是以"s-"和"sc-"的前缀加上客户端发送的消息的Id拼接成的,"s-"的前缀的是客户端发送的PUBLISH消息,"sc-"的前缀是客户端发送的PUBREL消息;客户端接收到的消息PUBLISH是以"r-"的前缀加上消息的Id拼接而成的。代码中用了两个循环,第一个循环遍历每个消息,对于以"r-"开头的消息,直接存储在inboundQoS2中。对于以"s-"开头的消息,再检查是否存在以"sc-"开头的消息,如果存在的话,直接将"sc-"开头的消息存放在outboundQoS2中,如果不存在"sc-"开头的消息,再次判断qos的类型来区别是放在outboundQoS2还是outboundQoS1中,并且将消息的Id存放在inUseMsgIds中。对于以“sc-”开头的消息,再次查询持久化存储里面是否存在"s-"开头的消息,如果不存在,则将该key存在orphanedPubRels临时对象中。第二次循环就是循环orphanedPubRels里面的消息,然后从持久化存储里面去除。最后将nextMsgId变量的值赋值成目前恢复的发送的消息中的Id的最大值。
3.2 Mqttclient设置回调接口
setCallback(new MqttCallback()
MqttClient初始化之后,接着看下client.setCallback(new MqttCallback())设置回调,这个回调一层一层最终设置到CommsCallback对象中。
再看下这个回调接口里面的回调方法,通过方法名字知道当连接断开时,消息到达时,发送消息成功时会分别回调这个接口的三个方法。
public interface MqttCallback {
public void connectionLost(Throwable cause);
public void messageArrived(String topic, MqttMessage message) throws Exception;
public void deliveryComplete(IMqttDeliveryToken token);
}
3.3 Mqttclient连接服务器
connect(MqttConnectOptions options)
最终是到达ConnectBG类(实现Runnable)中,新建了一个线程(本文以类名命名,ConnectBG线程),接着networkModule.start()就是启动与服务器的连接,如果是TCP,networkModule对象就是TCPNetworkModule类型的对象,该对象就是通过Socket实现与服务器建立TCP连接;receiver.start(…)则是在与服务器TCP连接正常的情况下启动一个接收消息线程CommsReceiver线程;sender.start(…)则是启动一个发送消息线程CommsSender线程;callback.start(…)则是启动一个线程用来处理接收到的消息CommsCallback线程;internalSend(conPacket, conToken)则是将Mqtt连接消息放到ClientState的pendingFlows集合中,等待CommsSender线程读取该Mqtt连接消息发送到服务器执行连接。
在当前线程ConnectBG线程调用internalSend()将消息放到消息集合中,用CommsSender线程来执行发送,这样就涉及到了多线程之间的交互。先看发送线程的代码
CommsSender.java
public void run() {
final String methodName = "run";
MqttWireMessage message = null;
while (running && (out != null)) {
try {
message = clientState.get();
if (message != null) {
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
// While quiescing the tokenstore can be cleared so need
// to check for null for the case where clear occurs
// while trying to send a message.
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
// The flush has been seen to fail on disconnect of a SSL socket
// as disconnect is in progress this should not be treated as an error
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
}
} else { // null message
running = false;
}
} catch (MqttException me) {
handleRunException(message, me);
} catch (Exception ex) {
handleRunException(message, ex);
}
} // end while
}
run()方法中启用一个while循环不停的message = clientState.get()获取需要发送的消息,拿到消息之后就发送,如果没有消息就睡眠等待消息。这个方法整体的逻辑:拿到消息数据之后如果是MqttAck,这个的意思就是发送的消息是给服务器端的应答消息,例如PUBREC、PUBACK消息,这种消息的处理方式就是直接发送。如果不是这种消息类型,则需要发送之后,需要调用ClientState的notifySent()方法做一些处理。还有如果得到的消息是null,就将退出循环。
目前接着分析发送Mqtt连接消息的多线程的协作处理。进入clientState.get()方法中看一下,
ClientState.java
protected MqttWireMessage get() throws MqttException {
final String methodName = "get";
MqttWireMessage result = null;
synchronized (queueLock) {
while (result == null) {
// If there is no work wait until there is work.
// If the inflight window is full and no flows are pending wait until space is freed.
// In both cases queueLock will be notified.
if ((pendingMessages.isEmpty() && pendingFlows.isEmpty()) ||
(pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {
try {
//@TRACE 644=wait for new work or for space in the inflight window
log.fine(CLASS_NAME,methodName, "644");
queueLock.wait();
//@TRACE 647=new work or ping arrived
log.fine(CLASS_NAME,methodName, "647");
} catch (InterruptedException e) {
}
}
// Handle the case where not connected. This should only be the case if:
// - in the process of disconnecting / shutting down
// - in the process of connecting
if (!connected &&
(pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect))) {
//@TRACE 621=no outstanding flows and not connected
log.fine(CLASS_NAME,methodName,"621");
return null;
}
// Check if there is a need to send a ping to keep the session alive.
// Note this check is done before processing messages. If not done first
// an app that only publishes QoS 0 messages will prevent keepalive processing
// from functioning.
// checkForActivity(); //Use pinger, don't check here
// Now process any queued flows or messages
if (!pendingFlows.isEmpty()) {
// Process the first "flow" in the queue
result = (MqttWireMessage)pendingFlows.remove(0);
if (result instanceof MqttPubRel) {
inFlightPubRels++;
//@TRACE 617=+1 inflightpubrels={0}
log.fine(CLASS_NAME,methodName,"617", new Object[]{new Integer(inFlightPubRels)});
}
checkQuiesceLock();
} else if (!pendingMessages.isEmpty()) {
// If the inflight window is full then messages are not
// processed until the inflight window has space.
if (actualInFlight < this.maxInflight) {
// The in flight window is not full so process the
// first message in the queue
result = (MqttWireMessage)pendingMessages.elementAt(0);
pendingMessages.removeElementAt(0);
actualInFlight++;
//@TRACE 623=+1 actualInFlight={0}
log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
} else {
//@TRACE 622=inflight window full
log.fine(CLASS_NAME,methodName,"622");
}
}
}
}
return result;
}
可以看到发送消息线程首先获得queueLock的对象锁,其实发送消息线程从集合中获取消息和其他线程向集合中放入消息就是通过这个对象锁来控制多线程异步协作的,无论发送消息线程,还是向消息集合中放消息的线程,都必须先获得queueLock的对象锁,才能做后续操作。线程获取到对象锁之后,另外的线程只能等待获取对象锁的线程执行完毕让出对象锁之后才能执行。假设刚开始第一次使用并且发送消息线程首先获取到了queueLock的对象锁的时候,pendingMessages和pendingFlows这两个集合里面肯定是空的,这个时候,就通过queueLock.wait()让出对象锁,这个时候发送消息线程就阻塞在那了,等待发送消息来了之后被唤醒。
这个时候看一下向集合中放入Mqtt连接消息的线程中的代码,在ConnectBG类的run()方法中调用internalSend(conPacket, conToken),接着在方法内部又调用this.clientState.send(message, token)
ClientState.java
/**
* Submits a message for delivery. This method will block until there is
* room in the inFlightWindow for the message. The message is put into
* persistence before returning.
*
* @param message the message to send
* @param token the token that can be used to track delivery of the message
* @throws MqttException
*/
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
message.setMessageId(getNextMessageId());
}
if (token != null ) {
try {
token.internalTok.setMessageID(message.getMessageId());
} catch (Exception e) {
}
}
if (message instanceof MqttPublish) {
…………
} else {
//@TRACE 615=pending send key={0} message {1}
log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});
if (message instanceof MqttConnect) {
synchronized (queueLock) {
// Add the connect action at the head of the pending queue ensuring it jumps
// ahead of any of other pending actions.
tokenStore.saveToken(token, message);
pendingFlows.insertElementAt(message,0);
queueLock.notifyAll();
}
} else {
…………
}
}
}
因为Mqtt连接消息不是MqttPublish类或者子类,所以就进入到else逻辑里面,Mqtt连接消息类型是MqttConnect,首先获取queueLock对象锁,得到对象锁之后,将消息对应的Token存放到Token库tokenStore中,并且将该连接消息放到pendingFlows的第一个位置,因为首先得建立连接之后才能发送消息,之后就唤醒queueLock对象锁,唤醒之后,当前线程线程释放queueLock对象锁,这个时候消息发送线程因为在queueLock对象锁阻塞等待,现在唤醒了,所以就可以从pendingFlows中拿到Mqtt连接消息,开始发送了;发送Mqtt连接消息这是一种情况,还有一种情况,是发送线程没有首先获取到queueLock对象锁,而是向队列中放连接消息的线程首先获得了queueLock对象锁,这种情况就是在放入连接消息之后,释放queueLock对象锁之后,发送消息线程就获取到queueLock对象锁之后,队列中已经存在连接消息了,这样发送消息线程就不用阻塞了。接着再讲下ClientState类中get()方法发送消息线程被唤醒之后的处理,代码判断了下,当前如果是非连接状态的时候,需要pendingFlows中的第一个消息是Mqtt连接消息,如果不是的话,就得退出发送消息线程了,通过前面的分析,第一个消息确实是Mqtt连接消息。再向下看,首先去拿pendingFlows集合中的消息,拿到之后发送消息线程就发送给服务器了。同时将该消息从pendingFlows删除,如果该消息类型是MqttPubRel类型,将inFlightPubRels计数加1,来统计现在发送的PUBREL消息数量。
还是接着看上面这段代码,message.setMessageId(getNextMessageId())设置消息的ID,最开始初始化Message的时候,ID默认都是0,并且默认的消息ID也是需要的,所以需要在发送方法中检测到这两条符合的时候,就需要设置消息的ID。从前面的分析知道,目前正在使用的消息ID是维护在inUseMsgIds中的,nextMsgId是正在使用的消息ID的最大值,所以getNextMessageId()就是生成一个没有正在使用的ID然后设置给消息。并且从该方法中可以知道最大的消息ID值是65535,最小的消息ID是1,并且会循环使用。消息对应的MqttToken也需要设置对应的消息ID,这样等到MqttToken放到tokenStore中的时候,通过消息的ID就能找到消息对应MqttToken。
Mqtt连接消息发送之后,再看下Mqtt连接消息的应答消息接收到之后的处理逻辑。接收消息线程是在与服务器TCP连接之后启动的,见前面。
CommsReceiver.java
/**
* Run loop to receive messages from the server.
*/
public void run() {
final String methodName = "run";
MqttToken token = null;
while (running && (in != null)) {
try {
//@TRACE 852=network read message
log.fine(CLASS_NAME,methodName,"852");
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token!=null) {
synchronized (token) {
// Ensure the notify processing is done under a lock on the token
// This ensures that the send processing can complete before the
// receive processing starts! ( request and ack and ack processing
// can occur before request processing is complete if not!
clientState.notifyReceivedAck((MqttAck)message);
}
} else {
// It its an ack and there is no token then something is not right.
// An ack should always have a token assoicated with it.
throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
} else {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
}
catch (MqttException ex) {
//@TRACE 856=Stopping, MQttException
log.fine(CLASS_NAME,methodName,"856",null,ex);
running = false;
// Token maybe null but that is handled in shutdown
clientComms.shutdownConnection(token, ex);
}
catch (IOException ioe) {
//@TRACE 853=Stopping due to IOException
log.fine(CLASS_NAME,methodName,"853");
running = false;
// An EOFException could be raised if the broker processes the
// DISCONNECT and ends the socket before we complete. As such,
// only shutdown the connection if we're not already shutting down.
if (!clientComms.isDisconnecting()) {
clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
}
}
finally {
receiving = false;
}
}
//@TRACE 854=<
log.fine(CLASS_NAME,methodName,"854");
}
接收到的消息类型MqttConnack,继承了MqttAck,所以首先通过得到的消息得到消息ID,再通过ID对应的消息的MqttToken,之后就是调用clientState.notifyReceivedAck((MqttAck)message),查看下notifyReceivedAck(MqttAck ack)方法的相关代码:
ClientState.java
} else if (ack instanceof MqttConnack) {
int rc = ((MqttConnack) ack).getReturnCode();
if (rc == 0) {
synchronized (queueLock) {
if (cleanSession) {
clearState();
// Add the connect token back in so that users can be
// notified when connect completes.
tokenStore.saveToken(token,ack);
}
inFlightPubRels = 0;
actualInFlight = 0;
restoreInflightMessages();
connected();
}
} else {
mex = ExceptionHelper.createMqttException(rc);
throw mex;
}
clientComms.connectComplete((MqttConnack) ack, mex);
notifyResult(ack, token, mex);
tokenStore.removeToken(ack);
// Notify the sender thread that there maybe work for it to do now
synchronized (queueLock) {
queueLock.notifyAll();
}
} else {
得到连接应答的返回码等于0,代表连接成功。连接成功之后,如果设置了cleanSession需要将之前记忆的东西全部清除,cleanSession是通过MqttConnectOptions类对象设置的,在初始化MqttClient参数的时候设置。接着再将现在的连接应答对应的MqttToken放进tokenStore中,以便连接完成的时候,用户可以被通知到。
ClientState.java
protected void clearState() throws MqttException {
final String methodName = "clearState";
//@TRACE 603=clearState
log.fine(CLASS_NAME, methodName,">");
persistence.clear();
inUseMsgIds.clear();
pendingMessages.clear();
pendingFlows.clear();
outboundQoS2.clear();
outboundQoS1.clear();
inboundQoS2.clear();
tokenStore.clear();
}
接着再将inFlightPubRels和actualInFlight变量都设置为0,然后调用restoreInflightMessages()恢复正在发送给服务器的消息。见下面
ClientState.java
private void restoreInflightMessages() {
final String methodName = "restoreInflightMessages";
pendingMessages = new Vector(this.maxInflight);
pendingFlows = new Vector();
Enumeration keys = outboundQoS2.keys();
while (keys.hasMoreElements()) {
Object key = keys.nextElement();
MqttWireMessage msg = (MqttWireMessage) outboundQoS2.get(key);
if (msg instanceof MqttPublish) {
//@TRACE 610=QoS 2 publish key={0}
log.fine(CLASS_NAME,methodName, "610", new Object[]{key});
// set DUP flag only for PUBLISH, but NOT for PUBREL (spec 3.1.1)
msg.setDuplicate(true);
insertInOrder(pendingMessages, (MqttPublish)msg);
} else if (msg instanceof MqttPubRel) {
//@TRACE 611=QoS 2 pubrel key={0}
log.fine(CLASS_NAME,methodName, "611", new Object[]{key});
insertInOrder(pendingFlows, (MqttPubRel)msg);
}
}
keys = outboundQoS1.keys();
while (keys.hasMoreElements()) {
Object key = keys.nextElement();
MqttPublish msg = (MqttPublish)outboundQoS1.get(key);
msg.setDuplicate(true);
//@TRACE 612=QoS 1 publish key={0}
log.fine(CLASS_NAME,methodName, "612", new Object[]{key});
insertInOrder(pendingMessages, msg);
}
this.pendingFlows = reOrder(pendingFlows);
this.pendingMessages = reOrder(pendingMessages);
}
首先将outboundQoS2和outboundQoS1中的消息按照消息ID大小顺序插入到pendingMessages和pendingFlows中,insertInOrder就是实现判断ID大小的。之后,又调用了reOrder(Vector list)方法,将pendingMessages和pendingFlows重新排序了下。我们知道消息ID值是循环的1到65535之间的值,如果消息ID值到了65535之后,再分配的话,就得从1开始分配了。举个例子,如果pendingMessages中ID目前的消息顺序是1,2,3,4,5,65535,就认为之前先发送的是ID为65535的消息,然后是ID为1,2,3,4,5的消息,所以现在消息集合中顺序需要调整为65535,1,2,3,4,5,后续再发送的时候,就会按照这个顺序发送了。
接着回到notifyReceivedAck(MqttAck ack)方法中,继续分析代码,恢复完数据之后,调用connected()方法,设置变量connected等于true,接着就启动心跳线程pingSender.start(),pingSender对象实际是TimerPingSender对象,在每隔间隔时间启动一个PingTask任务,这个间隔时间是在MqttConnectOptions类的对象设置的时候传递进来的。心跳线程的处理后面再分析,接着向下看。执行clientComms.connectComplete((MqttConnack) ack, mex),这个方法里面判断如果连接成功,设置ClientComms类的conState的值为CONNECTED。
后面接着执行notifyResult(ack, token, mex)方法,这个方法主要调用token.internalTok.markComplete(ack, ex)标记对应的Token的pendingComplete的值为true,并且将应答消息设置到Token对应的response中,还会执行客户端注册的异步回调接口。
ClientState.java
protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
final String methodName = "notifyResult";
// unblock any threads waiting on the token
token.internalTok.markComplete(ack, ex);
// Let the user know an async operation has completed and then remove the token
if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) {
//@TRACE 648=key{0}, msg={1}, excep={2}
log.fine(CLASS_NAME,methodName, "648", new Object [] {token.internalTok.getKey(), ack, ex});
callback.asyncOperationComplete(token);
}
// There are cases where there is no ack as the operation failed before
// an ack was received
if (ack == null ) {
//@TRACE 649=key={0},excep={1}
log.fine(CLASS_NAME,methodName, "649", new Object [] { token.internalTok.getKey(), ex});
callback.asyncOperationComplete(token);
}
}
notifyResult()方法执行之后,将MqttToken从tokenStore库中删除。重点看下异步回调接口怎么执行的,ClientState里面执行callback.asyncOperationComplete(token),callback是CommsCallback类型引用,进入CommsCallback类,看下asyncOperationComplete(MqttToken token)方法。先说下CommsCallback类,该类是继承了Runnable接口,所以其内部实现了一个消费者线程,如果没有内容需要消费的话,该线程也是阻塞,等待有内容来到之后,唤醒开始执行。看到asyncOperationComplete(MqttToken token)方法里面也是将MqttToken放到completeQueue里面之后,就通过workAvailable.notifyAll()唤醒消费者线程。下面就进入到消费者线程里面看下处理流程。代码在CommsCallback类的run()方法中,关键代码如下:
CommsCallback.java
public void run() {
final String methodName = "run";
while (running) {
try {
// If no work is currently available, then wait until there is some...
try {
synchronized (workAvailable) {
if (running && messageQueue.isEmpty()
&& completeQueue.isEmpty()) {
// @TRACE 704=wait for workAvailable
log.fine(CLASS_NAME, methodName, "704");
workAvailable.wait();
}
}
} catch (InterruptedException e) {
}
if (running) {
// Check for deliveryComplete callbacks...
MqttToken token = null;
synchronized (completeQueue) {
if (!completeQueue.isEmpty()) {
// First call the delivery arrived callback if needed
token = (MqttToken) completeQueue.elementAt(0);
completeQueue.removeElementAt(0);
}
}
if (null != token) {
handleActionComplete(token);
}
…………
}
…………
} catch (Throwable ex) {
……
} finally {
…………
}
}
}
通过上面的方法,可以清楚的看到在messageQueue和completeQueue里面没有内容的时候消费者线程通过workAvailable.wait()睡眠等待,现在已经放入了Mqtt连接消息对应的MqttToken,并且唤醒了线程,消费者线程就会继续向下执行。可以看到消费者线程拿到MqttToken之后,就进入到handleActionComplete(token)方法中去执行。
CommsCallback.java
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
// @TRACE 705=callback and notify for key={0}
log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() });
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if ( mqttCallback != null
&& token instanceof MqttDeliveryToken
&& token.isComplete()) {
mqttCallback.deliveryComplete((MqttDeliveryToken) token);
}
// Now call async action completion callbacks
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if ( token.isComplete() ){
if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
token.internalTok.setNotified(true);
}
}
if (token.isComplete()) {
// Finish by doing any post processing such as delete
// from persistent store but only do so if the action
// is complete
clientState.notifyComplete(token);
}
}
}
handleActionComplete(token)方法执行如下步骤:
1、调用Token的notifyComplete()方法设置Token的状态,并且唤醒所有在该Token上阻塞的线程
2、检查Token的通知状态,如果没有被通知,并且是MqttDeliveryToken类型,Token的状态也是完成的情况的话,会首先回调MqttCallback接口的deliveryComplete()方法
3、如果Token的通知状态是没有被通知,会执行fireActionEvent()方法,该方法就是检查Token是否设置了IMqttActionListener接口,如果设置了的话,就会执行IMqttActionListener接口的回调方法
4、Token假如现在是完成的状态,并且如果是MqttDeliveryToken类型或者Token的IMqttActionListener接口存在的话,将Token的通知状态设置为通知过状态
5、Token假如现在是完成的状态,执行clientState对象的notifyComplete()方法
这个Token是在ConnectActionListener类的connect()方法中初始化的
ConnectActionListener.java
public void connect() throws MqttPersistenceException {
MqttToken token = new MqttToken(client.getClientId());
token.setActionCallback(this);
token.setUserContext(this);
…………
}
从上面的代码可以知道Token的IMqttActionListener接口设置为了ConnectActionListener对象本身,从上面的3步骤得知,成功的时候将会回调ConnectActionListener对象的onSuccess()方法,失败的时候将执行接口onFail()方法。
ConnectActionListener.java
public void onSuccess(IMqttToken token) {
if (originalMqttVersion == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
}
userToken.internalTok.markComplete(token.getResponse(), null);
userToken.internalTok.notifyComplete();
if (userCallback != null) {
userToken.setUserContext(userContext);
userCallback.onSuccess(userToken);
}
}
从上面可以知道,连接成功的时候,调用ConnectActionListener对象的成员变量userToken的internalTok的markComplete()和notifyComplete()方法,这两个方法前面也讲过了,将应答返回给成员变量userToken,并且唤醒在userToken.internalTok上睡眠的线程。现在是哪个线程在该Token上面睡眠等待呢?看一下这个userToken,这个userToken是在构造ConnectActionListener对象的时候传递进来的,ConnectActionListener的构造函数是在MqttAsyncClient的成员函数connect(MqttConnectOptions,Object,IMqttActionListener)里面,
MqttAsyncClient.java
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
…………
comms.setNetworkModules(createNetworkModules(serverURI, options));
// Insert our own callback to iterate through the URIs till the connect succeeds
MqttToken userToken = new MqttToken(getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
comms.setNetworkModuleIndex(0);
connectActionListener.connect();
return userToken;
}
可以看到是一个MqttToken,并且将该MqttToken返回给了调用者,调用它的就是MqttClient对象的成员函数connect(MqttConnectOptions options),
MqttClient.java
public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
aClient.connect(options, null, null).waitForCompletion(getTimeToWait());
}
我们的程序就是通过调用该方法来执行连接服务器操作的,这样,程序的当前线程就会睡眠等待,直到得到结果。MqttClient对象的getTimeToWait()方法默认值是-1,看一下MqttToken里面的waitForCompletion(-1)是怎么实现的
MqttToken.java
public void waitForCompletion(long timeout) throws MqttException {
internalTok.waitForCompletion(timeout);
}
可以看到调用了内部成员变量internalTok的waitForCompletion(-1)方法,并且throws了MqttException,该方法怎么实现睡眠等待的?怎么被唤醒的?什么时候throws了MqttException?
MqttToken的成员变量internalTok是Token对象,
Token.java
public void waitForCompletion(long timeout) throws MqttException {
final String methodName = "waitForCompletion";
//@TRACE 407=key={0} wait max={1} token={2}
log.fine(CLASS_NAME,methodName, "407",new Object[]{getKey(), new Long(timeout), this});
MqttWireMessage resp = waitForResponse(timeout);
if (resp == null && !completed) {
//@TRACE 406=key={0} timed out token={1}
log.fine(CLASS_NAME,methodName, "406",new Object[]{getKey(), this});
exception = new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
throw exception;
}
checkResult();
}
protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
final String methodName = "waitForResponse";
synchronized (responseLock) {
//@TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4} response={5} token={6}
log.fine(CLASS_NAME, methodName, "400",new Object[]{getKey(), new Long(timeout),new Boolean(sent),new Boolean(completed),(exception==null)?"false":"true",response,this},exception);
while (!this.completed) {
if (this.exception == null) {
try {
//@TRACE 408=key={0} wait max={1}
log.fine(CLASS_NAME,methodName,"408",new Object[] {getKey(),new Long(timeout)});
if (timeout <= 0) {
responseLock.wait();
} else {
responseLock.wait(timeout);
}
} catch (InterruptedException e) {
exception = new MqttException(e);
}
}
if (!this.completed) {
if (this.exception != null) {
//@TRACE 401=failed with exception
log.fine(CLASS_NAME,methodName,"401",null,exception);
throw exception;
}
if (timeout > 0) {
// time up and still not completed
break;
}
}
}
}
//@TRACE 402=key={0} response={1}
log.fine(CLASS_NAME,methodName, "402",new Object[]{getKey(), this.response});
return this.response;
}
waitForCompletion()调用了waitForResponse()方法,在参数timeout=-1的时候,调用了responseLock(responseLock是普通的Object对象)的wait()方法无限等待,直到被唤醒。这样就实现了在当前线程等待结果之后,才会继续向下执行。
在Token对象上睡眠等待的线程是被Token对象的notifyComplete()方法唤醒的,上面也讲到了,针对这个连接服务器的线程的唤醒就是在ConnectActionListener接口回调的方法中执行的。
通过waitForResponse()方法可以看到,线程被唤醒之后,如果completed没有被置为true,Token的成员变量exception不为null,就抛出。接着就回到waitForCompletion()方法里,应答为null,并且completed不为true的时候,也会生成一个REASON_CODE_CLIENT_TIMEOUT的异常,并且抛出,最后checkResult()也是检查成员变量exception,如果不为null,也抛出。这些抛出的异常最终都会被调用MqttClient对象的成员函数connect(MqttConnectOptions options)的这个地方捕获到,可以看到如果正常成功收到结果的话,是不会捕获到异常的。
从上面可以看到,连接服务器的时候使用了2个Token实例,第1个用来使当前线程睡眠等待结果,第2个通过放入库里,等待接收到服务器的应答之后,在Token库里取到,将服务器的应答设置到它里面,通过它的回调执行第1个Token的唤醒线程的方法。
结合上面的连接服务器的分析知道,这是一个多线程协作的一个过程,将上面的各个线程的正常执行过程画成图的方式,基本就是上面的这幅图。
3.4 Mqttclient发送消息
连接服务器成功之后,就需要发送消息了,看下发送消息的调用的代码
/**
* 发布消息
*
* @param topic
* 发布消息主题
* @param msg
* 消息体
* @param isRetained
* 是否为保留消息
*/
public void publish(String topic, byte[] msg, boolean isRetained, int qos) {
try {
if (client != null) {
client.publish(topic, msg, qos, isRetained);
Log.e(TAG, "topic=" + topic + "--msg=" + msg + "--isRetained" + isRetained);
}
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
client是MqttClient对象,看下MqttClient类里面对应的publish()方法
MqttClient.java
public void publish(String topic, byte[] payload,int qos, boolean retained) throws MqttException,
MqttPersistenceException {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
this.publish(topic, message);
}
方法里面将对应的消息信息封装成MqttMessage对象,然后再调用本类里面的成员函数public(String topic, MqttMessage message)方法,
public void publish(String topic, MqttMessage message) throws MqttException,
MqttPersistenceException {
aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
}
该方法调用成员函数aClient的public()方法,这个方法返回的也是一个MqttToken类对象,并且调用MqttToken类对象的waitForCompletion(getTimeToWait())方法实现调用线程睡眠等待结果。这个和上面分析的发送连接服务器消息类似。接着去看MqttAsyncClient里里面的成员函数的对应的代码
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
MqttPersistenceException {
final String methodName = "publish";
//@TRACE 111=< topic={0} message={1}userContext={1} callback={2}
log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback});
//Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/*wildcards NOT allowed*/);
MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.setMessage(message);
token.internalTok.setTopics(new String[] {topic});
MqttPublish pubMsg = new MqttPublish(topic, message);
comms.sendNoWait(pubMsg, token);
//@TRACE 112=<
log.fine(CLASS_NAME,methodName,"112");
return token;
}
该方法的后两个参数传递过来的都是null,最后一个参数是结果异步回调,现在我们没有设置这个回调,那么怎么知道发送是否成功呢?先留个疑问。
方法里面生成了一个MqttDeliveryToken对象,并且在方法最后返回的也是这个实例,所以调用线程就是使用的这个MqttToken实例来实现睡眠等待结果的。发送的消息内容封装也设置到该MqttToken的内部,接着生成一个MqttPublish实例,调用成员变量comms的sendNoWait()方法。成员变量comms是ClientComms对象,进入到该方法中看下
ClientComms.java
/**
* Sends a message to the broker if in connected state, but only waits for the message to be
* stored, before returning.
*/
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "sendNoWait";
if (isConnected() ||
(!isConnected() && message instanceof MqttConnect) ||
(isDisconnecting() && message instanceof MqttDisconnect)) {
this.internalSend(message, token);
} else {
//@TRACE 208=failed: not connected
log.fine(CLASS_NAME, methodName, "208");
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
现在这个时候已经是连接的状态了,所以就调用成员函数internalSend()方法,
ClientComms.java
void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
…………
try {
// Persist if needed and send the message
this.clientState.send(message, token);
} catch(MqttException e) {
if (message instanceof MqttPublish) {
this.clientState.undo((MqttPublish)message);
}
throw e;
}
}
该方法主要是调用了内部成员clientState的send()方法,clientState是ClientState实例对象,
ClientState.java
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
…………
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
//@TRACE 628=pending publish key={0} qos={1} message={2}
log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});
switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
pendingMessages.addElement(message);
queueLock.notifyAll();
}
}
…………
}
发送的消息根据qos是1还是2来放到outboundQoS1集合或outboundQoS2集合中,并且将消息都放到持久化管理对象里面(qos=0的没有放入),将MqttToken实例放入tokenStore库中,然后再将消息放到成员pendingMessages中(之前连接服务器的消息存放在成员pendingFlows中),放到这里面之后,就可以唤醒发送消息的线程CommsSender线程(queueLock.notifyAll(),CommsSender线程在queueLock对象锁上睡眠),将消息发送出去。
通过前面连接服务器的分析知道,CommsSender线程在ClientState类的get()方法中睡眠等待的,被唤醒之后,拿到发送的消息,接着就需要发送给服务器,就在CommsSender线程的处理逻辑中(在run()方法中),
CommsSender.java
public void run() {
final String methodName = "run";
MqttWireMessage message = null;
while (running && (out != null)) {
try {
message = clientState.get();
…………
MqttToken token = tokenStore.getToken(message);
// While quiescing the tokenstore can be cleared so need
// to check for null for the case where clear occurs
// while trying to send a message.
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
// The flush has been seen to fail on disconnect of a SSL socket
// as disconnect is in progress this should not be treated as an error
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
…………
} catch (MqttException me) {
handleRunException(message, me);
} catch (Exception ex) {
handleRunException(message, ex);
}
} // end while
//@TRACE 805=<
log.fine(CLASS_NAME, methodName,"805");
}
发送完之后,接着调用clientState.notifySent(message)方法,
ClientState.java
protected void notifySent(MqttWireMessage message) {
final String methodName = "notifySent";
this.lastOutboundActivity = System.currentTimeMillis();
MqttToken token = tokenStore.getToken(message);
……………………
if (message instanceof MqttPublish) {
if (((MqttPublish)message).getMessage().getQos() == 0) {
// once a QoS 0 message is sent we can clean up its records straight away as
// we won't be hearing about it again
token.internalTok.markComplete(null, null);
callback.asyncOperationComplete(token);
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
checkQuiesceLock();
}
}
首先更新成员变量lastOutboundActivity的值,这个值记录的是最后一次发出去的消息的时间,如果发送消息的qos是0,就是至多收到1次的消息,对于这种的消息,发送完成就完事了,所以现在就是完成的状态了,Token实例的markComplete()前面讲述过了,callback.asyncOperationComplete(token)方法前面也讲过了,不同的是现在的MqttToken类型是MqttDeliveryToken,会回调注册的MqttCallback接口的deliveryComplete()方法通知成功。该方法接着减少正在发送的消息的数量,并且将该消息占用的MessageId释放了,接着再将消息对应的MqttToken从Token库里面去除。这是针对qos=0的处理,因为发送完数据之后,就不管了,后面全部就交给TCP处理了。
qos=1和qos=2的都需要CommsReceiver线程接收应答消息才能知道是否成功,接着就看下现在接收线程的处理。通过连接服务器应答那块的分析,可以知道接收到应答消息之后,就进入到ClientState实例的notifyReceivedAck()方法中
ClientState.java
protected void notifyReceivedAck(MqttAck ack) throws MqttException {
this.lastInboundActivity = System.currentTimeMillis();
MqttToken token = tokenStore.getToken(ack);
MqttException mex = null;
if (ack instanceof MqttPubRec) {
// Complete the QoS 2 flow. Unlike all other
// flows, QoS is a 2 phase flow. The second phase sends a
// PUBREL - the operation is not complete until a PUBCOMP
// is received
MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
this.send(rel, token);
} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
// QoS 1 & 2 notify users of result before removing from
// persistence
notifyResult(ack, token, mex);
// Do not remove publish / delivery token at this stage
// do this when the persistence is removed later
}
…………
}
首先更新了收到消息的时间变量lastInboundActivity的值,这块的处理包括了qos=1和qos=2的逻辑。qos=1的消息发出去之后,会受到PUBACK消息。qos=2的消息发送出去之后,会收到PUBREC消息,接着发送PUBREL消息,最后接收到PUBCOMP消息。
当收到的消息是MqttPubRec类型的实例时,重新生成一个MqttPubRel实例,继续发送,对于MqttPubRec类型的消息的处理如下
ClientState.java
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
…………
else if (message instanceof MqttPubRel) {
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
}
…………
synchronized (queueLock) {
if ( !(message instanceof MqttAck )) {
tokenStore.saveToken(token, message);
}
pendingFlows.addElement(message);
queueLock.notifyAll();
}
}
主要是将该MqttPubRel类型消息放进持久化管理对象里面中,并且持久化管理对象中的key是"sc-"+消息ID,接着就将该消息放入了成员变量pendingFlows中。放入之后,就又唤醒CommsSender线程,去发送,处理逻辑一样,就不讲了。
接着看对于qos=1和qos=2的接到最后一个应答的处理,调用了notifyResult(ack, token, mex)方法,该方法前面也讲过,不细说了。后面就会执行到CommsCallback类的成员函数handleActionComplete()中去,该方法会唤醒调用线程,这样调用线程就会继续执行了。如果通信成功了,该方法还会调用ClientState类的成员函数notifyComplete(),看下这个函数的内容
ClientState.java
protected void notifyComplete(MqttToken token) throws MqttException {
MqttWireMessage message = token.internalTok.getWireMessage();
if (message != null && message instanceof MqttAck) {
MqttAck ack = (MqttAck) message;
if (ack instanceof MqttPubAck) {
// QoS 1 - user notified now remove from persistence...
persistence.remove(getSendPersistenceKey(message));
outboundQoS1.remove(new Integer(ack.getMessageId()));
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 650=removed Qos 1 publish. key={0}
log.fine(CLASS_NAME, methodName, "650",
new Object[] { new Integer(ack.getMessageId()) });
} else if (ack instanceof MqttPubComp) {
// QoS 2 - user notified now remove from persistence...
persistence.remove(getSendPersistenceKey(message));
persistence.remove(getSendConfirmPersistenceKey(message));
outboundQoS2.remove(new Integer(ack.getMessageId()));
inFlightPubRels--;
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 645=removed QoS 2 publish/pubrel. key={0}, -1 inFlightPubRels={1}
log.fine(CLASS_NAME, methodName, "645", new Object[] {
new Integer(ack.getMessageId()),
new Integer(inFlightPubRels) });
}
…………
}
}
可以看到对于qos=1和qos=2的处理方式是相似的,从持久化管理对象中删除,outboundQoS实例对象中删除,减少计数,释放Message ID,从Token库中删除消息对应的Token对象。
对比发送连接服务器消息和发送普通消息的流程,可以发现两个流程也是相似的。不过发送连接服务器的消息的时候,生成了2个MqttToken对象,发送普通消息的时候只是用了一个MqttToken对象,并且类型是MqttDeliveryToken。
怎么判断消息发送成功?调用发送消息的方法,可以看到并没有设置回调接口来接收结果,并且API里面也没提供设置回调的方法。但是调用方法会抛出异常,经过前面的分析也能知道,如果在发送过程中如果出现异常,会将异常存储在消息对应的MqttToken中,唤醒睡眠进程的时候,会将MqttToken中的异常抛出。所以如果调用发送消息的方法没有抛出异常,就认为发送成功了。
3.5 心跳
前面分析过,如果连接服务器成功的情况下,会启动心跳功能。心跳功能是为了保持该连接保活,如果心跳功能检测到超时了,就会将该连接断开,将所有睡眠的线程唤醒,接着会重新连接服务器。
心跳功能的启动是在TimerPingSender类里面,通过内部成员变量comms(ClientComms类对象),调用comms.checkForActivity()方法实现,
ClientComms.java
public MqttToken checkForActivity(){
MqttToken token = null;
try{
token = clientState.checkForActivity();
}catch(MqttException e){
handleRunException(e);
}catch(Exception e){
handleRunException(e);
}
return token;
}
通过调用成员变量clientState(ClientState对象)的checkForActivity()方法,如果出现异常就会进入handleRunException(e)方法处理。
ClientState.java
/**
* Check and send a ping if needed and check for ping timeout.
* Need to send a ping if nothing has been sent or received
* in the last keepalive interval. It is important to check for
* both sent and received packets in order to catch the case where an
* app is solely sending QoS 0 messages or receiving QoS 0 messages.
* QoS 0 message are not good enough for checking a connection is
* alive as they are one way messages.
*
* If a ping has been sent but no data has been received in the
* last keepalive interval then the connection is deamed to be broken.
*
* @return token of ping command, null if no ping command has been sent.
*/
public MqttToken checkForActivity() throws MqttException {
final String methodName = "checkForActivity";
//@TRACE 616=checkForActivity entered
log.fine(CLASS_NAME,methodName,"616", new Object[]{});
synchronized (quiesceLock) {
// ref bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=440698
// No ping while quiescing
if (quiescing) {
return null;
}
}
MqttToken token = null;
long nextPingTime = getKeepAlive();
if (connected && this.keepAlive > 0) {
long time = System.currentTimeMillis();
//Reduce schedule frequency since System.currentTimeMillis is no accurate, add a buffer
//It is 1/10 in minimum keepalive unit.
int delta = 100;
// ref bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=446663
synchronized (pingOutstandingLock) {
// Is the broker connection lost because the broker did not reply to my ping?
if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAlive + delta)) {
// lastInboundActivity will be updated once receiving is done.
// Add a delta, since the timer and System.currentTimeMillis() is not accurate.
// A ping is outstanding but no packet has been received in KA so connection is deemed broken
//@TRACE 619=Timed out as no activity, keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2} time={3} lastPing={4}
log.severe(CLASS_NAME,methodName,"619", new Object[]{new Long(this.keepAlive),new Long(lastOutboundActivity),new Long(lastInboundActivity), new Long(time), new Long(lastPing)});
// A ping has already been sent. At this point, assume that the
// broker has hung and the TCP layer hasn't noticed.
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
}
// Is the broker connection lost because I could not get any successful write for 2 keepAlive intervals?
if (pingOutstanding == 0 && (time - lastOutboundActivity >= 2*keepAlive)) {
// I am probably blocked on a write operations as I should have been able to write at least a ping message
log.severe(CLASS_NAME,methodName,"642", new Object[]{new Long(this.keepAlive),new Long(lastOutboundActivity),new Long(lastInboundActivity), new Long(time), new Long(lastPing)});
// A ping has not been sent but I am not progressing on the current write operation.
// At this point, assume that the broker has hung and the TCP layer hasn't noticed.
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_WRITE_TIMEOUT);
}
// 1. Is a ping required by the client to verify whether the broker is down?
// Condition: ((pingOutstanding == 0 && (time - lastInboundActivity >= keepAlive + delta)))
// In this case only one ping is sent. If not confirmed, client will assume a lost connection to the broker.
// 2. Is a ping required by the broker to keep the client alive?
// Condition: (time - lastOutboundActivity >= keepAlive - delta)
// In this case more than one ping outstanding may be necessary.
// This would be the case when receiving a large message;
// the broker needs to keep receiving a regular ping even if the ping response are queued after the long message
// If lacking to do so, the broker will consider my connection lost and cut my socket.
if ((pingOutstanding == 0 && (time - lastInboundActivity >= keepAlive - delta)) ||
(time - lastOutboundActivity >= keepAlive - delta)) {
//@TRACE 620=ping needed. keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2}
log.fine(CLASS_NAME,methodName,"620", new Object[]{new Long(this.keepAlive),new Long(lastOutboundActivity),new Long(lastInboundActivity)});
// pingOutstanding++; // it will be set after the ping has been written on the wire
// lastPing = time; // it will be set after the ping has been written on the wire
token = new MqttToken(clientComms.getClient().getClientId());
tokenStore.saveToken(token, pingCommand);
pendingFlows.insertElementAt(pingCommand, 0);
nextPingTime = getKeepAlive();
//Wake sender thread since it may be in wait state (in ClientState.get())
notifyQueueLock();
}
else {
log.fine(CLASS_NAME, methodName, "634", null);
nextPingTime = Math.max(1, getKeepAlive() - (time - lastOutboundActivity));
}
}
//@TRACE 624=Schedule next ping at {0}
log.fine(CLASS_NAME, methodName,"624", new Object[]{new Long(nextPingTime)});
pingSender.schedule(nextPingTime);
}
return token;
}
从代码中可以知道,什么时候判断保活超时,什么时候发送心跳消息。
第一种超时情况:pingOutstanding > 0,代表客户端发出去了心跳消息,还没收到心跳应答。delta设置为100,作为偏移量,因为获取的时间不会那么精确。现在如果距离最后一次收到服务器的消息(心跳应答和其他消息)的时间间隔超过了保活时间间隔加上偏移量,认为超时了,就抛出异常
第二种超时情况:pingOutstanding == 0,代表客户端发送的心跳应答现在都收到了。如果现在距离上次发送消息的时间超过了2倍的保活时间,也认为超时,抛出异常。
发送心跳消息的时机:上面两种超时时间的情况排除之后,如果所有的心跳消息都收到的情况下,现在的时间和上次收到消息的时间超过了保活时间减去偏移量的值,这种情况认为需要再次发送心跳。还有现在的时间距离上次发送消息的时间超过了保活时间减去偏移量的值,这种情况也认为需要再次发送心跳。
如果上面的情况都不属于,重新计算了下次检查心跳的时间,等待再次检查。
如果保活时间超时了,怎么处理?
public MqttToken checkForActivity(){
MqttToken token = null;
try{
token = clientState.checkForActivity();
}catch(MqttException e){
handleRunException(e);
}catch(Exception e){
handleRunException(e);
}
return token;
}
…………
private void handleRunException(Exception ex) {
final String methodName = "handleRunException";
//@TRACE 804=exception
log.fine(CLASS_NAME,methodName,"804",null, ex);
MqttException mex;
if ( !(ex instanceof MqttException)) {
mex = new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ex);
} else {
mex = (MqttException)ex;
}
shutdownConnection(null, mex);
}
…………
…………
public void shutdownConnection(MqttToken token, MqttException reason) {
final String methodName = "shutdownConnection";
boolean wasConnected;
MqttToken endToken = null; //Token to notify after disconnect completes
// This method could concurrently be invoked from many places only allow it
// to run once.
synchronized(conLock) {
if (stoppingComms || closePending) {
return;
}
stoppingComms = true;
//@TRACE 216=state=DISCONNECTING
log.fine(CLASS_NAME,methodName,"216");
wasConnected = (isConnected() || isDisconnecting());
conState = DISCONNECTING;
}
// Update the token with the reason for shutdown if it
// is not already complete.
if (token != null && !token.isComplete()) {
token.internalTok.setException(reason);
}
// Stop the thread that is used to call the user back
// when actions complete
if (callback!= null) {callback.stop(); }
// Stop the network module, send and receive now not possible
try {
if (networkModules != null) {
NetworkModule networkModule = networkModules[networkModuleIndex];
if (networkModule != null) {
networkModule.stop();
}
}
} catch (Exception ioe) {
// Ignore as we are shutting down
}
// Stop the thread that handles inbound work from the network
if (receiver != null) {receiver.stop();}
// Stop any new tokens being saved by app and throwing an exception if they do
tokenStore.quiesce(new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING));
// Notify any outstanding tokens with the exception of
// con or discon which may be returned and will be notified at
// the end
endToken = handleOldTokens(token, reason);
try {
// Clean session handling and tidy up
clientState.disconnected(reason);
}catch(Exception ex) {
// Ignore as we are shutting down
}
if (sender != null) { sender.stop(); }
if (pingSender != null){
pingSender.stop();
}
try {
if (persistence != null) {persistence.close();}
}catch(Exception ex) {
// Ignore as we are shutting down
}
// All disconnect logic has been completed allowing the
// client to be marked as disconnected.
synchronized(conLock) {
//@TRACE 217=state=DISCONNECTED
log.fine(CLASS_NAME,methodName,"217");
conState = DISCONNECTED;
stoppingComms = false;
}
// Internal disconnect processing has completed. If there
// is a disconnect token or a connect in error notify
// it now. This is done at the end to allow a new connect
// to be processed and now throw a currently disconnecting error.
// any outstanding tokens and unblock any waiters
if (endToken != null & callback != null) {
callback.asyncOperationComplete(endToken);
}
if (wasConnected && callback != null) {
// Let the user know client has disconnected either normally or abnormally
callback.connectionLost(reason);
}
// While disconnecting, close may have been requested - try it now
synchronized(conLock) {
if (closePending) {
try {
close();
} catch (Exception e) { // ignore any errors as closing
}
}
}
}
上面的方法抛出异常之后,最终会进入到shutdownConnection()方法里面进行处理,主要进行了一下工作。
1、设置conState = DISCONNECTING
2、停止CommsCallback线程
3、关闭TCP连接
4、停止CommsReceiver线程
5、对于Token库中的还未处理的Token的处理
6、ClientState类对象的disconnected方法执行
7、停止CommsSender线程
8、停止心跳线程
9、持久化管理对象的关闭
10、设置conState = DISCONNECTED
11、接口回调,通知用户连接断开
看一下CommsCallback线程的关闭,代码如下:
CommsCallback.java
/**
* Stops the callback thread.
* This call will block until stop has completed.
*/
public void stop() {
final String methodName = "stop";
synchronized (lifecycle) {
if (running) {
// @TRACE 700=stopping
log.fine(CLASS_NAME, methodName, "700");
running = false;
if (!Thread.currentThread().equals(callbackThread)) {
try {
synchronized (workAvailable) {
// @TRACE 701=notify workAvailable and wait for run
// to finish
log.fine(CLASS_NAME, methodName, "701");
workAvailable.notifyAll();
}
// Wait for the thread to finish.
callbackThread.join();
} catch (InterruptedException ex) {
}
}
}
callbackThread = null;
// @TRACE 703=stopped
log.fine(CLASS_NAME, methodName, "703");
}
}
可以看到代码里判断,如果停止该线程的线程和CommsCallback线程不是同一个线程的话,调用了callbackThread.join()来在当前线程里面完成CommsCallback线程的关闭之后再接着执行。同样后面的CommsReceiver线程和CommsSender线程的关闭也是同样的需要调用线程Thread的成员函数join()方法。
Token库中的还未处理的Token的处理,
ClientComms.java
private MqttToken handleOldTokens(MqttToken token, MqttException reason) {
final String methodName = "handleOldTokens";
//@TRACE 222=>
log.fine(CLASS_NAME,methodName,"222");
MqttToken tokToNotifyLater = null;
try {
// First the token that was related to the disconnect / shutdown may
// not be in the token table - temporarily add it if not
if (token != null) {
if (tokenStore.getToken(token.internalTok.getKey())==null) {
tokenStore.saveToken(token, token.internalTok.getKey());
}
}
Vector toksToNot = clientState.resolveOldTokens(reason);
Enumeration toksToNotE = toksToNot.elements();
while(toksToNotE.hasMoreElements()) {
MqttToken tok = (MqttToken)toksToNotE.nextElement();
if (tok.internalTok.getKey().equals(MqttDisconnect.KEY) ||
tok.internalTok.getKey().equals(MqttConnect.KEY)) {
// Its con or discon so remember and notify @ end of disc routine
tokToNotifyLater = tok;
} else {
// notify waiters and callbacks of outstanding tokens
// that a problem has occurred and disconnect is in
// progress
callback.asyncOperationComplete(tok);
}
}
}catch(Exception ex) {
// Ignore as we are shutting down
}
return tokToNotifyLater;
}
ClientState.java
public Vector resolveOldTokens(MqttException reason) {
final String methodName = "resolveOldTokens";
//@TRACE 632=reason {0}
log.fine(CLASS_NAME,methodName,"632", new Object[] {reason});
// If any outstanding let the user know the reason why it is still
// outstanding by putting the reason shutdown is occurring into the
// token.
MqttException shutReason = reason;
if (reason == null) {
shutReason = new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
}
// Set the token up so it is ready to be notified after disconnect
// processing has completed. Do not
// remove the token from the store if it is a delivery token, it is
// valid after a reconnect.
Vector outT = tokenStore.getOutstandingTokens();
Enumeration outTE = outT.elements();
while (outTE.hasMoreElements()) {
MqttToken tok = (MqttToken)outTE.nextElement();
synchronized (tok) {
if (!tok.isComplete() && !tok.internalTok.isCompletePending() && tok.getException() == null) {
tok.internalTok.setException(shutReason);
}
}
if (!(tok instanceof MqttDeliveryToken)) {
// If not a delivery token it is not valid on
// restart so remove
tokenStore.removeToken(tok.internalTok.getKey());
}
}
return outT;
}
这块的处理主要是将tokenStore里面的MqttToken放到Vector集合中,并且将tokenStore中的不是MqttDeliveryToken类型的删除,如果集合中存在MqttDisconnect或MqttConnect类型的对应的Token,通过这个方法返回,等到后面再唤醒在这个上面睡眠的线程,而对于其他的MqttToken都通过callback.asyncOperationComplete(tok)方法来唤醒。注意的是,现在CommsCallback线程现在已经停止了,是在当前线程中执行的。
ClientState类对象的disconnected方法执行
ClientState.java
/**
* Called when the client has been disconnected from the broker.
* @param reason The root cause of the disconnection, or null if it is a clean disconnect
*/
public void disconnected(MqttException reason) {
final String methodName = "disconnected";
//@TRACE 633=disconnected
log.fine(CLASS_NAME,methodName,"633", new Object[] {reason});
this.connected = false;
try {
if (cleanSession) {
clearState();
}
pendingMessages.clear();
pendingFlows.clear();
synchronized (pingOutstandingLock) {
// Reset pingOutstanding to allow reconnects to assume no previous ping.
pingOutstanding = 0;
}
} catch (MqttException e) {
// Ignore as we have disconnected at this point
}
}
执行一些清理工作,主要包括一些存储消息的集合。
3.6 其他的一些记录
静默状态相关
private boolean quiescing = false
在调用和服务器断开连接的时候,会设置为这种状态,在这个状态下不会处理收到的消息。
ClientState.java
/**
* Quiesce the client state, preventing any new messages getting sent,
* and preventing the callback on any newly received messages.
* After the timeout expires, delete any pending messages except for
* outbound ACKs, and wait for those ACKs to complete.
*/
public void quiesce(long timeout) {
final String methodName = "quiesce";
// If the timeout is greater than zero t
if (timeout > 0 ) {
//@TRACE 637=timeout={0}
log.fine(CLASS_NAME,methodName, "637",new Object[]{new Long(timeout)});
synchronized (queueLock) {
this.quiescing = true;
}
// We don't want to handle any new inbound messages
callback.quiesce();
notifyQueueLock();
synchronized (quiesceLock) {
try {
// If token count is not zero there is outbound work to process and
// if pending flows is not zero there is outstanding work to complete and
// if call back is not quiseced there it needs to complete.
int tokc = tokenStore.count();
if (tokc > 0 || pendingFlows.size() >0 || !callback.isQuiesced()) {
//@TRACE 639=wait for outstanding: actualInFlight={0} pendingFlows={1} inFlightPubRels={2} tokens={3}
log.fine(CLASS_NAME, methodName,"639", new Object[]{new Integer(actualInFlight), new Integer(pendingFlows.size()), new Integer(inFlightPubRels), new Integer(tokc)});
// wait for outstanding in flight messages to complete and
// any pending flows to complete
quiesceLock.wait(timeout);
}
}
catch (InterruptedException ex) {
// Don't care, as we're shutting down anyway
}
}
// Quiesce time up or inflight messages delivered. Ensure pending delivery
// vectors are cleared ready for disconnect to be sent as the final flow.
synchronized (queueLock) {
pendingMessages.clear();
pendingFlows.clear();
quiescing = false;
actualInFlight = 0;
}
//@TRACE 640=finished
log.fine(CLASS_NAME, methodName, "640");
}
}
可见如果发送消息没有处理完毕或者收到的消息没有处理完毕的时候,线程会先睡眠等待一段时间之后,将待发送的消息集合给清空,并且将静默状态给恢复。
ClientState.java
protected boolean checkQuiesceLock() {
final String methodName = "checkQuiesceLock";
// if (quiescing && actualInFlight == 0 && pendingFlows.size() == 0 && inFlightPubRels == 0 && callback.isQuiesced()) {
int tokC = tokenStore.count();
if (quiescing && tokC == 0 && pendingFlows.size() == 0 && callback.isQuiesced()) {
//@TRACE 626=quiescing={0} actualInFlight={1} pendingFlows={2} inFlightPubRels={3} callbackQuiesce={4} tokens={5}
log.fine(CLASS_NAME,methodName,"626",new Object[]{new Boolean(quiescing), new Integer(actualInFlight), new Integer(pendingFlows.size()), new Integer(inFlightPubRels), Boolean.valueOf(callback.isQuiesced()), new Integer(tokC)});
synchronized (quiesceLock) {
quiesceLock.notifyAll();
}
return true;
}
return false;
}
如果在设置静默状态线程睡眠等待这段时间内,其他的线程检测到待发送的消息和收到的消息的状态变化了的话,并且已经处理完毕了,会提前唤醒设置静默的线程。
3.7 结尾
这个库代码大概整理下来之后,发现该类库主要是通过使用java的Object对象的wait()和notifyAll()方法,一些线程安全的集合来实现多线程的交互协作,来实现相应的功能。该笔记大部分分析的是一些正常情况下的处理流程,并没有太深入细节,后期还待继续完善。
最后
以上就是文静香菇为你收集整理的MQTT使用的笔记(结合源代码)MQTT使用笔记(结合类库源代码)的全部内容,希望文章能够帮你解决MQTT使用的笔记(结合源代码)MQTT使用笔记(结合类库源代码)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复