概述
起因还是上一篇博客,发现原来补心跳的方法没了,看了一下源码:
MqttAsyncClient
/**
 * Check and send a ping if needed.
 * <p>By default, client sends PingReq to server to keep the connection to
 * server. For some platforms which cannot use this mechanism, such as Android,
 * developer needs to handle the ping request manually with this method.
 * </p>
 *
 * @param userContext not used by this method
 * @param callback optional listener forwarded to the communications layer so
 *        that it is attached to the ping token as soon as that token is
 *        created; may be {@code null}
 * @return the token returned by the communications layer; may be {@code null}
 * @throws MqttException for other errors encountered while publishing the message.
 */
public IMqttToken checkPing(Object userContext,
        IMqttActionListener callback) throws MqttException {
    final String methodName = "ping";
    //@TRACE 117=>
    log.fine(CLASS_NAME, methodName, "117");

    // Forward the caller's listener instead of dropping it: the no-argument
    // overload gives the caller no way to be notified about the ping outcome.
    MqttToken token = comms.checkForActivity(callback);

    //@TRACE 118=<
    log.fine(CLASS_NAME, methodName, "118");
    return token;
}
ClientComms
/*
 * Delegates the keep-alive check to ClientState, handing over the optional
 * ping listener so that it is attached as soon as the ping token is created
 * (Bug 473928).
 *
 * Any exception raised by the check is passed to handleRunException; in that
 * case (provided handleRunException itself returns normally) the method
 * returns null.
 */
public MqttToken checkForActivity(IMqttActionListener pingCallback) {
    try {
        return clientState.checkForActivity(pingCallback);
    } catch (MqttException mqttFailure) {
        handleRunException(mqttFailure);
    } catch (Exception unexpectedFailure) {
        handleRunException(unexpectedFailure);
    }
    // Reached only after a failure has been handed to handleRunException.
    return null;
}
ClientState
/**
 * Check and send a ping if needed and check for ping timeout.
 * Need to send a ping if nothing has been sent or received
 * in the last keepalive interval. It is important to check for
 * both sent and received packets in order to catch the case where an
 * app is solely sending QoS 0 messages or receiving QoS 0 messages.
 * QoS 0 message are not good enough for checking a connection is
 * alive as they are one way messages.
 *
 * If a ping has been sent but no data has been received in the
 * last keepalive interval then the connection is deemed to be broken.
 *
 * @param pingCallback optional listener attached to the ping token when the
 *        token is created; may be {@code null}
 * @return token of ping command, null if no ping command has been sent.
 * @throws MqttException with REASON_CODE_CLIENT_TIMEOUT when a ping is
 *         outstanding and nothing was received within keepAlive + delta, or
 *         with REASON_CODE_WRITE_TIMEOUT when no ping is outstanding and
 *         nothing was written for two keepAlive intervals
 */
