概述
1、TCP的read和write 都采用 select 机制进行
这里使用 select 可以实现至少两种功能:
(1)select 可以检测 socket状态,如果select返回为负,说明当前socket异常,就不会再进行read/write
这样就能有效的避免 SIGPIPE 带来的终止进程操作。
(2)select 还可以检测当前socket是否拥堵,这样就变相的实现了 同一个socket 的“并发”读写。
2、由于 MQTT协议标准,所以在进行“读”socket操作时,步骤:
(1)先读1个“帧头”,确定类型。
(2)读1~4个字节,表示后续字节长度。
(3)根据步骤2中读到的字节长度,再读取剩余的字节
MQTT的数据帧长度是有规律的,所以可以TCP的 read可以采用select进行读指定长度。
3、mqtt yield 循环机制设计的非常好
伪代码逻辑:
int wrapper_mqtt_yield(void *client, int timeout_ms)
{
// Keep alive功能,通过发送循环发送 ping 指令给broker来检测网络状态
// 如果TCP read/write 返回错误则直接设定 状态为"disconnect",然后发起重连
// 如果是因为超时,则进行超时次数统计,通过变量
// pClient->keepalive_probes++; 进行判断,只要keepalive_probes 值大于2,则
// 认为 net 出现故障,发起重连
iotx_mc_keepalive(client);
// 对socket 进行 循环 “读”, 监听broker 返回的所有数据
// 如果返回的 是 ping 回复,则对 keepalive_probes进行清0(不清0意味着将要超时)
// 如果返回的 是 broker返回的数据,则进行 消息推送到 对应的映射关系的回调函数
// 如果返回的是 > qos1 的pub 消息对应 msgid,则清除确认 对应的消息发送完成
// 同时将 pub 的 topic 的状态 设置为 invalid
_mqtt_cycle(client)
// 检查 等待发送的消息链表ACK,如果 接收到 对应消息msgid的 ACK,
// 检查节点的状态,如果是 invalid,则从发送链表中清除对应的节点,
// 这样就能实现 qos > 1的机制
// 从函数名称也能看出来,这个是 用于 消息 pub 的 qos保证 的实现。
MQTTPubInfoProc(client);
}
最后
以上就是耍酷云朵为你收集整理的阿里云IOT SDK中的MQTT稳定架构分析的全部内容,希望文章能够帮你解决阿里云IOT SDK中的MQTT稳定架构分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复