概述
MQTT
(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922
)下基于发布/订阅范式的消息协议。它工作在TCP/IP
协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议
。
MQTT
是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT
协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M
)通信和物联网(IoT
)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
更多关于MQTT协议的介绍可自行百度,下文主要介绍使用libhv
中MQTT
的实现与使用。
文章目录
- MQTT实现
- 协议定义
- 接口定义
- mqtt_client_run
- mqtt_client_stop
- mqtt_client_connect 连接
- mqtt_client_disconnect 断开连接
- 连接回调 on_connect
- 断链回调 on_close + 重连 mqtt_client_reconnect
- 包回调 on_packet
- 使用示例
- 编译
- broker
- 订阅端
- 发布端
- 代码流程图
MQTT实现
实现代码位于 mqtt目录
整个实现只有不到700行,以往教程中我是很少讲源码解析的,但MQTT的实现非常精简且经典,非常适合用来学习,从而掌握使用libhv开发自定义应用层协议
的技巧。
MQTT固定头格式(1字节flags
+ varint
编码的长度字段):
0 | 1-2 | 3 | 4-7 |
---|---|---|---|
retain | qos | dup | type |
最简单的MQTT消息(比如长度为0的PING
消息),只需2个字节,所以MQTT协议是非常精简的协议。
协议定义
见 mqtt_protocol.h
#define DEFAULT_MQTT_PORT 1883
#define MQTT_PROTOCOL_V31 3
#define MQTT_PROTOCOL_V311 4
#define MQTT_PROTOCOL_V5 5
#define MQTT_PROTOCOL_NAME "MQTT"
#define MQTT_PROTOCOL_NAME_v31 "MQIsdp"
/*
* connect flags
* 0 1 2 3-4 5 6 7
* reserved clean_session has_will will_qos will_retain has_password has_username
*/
#define MQTT_CONN_CLEAN_SESSION 0x02
#define MQTT_CONN_HAS_WILL 0x04
#define MQTT_CONN_WILL_RETAIN 0x20
#define MQTT_CONN_HAS_PASSWORD 0x40
#define MQTT_CONN_HAS_USERNAME 0x80
typedef enum {
MQTT_TYPE_CONNECT = 1,
MQTT_TYPE_CONNACK = 2,
MQTT_TYPE_PUBLISH = 3,
MQTT_TYPE_PUBACK = 4,
MQTT_TYPE_PUBREC = 5,
MQTT_TYPE_PUBREL = 6,
MQTT_TYPE_PUBCOMP = 7,
MQTT_TYPE_SUBSCRIBE = 8,
MQTT_TYPE_SUBACK = 9,
MQTT_TYPE_UNSUBSCRIBE = 10,
MQTT_TYPE_UNSUBACK = 11,
MQTT_TYPE_PINGREQ = 12,
MQTT_TYPE_PINGRESP = 13,
MQTT_TYPE_DISCONNECT = 14,
} mqtt_type_e;
typedef enum {
MQTT_CONNACK_ACCEPTED = 0,
MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1,
MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2,
MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3,
MQTT_CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4,
MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5,
} mqtt_connack_e;
typedef struct mqtt_head_s {
unsigned char type: 4;
unsigned char dup: 1;
unsigned char qos: 2;
unsigned char retain: 1;
unsigned int length;
} mqtt_head_t;
typedef struct mqtt_message_s {
unsigned int topic_len;
const char* topic;
unsigned int payload_len;
const char* payload;
unsigned char qos;
unsigned char retain;
} mqtt_message_t;
头文件里主要定义了mqtt_type_e
消息类型、mqtt_head_t
头部、mqtt_message_t
消息。
协议里稍微复杂的是MQTT_TYPE_CONNECT
,即连接成功后发送的登录认证消息。
MQTT连接标示:
0 | 1 | 2 | 3-4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|
reserved | clean_session | has_will | will_qos | will_retain | has_password | has_username |
接口定义
见 mqtt_client.h
typedef struct mqtt_client_s mqtt_client_t;
// @type mqtt_type_e
// @example examples/mqtt
typedef void (*mqtt_client_cb)(mqtt_client_t* cli, int type);
struct mqtt_client_s {
// connect: host:port
char host[256];
int port;
// reconnect
reconn_setting_t* reconn_setting;
// login: flags + keepalive + client_id + will + username + password
// flags
unsigned char protocol_version; // Default MQTT_PROTOCOL_V311
unsigned char clean_session: 1;
unsigned char ssl: 1; // Read Only
unsigned char alloced_ssl_ctx: 1; // intern
unsigned short keepalive;
char client_id[64];
// will
mqtt_message_t* will;
// auth
char username[64];
char password[64];
// message
mqtt_head_t head;
int error; // for MQTT_TYPE_CONNACK
int mid; // for MQTT_TYPE_SUBACK, MQTT_TYPE_PUBACK
mqtt_message_t message; // for MQTT_TYPE_PUBLISH
// callback
mqtt_client_cb cb;
// userdata
void* userdata;
// privdata
hloop_t* loop;
hio_t* io;
htimer_t* reconn_timer;
// SSL/TLS
hssl_ctx_t ssl_ctx;
// thread-safe
hmutex_t mutex_;
};
// hloop_new -> malloc(mqtt_client_t)
HV_EXPORT mqtt_client_t* mqtt_client_new(hloop_t* loop DEFAULT(NULL));
// @see hloop_run
HV_EXPORT void mqtt_client_run (mqtt_client_t* cli);
// @see hloop_stop
HV_EXPORT void mqtt_client_stop(mqtt_client_t* cli);
// hloop_free -> free(mqtt_client_t)
HV_EXPORT void mqtt_client_free(mqtt_client_t* cli);
// id
HV_EXPORT void mqtt_client_set_id(mqtt_client_t* cli, const char* id);
// will
HV_EXPORT void mqtt_client_set_will(mqtt_client_t* cli,
mqtt_message_t* will);
// auth
HV_EXPORT void mqtt_client_set_auth(mqtt_client_t* cli,
const char* username, const char* password);
// callback
HV_EXPORT void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb);
// userdata
HV_EXPORT void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata);
HV_EXPORT void* mqtt_client_get_userdata(mqtt_client_t* cli);
// error
HV_EXPORT int mqtt_client_get_last_error(mqtt_client_t* cli);
// SSL/TLS
HV_EXPORT int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx);
// hssl_ctx_new(opt) -> mqtt_client_set_ssl_ctx
HV_EXPORT int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt);
// reconnect
HV_EXPORT int mqtt_client_set_reconnect(mqtt_client_t* cli,
reconn_setting_t* reconn);
HV_EXPORT int mqtt_client_reconnect(mqtt_client_t* cli);
// connect
// hio_create_socket -> hio_connect ->
// on_connect -> mqtt_client_login ->
// on_connack
HV_EXPORT int mqtt_client_connect(mqtt_client_t* cli,
const char* host,
int port DEFAULT(DEFAULT_MQTT_PORT),
int ssl DEFAULT(0));
// disconnect
// @see hio_close
HV_EXPORT int mqtt_client_disconnect(mqtt_client_t* cli);
// publish
HV_EXPORT int mqtt_client_publish(mqtt_client_t* cli,
mqtt_message_t* msg);
// subscribe
HV_EXPORT int mqtt_client_subscribe(mqtt_client_t* cli,
const char* topic, int qos DEFAULT(0));
// unsubscribe
HV_EXPORT int mqtt_client_unsubscribe(mqtt_client_t* cli,
const char* topic);
接口列表:
mqtt_client_new
:新建MQTT客户端结构体mqtt_client_free
:释放MQTT客户端结构体mqtt_client_run
:运行MQTT客户端mqtt_client_stop
:停止MQTT客户端mqtt_client_set_id
:设置客户端IDmqtt_client_set_will
:设置遗嘱mqtt_client_set_auth
:设置认证用户名密码mqtt_client_set_callback
:设置回调mqtt_client_set_userdata
:设置用户数据mqtt_client_get_userdata
:获取用户数据mqtt_client_get_last_error
:获取最后的错误码mqtt_client_set_ssl_ctx
:设置SSL_CTX(用于SSL/TLS加密通信)mqtt_client_new_ssl_ctx
:新建SSL_CTXmqtt_client_set_reconnect
:设置重连mqtt_client_reconnect
:重连mqtt_client_connect
:开始连接mqtt_client_disconnect
:断开连接mqtt_client_publish
:发布mqtt_client_subscribe
:订阅mqtt_client_unsubscribe
:取消订阅
下面我们介绍几个主要接口的实现:
mqtt_client_run
void mqtt_client_run (mqtt_client_t* cli) {
if (!cli || !cli->loop) return;
hloop_run(cli->loop);
}
mqtt_client_run
就是调用hloop_run
,启动一个事件循环;
mqtt_client_stop
void mqtt_client_stop(mqtt_client_t* cli) {
if (!cli || !cli->loop) return;
hloop_stop(cli->loop);
}
mqtt_client_stop
就是调用 hloop_stop
,停止事件循环;
mqtt_client_connect 连接
int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
if (!cli) return -1;
safe_strncpy(cli->host, host, sizeof(cli->host));
cli->port = port;
cli->ssl = ssl;
hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
if (io == NULL) return -1;
if (ssl) {
if (cli->ssl_ctx) {
hio_set_ssl_ctx(io, cli->ssl_ctx);
}
hio_enable_ssl(io);
}
cli->io = io;
hevent_set_userdata(io, cli);
hio_setcb_connect(io, on_connect);
hio_setcb_close(io, on_close);
return hio_connect(io);
}
hio_create_socket
,创建一个套接字,返回一个hio_t
对象;hio_enable_ssl
启用SSL/TLS
,当然你可以调用mqtt_client_new_ssl_ctx
去设置证书等选项;hevent_set_userdata
:设置用户数据(绑定上下文);hio_setcb_connect
:设置连接回调函数;hio_setcb_close
:设置关闭回调函数;hio_connect
:开始连接;
mqtt_client_disconnect 断开连接
int mqtt_client_disconnect(mqtt_client_t* cli) {
if (!cli || !cli->io) return -1;
// cancel reconnect first
mqtt_client_set_reconnect(cli, NULL);
mqtt_send_disconnect(cli->io);
return hio_close(cli->io);
}
mqtt_client_set_reconnect
:因为是主动断开连接,首先取消重连来避免触发断线重连;mqtt_send_disconnect
:发送一个MQTT主动断开连接的消息;hio_close
:关闭连接;
连接回调 on_connect
static void on_connect(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
if (cli->cb) {
cli->head.type = MQTT_TYPE_CONNECT;
cli->cb(cli, cli->head.type);
}
if (cli->reconn_setting) {
reconn_setting_reset(cli->reconn_setting);
}
static unpack_setting_t mqtt_unpack_setting;
mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
mqtt_unpack_setting.body_offset = 2;
mqtt_unpack_setting.length_field_offset = 1;
mqtt_unpack_setting.length_field_bytes = 1;
mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
hio_set_unpack(io, &mqtt_unpack_setting);
// start recv packet
hio_setcb_read(io, on_packet);
hio_read(io);
mqtt_client_login(cli);
}
hio_set_unpack
: 设置拆包规则,支持固定包长
、分隔符
、头部长度字段
三种常见的拆包方式,调用该接口设置拆包规则后,内部会根据拆包规则处理粘包与分包,保证回调上来的是完整的一包数据,大大节省了上层处理粘包与分包的成本,MQTT
协议对应头部长度字段
这种拆包规则,关于该接口的更详细介绍见 libhv教程14–200行实现一个纯C版jsonrpc框架hio_setcb_read
:设置读回调,因为上面设置了拆包规则,回调上来的就是完整的一包数据,所以叫on_packet
;hio_read
:开始读;mqtt_client_login
:MQTT登录认证;
断链回调 on_close + 重连 mqtt_client_reconnect
static void reconnect_timer_cb(htimer_t* timer) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
if (cli == NULL) return;
cli->reconn_timer = NULL;
mqtt_client_reconnect(cli);
}
static void on_close(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
if (cli->cb) {
cli->head.type = MQTT_TYPE_DISCONNECT;
cli->cb(cli, cli->head.type);
}
// reconnect
if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
cli->reconn_timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
hevent_set_userdata(cli->reconn_timer, cli);
}
}
int mqtt_client_reconnect(mqtt_client_t* cli) {
mqtt_client_connect(cli, cli->host, cli->port, cli->ssl);
return 0;
}
断链回调里除了调用cli->cb
通知上层掉线外,另外就是判断如果设置了重连,则启动一个定时器一段时间后再尝试重连;
reconn_setting_can_retry
:判断是否还有剩余重连次数;reconn_setting_calc_delay
:计算重连延时;htimer_add
:添加一个定时器;mqtt_client_reconnect
:因为连接时已经记录了host、port、ssl
等信息,重连就是再次调用mqtt_client_connect
;
包回调 on_packet
static void on_packet(hio_t* io, void* buf, int len) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
unsigned char* p = (unsigned char*)buf;
unsigned char* end = p + len;
memset(&cli->head, 0, sizeof(mqtt_head_t));
int headlen = mqtt_head_unpack(&cli->head, p, len);
if (headlen <= 0) return;
p += headlen;
switch (cli->head.type) {
// case MQTT_TYPE_CONNECT:
case MQTT_TYPE_CONNACK:
{
if (cli->head.length < 2) {
hloge("MQTT CONNACK malformed!");
hio_close(io);
return;
}
unsigned char conn_flags = 0, rc = 0;
POP8(p, conn_flags);
POP8(p, rc);
if (rc != MQTT_CONNACK_ACCEPTED) {
cli->error = rc;
hloge("MQTT CONNACK error=%d", cli->error);
hio_close(io);
return;
}
if (cli->keepalive) {
hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
}
}
break;
case MQTT_TYPE_PUBLISH:
{
if (cli->head.length < 2) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
memset(&cli->message, 0, sizeof(mqtt_message_t));
POP16(p, cli->message.topic_len);
if (end - p < cli->message.topic_len) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
// NOTE: Not deep copy
cli->message.topic = (char*)p;
p += cli->message.topic_len;
if (cli->head.qos > 0) {
if (end - p < 2) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
cli->message.payload_len = end - p;
if (cli->message.payload_len > 0) {
// NOTE: Not deep copy
cli->message.payload = (char*)p;
}
cli->message.qos = cli->head.qos;
if (cli->message.qos == 0) {
// Do nothing
} else if (cli->message.qos == 1) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
} else if (cli->message.qos == 2) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
}
}
break;
case MQTT_TYPE_PUBACK:
case MQTT_TYPE_PUBREC:
case MQTT_TYPE_PUBREL:
case MQTT_TYPE_PUBCOMP:
{
if (cli->head.length < 2) {
hloge("MQTT PUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
if (cli->head.type == MQTT_TYPE_PUBREC) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
} else if (cli->head.type == MQTT_TYPE_PUBREL) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
}
}
break;
// case MQTT_TYPE_SUBSCRIBE:
// break;
case MQTT_TYPE_SUBACK:
{
if (cli->head.length < 2) {
hloge("MQTT SUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
break;
// case MQTT_TYPE_UNSUBSCRIBE:
// break;
case MQTT_TYPE_UNSUBACK:
{
if (cli->head.length < 2) {
hloge("MQTT UNSUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
break;
case MQTT_TYPE_PINGREQ:
mqtt_send_pong(io);
return;
case MQTT_TYPE_PINGRESP:
return;
case MQTT_TYPE_DISCONNECT:
hio_close(io);
return;
default:
hloge("MQTT client received wrong type=%d", (int)cli->head.type);
hio_close(io);
return;
}
if (cli->cb) {
cli->cb(cli, cli->head.type);
}
}
包回调里,就是先调用mqtt_head_unpack
解析MQTT头部,然后根据消息类型做对应处理,都是些协议相关的细节了,值得一提的是hio_set_heartbeat
设置了一个心跳函数,每隔一段时间发送一个MQTT应用层心跳包
来保活。
使用示例
见 examples/mqtt
编译
git clone https://gitee.com/libhv/libhv.git
./configure --with-mqtt
make
broker
可以使用 mosquitto download
订阅端
bin/mqtt_sub 127.0.0.1 1883 hello
发布端
bin/mqtt_pub 127.0.0.1 1883 hello world
代码流程图
最后
以上就是干净八宝粥为你收集整理的libhv教程19--MQTT的实现与使用的全部内容,希望文章能够帮你解决libhv教程19--MQTT的实现与使用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复