public MqttToken checkForActivity(IMqttActionListener pingCallback)
        throws MqttException {
    final String methodName = "checkForActivity";
    //@TRACE 616=checkForActivity entered
    log.fine(CLASS_NAME, methodName, "616", new Object[]{});

    synchronized (quiesceLock) {
        // ref bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=440698
        // No ping while quiescing
        if (quiescing) {
            return null;
        }
    }

    MqttToken token = null;
    long nextPingTime = getKeepAlive();

    if (connected && this.keepAlive > 0) {
        long time = System.currentTimeMillis();
        // Reduce schedule frequency: System.currentTimeMillis is not accurate,
        // so add a buffer. It is 1/10 of the minimum keepalive unit.
        int delta = 100;

        // ref bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=446663
        synchronized (pingOutstandingLock) {

            // Is the broker connection lost because the broker did not reply to my ping?
            if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAlive + delta)) {
                // lastInboundActivity will be updated once receiving is done.
                // Add a delta, since the timer and System.currentTimeMillis() are not accurate.
                // A ping is outstanding but no packet has been received in KA,
                // so the connection is deemed broken.
                //@TRACE 619=Timed out as no activity, keepAlive={0}
                // lastOutboundActivity={1} lastInboundActivity={2} time={3} lastPing={4}
                log.severe(CLASS_NAME, methodName, "619", new Object[]{
                        Long.valueOf(this.keepAlive),
                        Long.valueOf(lastOutboundActivity),
                        Long.valueOf(lastInboundActivity),
                        Long.valueOf(time),
                        Long.valueOf(lastPing)});

                // A ping has already been sent. At this point, assume that the
                // broker has hung and the TCP layer hasn't noticed.
                throw ExceptionHelper.createMqttException(
                        MqttException.REASON_CODE_CLIENT_TIMEOUT);
            }

            // Is the broker connection lost because I could not get
            // any successful write for 2 keepAlive intervals?
            if (pingOutstanding == 0 &&
                    (time - lastOutboundActivity >= 2*keepAlive)) {
                // I am probably blocked on a write operation, as
                // I should have been able to write at least a ping message.
                log.severe(CLASS_NAME, methodName, "642", new Object[]{
                        Long.valueOf(this.keepAlive),
                        Long.valueOf(lastOutboundActivity),
                        Long.valueOf(lastInboundActivity),
                        Long.valueOf(time),
                        Long.valueOf(lastPing)});

                // A ping has not been sent but I am not
                // progressing on the current write operation.
                // At this point, assume that the broker
                // has hung and the TCP layer hasn't noticed.
                throw ExceptionHelper.createMqttException(
                        MqttException.REASON_CODE_WRITE_TIMEOUT);
            }

            // 1. Is a ping required by the client to
            //    verify whether the broker is down?
            //    Condition: (pingOutstanding == 0 &&
            //               (time - lastInboundActivity >= keepAlive - delta))
            //    In this case only one ping is sent. If not confirmed,
            //    client will assume a lost connection to the broker.
            // 2. Is a ping required by the broker to keep the client alive?
            //    Condition: (time - lastOutboundActivity >= keepAlive - delta)
            //    In this case more than one ping outstanding may be necessary.
            //    This would be the case when receiving a large message;
            //    the broker needs to keep receiving a regular ping even if
            //    the ping responses are queued after the long message.
            //    If lacking to do so, the broker will consider
            //    my connection lost and cut my socket.
            if ((pingOutstanding == 0 &&
                    (time - lastInboundActivity >= keepAlive - delta)) ||
                    (time - lastOutboundActivity >= keepAlive - delta)) {
                //@TRACE 620=ping needed. keepAlive={0}
                // lastOutboundActivity={1} lastInboundActivity={2}
                log.fine(CLASS_NAME, methodName, "620", new Object[]{
                        Long.valueOf(this.keepAlive),
                        Long.valueOf(lastOutboundActivity),
                        Long.valueOf(lastInboundActivity)});

                // pingOutstanding and lastPing are deliberately not touched here:
                // they will be set after the ping has been written on the wire.
                token = new MqttToken(clientComms.getClient().getClientId());
                if (pingCallback != null) {
                    token.setActionCallback(pingCallback);
                }
                tokenStore.saveToken(token, pingCommand);
                // Put the ping at the head of the pending flows.
                pendingFlows.insertElementAt(pingCommand, 0);

                nextPingTime = getKeepAlive();

                // Wake sender thread since it may be in wait state (in ClientState.get())
                notifyQueueLock();
            } else {
                log.fine(CLASS_NAME, methodName, "634", null);
                // No ping needed yet: re-check once the remainder of the
                // keepalive interval since the last outbound activity has elapsed.
                nextPingTime = Math.max(1, getKeepAlive() -
                        (time - lastOutboundActivity));
            }
        }

        //@TRACE 624=Schedule next ping at {0}
        log.fine(CLASS_NAME, methodName, "624", new Object[]{Long.valueOf(nextPingTime)});
        pingSender.schedule(nextPingTime);
    }

    return token;
}
TimerPingSender:
/**
 * Hands a fresh PingTask to the timer, to be run after the given delay.
 *
 * @param delayInMilliseconds delay passed to the timer, in milliseconds
 */
public void schedule(long delayInMilliseconds) {
    final PingTask nextCheck = new PingTask();
    timer.schedule(nextCheck, delayInMilliseconds);
}
/**
 * Timer task that logs the current time and then asks the communications
 * layer to run its activity check.
 */
private class PingTask extends TimerTask {
    private static final String methodName = "PingTask.run";

    @Override
    public void run() {
        //@Trace 660=Check schedule at {0}
        log.fine(CLASS_NAME, methodName, "660",
                new Object[]{Long.valueOf(System.currentTimeMillis())});
        comms.checkForActivity();
    }
}
看到ClientComms有点动容,这么有诚意。然而和我要的调用一下就发一个PING好像有点出入,是我打开的方式不对?
补:
I've been doing some work with MQTT on Android and I've experienced exactly the same issue.
As Dale says, the old version of the MQTT client used to have an explicit ping() method, but unfortunately this is now hidden away.
The simplest approach, and the one I use, is to explicitly publish a 1 byte message to a particular topic, to serve as the keepalive. I don't think this should add much to the overhead of your application and, while I'm not familiar with Mosquitto's ACL, I assume you could have every client use the same 'keepalive' topic and just provide write access to all. This shouldn't affect security as long as no-one can read from the topic.
An alternative approach would be to have the server send the client(s) a 'keepalive' message at QoS 1 or 2 (pub/sub through a single topic to all for efficiency) as, due to the QoS flows, this will involve the client sending a message back to the server under the covers; which will serve as the keepalive. This has the advantage of keeping your clients as subscriber only; however it's incompatible with 'clean session = false' (as you would have large amounts of messages queued up for delivery to clients who are offline for a while - needlessly affecting performance on reconnect).
Unfortunately these are the only two workarounds that I can currently think of.
Also, as a brief aside, I've experienced a number of issues using the MqttDefaultFilePersistence on Android, so you might want to be aware of this. In particular to do with file locking and problems when re-instantiating the client. To get around this I've created an implementation of MqttClientPersistence built on top of an SQLite database and this is much more robust; you might want to do the same.
再补:
看漏了一个方法,里面是写了一条线程,在重复等待操作,在
//Wake sender thread since it may be in wait state (in ClientState.get())
notifyQueueLock();
这里释放锁。不过timer那里多了一次循环调用没关系吗?可能是用同步处理了吧?
重复等待在这里写:
/**
 * Sender loop: repeatedly takes the next message from clientState and writes
 * it to the output stream until {@code running} becomes false or there is no
 * output stream.
 *
 * A null message stops the loop. Acks are written and flushed directly; every
 * other message is written while holding the monitor of its token, after
 * which clientState is told that the message was sent. Exceptions are handed
 * to handleRunException together with the message being processed.
 */
public void run() {
    final String methodName = "run";
    MqttWireMessage message = null;

    while (running && (out != null)) {
        try {
            message = clientState.get();

            if (message == null) {
                //@TRACE 803=get message returned null, stopping}
                log.fine(CLASS_NAME, methodName, "803");
                running = false;
                continue;
            }

            //@TRACE 802=network send key={0} msg={1}
            Object[] traceArgs = new Object[] {message.getKey(), message};
            log.fine(CLASS_NAME, methodName, "802", traceArgs);

            if (message instanceof MqttAck) {
                out.write(message);
                out.flush();
                continue;
            }

            MqttToken pendingToken = tokenStore.getToken(message);
            // While quiescing the tokenstore can be cleared, so the token may
            // be gone by the time we try to send the message.
            if (pendingToken == null) {
                continue;
            }

            synchronized (pendingToken) {
                out.write(message);
                try {
                    out.flush();
                } catch (IOException flushFailure) {
                    // The flush has been seen to fail on disconnect of a SSL
                    // socket; as disconnect is in progress this should not be
                    // treated as an error.
                    boolean disconnecting = message instanceof MqttDisconnect;
                    if (!disconnecting) {
                        throw flushFailure;
                    }
                }
                clientState.notifySent(message);
            }
        } catch (MqttException mqttFailure) {
            handleRunException(message, mqttFailure);
        } catch (Exception unexpectedFailure) {
            handleRunException(message, unexpectedFailure);
        }
    } // end while

    //@TRACE 805=<
    log.fine(CLASS_NAME, methodName, "805");
}
while()中,死循环查消息队列,查完等待下一次释放锁
结语:
checkPing 还是按照代码注释中描述的那样使用就行
补,今天读源码发现里面有例子:
/*
 * This class sends PingReq packet to MQTT broker
 *
 * A wake lock is acquired before the activity check and released when the
 * ping token completes (successfully or not), or straight away when the
 * check produced no token.
 */
class AlarmReceiver extends BroadcastReceiver {
    private WakeLock wakelock;
    private final String wakeLockTag = MqttServiceConstants.PING_WAKELOCK
            + that.comms.getClient().getClientId();

    @Override
    public void onReceive(Context context, Intent intent) {
        // According to the docs, "Alarm Manager holds a CPU wake lock as
        // long as the alarm receiver's onReceive() method is executing.
        // This guarantees that the phone will not sleep until you have
        // finished handling the broadcast.", but this class still gets
        // a wake lock to wait for the ping to finish.
        int count = intent.getIntExtra(Intent.EXTRA_ALARM_COUNT, -1);
        Log.d(TAG, "Ping " + count + " times.");
        Log.d(TAG, "Check time :" + System.currentTimeMillis());

        // Take the wake lock BEFORE triggering the check, so that it is
        // already held by the time any completion callback can run.
        if (wakelock == null) {
            PowerManager pm = (PowerManager) service
                    .getSystemService(Service.POWER_SERVICE);
            wakelock = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK,
                    wakeLockTag);
        }
        wakelock.acquire();

        // Hand the listener to checkForActivity so it is attached when the
        // token is created. Attaching it afterwards leaves a window in which
        // the ping can complete unnoticed and the wake lock is never released.
        IMqttToken token = comms.checkForActivity(new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                Log.d(TAG, "Success. Release lock(" + wakeLockTag + "):"
                        + System.currentTimeMillis());
                releaseWakeLock();
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken,
                    Throwable exception) {
                Log.d(TAG, "Failure. Release lock(" + wakeLockTag + "):"
                        + System.currentTimeMillis());
                releaseWakeLock();
            }
        });

        // No ping has been sent, so no callback will fire: release now.
        if (token == null) {
            releaseWakeLock();
        }
    }

    /* Releases the wake lock if it exists and is currently held. */
    private void releaseWakeLock() {
        if (wakelock != null && wakelock.isHeld()) {
            wakelock.release();
        }
    }
}
补-2016.04.06
二月份做过一次解读,后来也是在那个的基础上做了,部署进系统了,实际测试(wireshark)中发现用这个方法是没有PING信息发出来的。仔细阅读源码之后,发现其实PING是这个库的内部逻辑,作为使用者,可以维护。但涉及到内部逻辑还是让它独立,保持原来的样子。
我对这个库分了三层:
第一层是耦合库的东西,扩展了库接口,实现了库方法。
第二层是维护库的东西,例如客户端断网、MQTT连接失败、MQTT关闭、重连等。
第三层是用户通知层,主要扩展publishArrive的接口,把原来提醒的逻辑搬到这里。
PING怎么办呢?我参照库里的 TimerPingSender 重写了一个实现,改用 AlarmManager 来定时,几部机器测试了几个下午,能稳定工作,耗电量比之前小了一点。
最后
以上就是隐形西装为你收集整理的paho.mqtt 之 PING的全部内容,希望文章能够帮你解决paho.mqtt 之 PING所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复