概述
基于MQTT做了一个小项目,记录一下完成过程,主要实现功能为用户可以在手机端和电脑远端:获取开发板的芯片温度、控制led灯的亮灭以及发光模式、播放想播放的音频。
水平有限,仅做自我学习记录。
一.主要内容
主要是编写一个基于mqtt协议的客户端程序,来实现相应的功能。
①搭建好连接mqtt服务器的环境(服务器用的是然也的公用服务器,网上可以查到)。
②连接好mqtt服务器后,订阅相关主题,比如led主题来接受远端的控制信号,playback主题来接受远端相应播放的音频等等。
③温度发布模块:每个一段时间发布一次开发板芯片的温度信息。
④led控制模块:接受远端数据,然后根据数据来控制led灯的发光模式或亮灭。
⑤音频播放模块:接受远端数据,然后根据数据信息来选择播放的音频文件,并且可以实时打断播放或者切换播放。
⑥这里还想加一个视频监控模块,具体功能实现是接收到消息后自动打开视频监控,并显示在显示设备上。后面摄像头到了再补上。
二.各个模块实现思路
1.搭建好连接服务器的环境
在mqtt协议中,客户端在连接服务器前需要创建一个客户端对象,还要设置好回调函数,这个回调函数很重要,因为在后面的模块中,需要接收别的客户端(手机、电脑等)通过服务器发布的消息,当接收到这些消息后,进程就会调用这个回调函数,我们可以在这个回调函数中设置好我们处理接收到消息后处理数据的方法,以及需要执行的代码。然后我们就可以连接服务器了,当然在这几步中我们还需要配置相关的设置,比如服务器IP地址、心跳间隔时间、cleansession标志、用户名以及密码(社区版)等等。
2.温度发布模块
linux下一切皆文件,只需要打开相应的设备文件即可,在我用的开发板中,芯片温度的文件路径为:/sys/class/thermal/thermal_zone0/temp,所以只要把该文件的打开然后循环的去读,然后再发布到temp主题中。
3.led控制模块
在这个功能模块中,需要接收led主题中的消息,然后根据消息的内容进行相应的操作。所以首先客户端在连接服务器后要订阅led主题,这样才能接受到消息。然后当接收到消息时,会自动调用回调函数,所以我们需要在回调函数中判断接受的消息是否是led主题,如果是,再根据消息内容操作相应的文件,led的设备文件路径为:/sys/class/leds/sys-led/trigger(发光模式)以及/sys/class/leds/sys-led/brightness(亮灭),有些led可以支持亮度大小,但我的只支持亮和灭,如果支持还可以根据数值大小调节亮度。
4.音频播放模块
这个模块的实现相对要难一些,难点在于要保证客户端再播放音频时还是可以接收到服务器的消息的,它不像led模块的操作执行在极短的时间内就能完成,所以需要在这个模块上需要进行多线程或者多进程编程,我一开始是进行的多线程编程,但在调试的时候发现多线程下客户端在播放音频时不能接收到服务器的消息,后面分析原因是因为尽管是多线程,但是在一个进程中只能同时存在一个回调函数在执行,所以当有别的消息从服务器端发过来时,客户端不能再调用回调函数执行相关操作。之后我就改用了多进程编程。
这个模块主要实现思路是:接收到消息后,执行回调函数判断是否为playback主题,如果是则判断音频设备是否正在播放,如果在播放就要结束播放进程,然后另外创建新进程播放新传来的消息对应的音频文件(这样设计是为了实现一个切歌、停止播放的功能)。然后就是一些进程回收上的细节处理。
整体思路框架搭建好之后,剩下的就是怎么播放音频了,其实和之前模块差不多,就是稍微复杂一点。在linux音频方面有一个库alsa-lib,它是一套 Linux 应用层的 C 语言函数库,为音频应用程序开发提供了一套统一、标准的接口,应用程序只需调用这一套 API 即可完成对底层声卡设备的操控。具体步骤是打开pcm设备,设置硬件参数(设置数据格式、采样率、声道数等等),应用配置的参数,然后就可以打开应用的音频文件进行读写操作了。读写音频文件时要注意的是wav格式的文件,先解析文件头是否正确,然后再定位到数据开始的地方进行读写。
三.代码实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "MQTTClient.h"
#include <alsa/asoundlib.h>
#include "sys/wait.h"
#define BROKER_ADDRESS "tcp://test.ranye-iot.net:1883"
#define CLIENTID "mqtt_xiesy"
#define LED_TOPIC "mqtt_xiesy/dev/led"
#define WILL_TOPIC "mqtt_xiesy/willtopic"
#define TEMP_TOPIC "mqtt_xiesy/dev/temp"
#define PLAPBACK_TOPIC "mqtt_xiesy/dev/playback"
#define PCM_PLAYBACK_DEV "hw:0,0"//音频播放设备
static int playback_flags;//标记音频设备是否在工作
static snd_pcm_t *pcm = NULL;//pcm设备句柄
static unsigned int buf_bytes; //应用程序缓冲区的大小(字节为单位)
static void *buf = NULL; //指向应用程序缓冲区的指针
static int fd = -1; //指向 WAV 音频文件的文件描述符
static snd_pcm_uframes_t period_size = 1024; //周期大小(单位: 帧)
static unsigned int periods = 16; //周期数(设备驱动层 buffer 的大小)
static pid_t pid;
typedef struct WAV_RIFF
{
char ChunkID[4]; /* "RIFF" */
u_int32_t ChunkSize; /* 从下一个地址开始到文件末尾的总字节数 */
char Format[4]; /* "WAVE" */
} __attribute__ ((packed)) RIFF_t;
typedef struct WAV_FMT {
char Subchunk1ID[4];
u_int32_t Subchunk1Size;
u_int16_t AudioFormat;
u_int16_t NumChannels;
u_int32_t SampleRate;
u_int32_t ByteRate;
u_int16_t BlockAlign;
u_int16_t BitsPerSample;
} __attribute__ ((packed)) FMT_t;
static FMT_t wav_fmt;
typedef struct WAV_DATA {
char Subchunk2ID[4];
u_int32_t Subchunk2Size;
} __attribute__ ((packed)) DATA_t;
void led_cntr(char *topicName,MQTTClient_message*message)//led控制
{
if(!strcmp("2",message->payload))
{
system("echo heartbeat > /sys/class/leds/sys-led/trigger");
printf("接受到消息:%s:%sn",topicName,message->payload);
}
if(!strcmp("1",message->payload))
{
system("echo none > /sys/class/leds/sys-led/trigger");
system("echo 1 > /sys/class/leds/sys-led/brightness");
printf("接受到消息:%s:%sn",topicName,message->payload);
}
else if (!strcmp("0", message->payload))
{
system("echo none > /sys/class/leds/sys-led/trigger");
system("echo 0 > /sys/class/leds/sys-led/brightness");
printf("接受到消息:%s:%sn",topicName,message->payload);
}
}
static int playback_init(void)
{
int ret;
snd_pcm_hw_params_t *hwparams = NULL;
ret=snd_pcm_open(&pcm,PCM_PLAYBACK_DEV,SND_PCM_STREAM_PLAYBACK,0);//打开设备
if(ret>0)
{
fprintf(stderr, "snd_pcm_open error: %s: %sn",PCM_PLAYBACK_DEV, snd_strerror(ret));
return -1;
}
snd_pcm_hw_params_malloc(&hwparams);//实例化
ret = snd_pcm_hw_params_any(pcm, hwparams);//初始化
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_any error: %sn", snd_strerror(ret));
goto err2;
}
//设置交错模式
ret = snd_pcm_hw_params_set_access(pcm, hwparams, SND_PCM_ACCESS_RW_INTERLEAVED);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_set_access error: %sn", snd_strerror(ret));
goto err2;
}
//设置数据格式
ret = snd_pcm_hw_params_set_format(pcm, hwparams, SND_PCM_FORMAT_S16_LE);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_set_format error: %sn", snd_strerror(ret));
goto err2;
}
//设置采样率
wav_fmt.SampleRate=44100;
ret = snd_pcm_hw_params_set_rate(pcm, hwparams, wav_fmt.SampleRate, 0);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_set_rate error: %sn", snd_strerror(ret));
goto err2;
}
//设置声道数
wav_fmt.NumChannels=2;
ret = snd_pcm_hw_params_set_channels(pcm, hwparams, wav_fmt.NumChannels);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_set_channels error: %sn", snd_strerror(ret));
goto err2;
}
ret = snd_pcm_hw_params_set_period_size(pcm, hwparams, period_size, 0);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_set_period_size error: %sn", snd_strerror(ret));
goto err2;
}
ret = snd_pcm_hw_params_set_periods(pcm, hwparams, periods, 0);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params_set_periods error: %sn", snd_strerror(ret));
goto err2;
}
ret = snd_pcm_hw_params(pcm, hwparams);
snd_pcm_hw_params_free(hwparams); //释放 hwparams 对象占用的内存
if (0 > ret)
{
fprintf(stderr, "snd_pcm_hw_params error: %sn", snd_strerror(ret));
goto err1;
}
buf_bytes = period_size * wav_fmt.BlockAlign;
return 0;
err2:
snd_pcm_hw_params_free(hwparams);
err1:
snd_pcm_close(pcm);
return -1;
}
static int open_wav_file(const char *file)
{
RIFF_t wav_riff;
DATA_t wav_data;
int ret;
fd = open(file, O_RDONLY);
if (0 > fd)
{
fprintf(stderr, "open error: %s: %sn", file, strerror(errno));
return -1;
}
ret = read(fd, &wav_riff, sizeof(RIFF_t));
if (sizeof(RIFF_t) != ret)
{
if (0 > ret)
perror("read error");
else
fprintf(stderr, "check error: %sn", file);
close(fd);
return -1;
}
if (strncmp("RIFF", wav_riff.ChunkID, 4) || strncmp("WAVE", wav_riff.Format, 4))
{
fprintf(stderr, "check error: %sn", file);
close(fd);
return -1;
}
ret = read(fd, &wav_fmt, sizeof(FMT_t));
if (sizeof(FMT_t) != ret)
{
if (0 > ret)
perror("read error");
else
fprintf(stderr, "check error: %sn", file);
close(fd);
return -1;
}
if (strncmp("fmt ", wav_fmt.Subchunk1ID, 4))
{
fprintf(stderr, "check error: %sn", file);
close(fd);
return -1;
}
if (0 > lseek(fd, sizeof(RIFF_t) + 8 + wav_fmt.Subchunk1Size,SEEK_SET))
{
perror("lseek error");
close(fd);
return -1;
}
while(sizeof(DATA_t) == read(fd, &wav_data, sizeof(DATA_t)))
{
if (!strncmp("data", wav_data.Subchunk2ID, 4))
return 0;
if (0 > lseek(fd, wav_data.Subchunk2Size, SEEK_CUR))
{
perror("lseek error");
close(fd);
return -1;
}
}
fprintf(stderr, "check error: %sn", file);
return -1;
}
void playback_cntr(char*topicName,MQTTClient_message*message)//音频设备控制
{
printf("接受到消息:%s:%sn",topicName,message->payload);
int ret;
if (open_wav_file(message->payload))
goto err0;
printf("打开音频文件成功n");
if (playback_init())
goto err1;
printf("初始化设备成功n");
buf = malloc(buf_bytes);
if (NULL == buf)
{
perror("malloc error");
goto err2;
}
printf("开始播放n");
for(;;)
{
if(playback_flags==0)
goto err3;
memset(buf, 0x00, buf_bytes);
ret = read(fd, buf, buf_bytes);
if (0 >= ret)
goto err3;
ret = snd_pcm_writei(pcm, buf, period_size);
if (0 > ret)
{
fprintf(stderr, "snd_pcm_writei error: %sn", snd_strerror(ret));
goto err3;
}
else if (ret < period_size)
{
if (0 > lseek(fd, (ret-period_size) * wav_fmt.BlockAlign, SEEK_CUR))
{
perror("lseek error");
goto err3;
}
}
}
err3:
free(buf);
err2:
snd_pcm_close(pcm);
err1:
close(fd);
err0:
exit(0);
}
static int msgarrvd(void *context, char *topicName, int topicLen,MQTTClient_message *message)
{
if(!strcmp(topicName,LED_TOPIC))
led_cntr(topicName,message);
else if(!strcmp(topicName,PLAPBACK_TOPIC))
{
if(!strcmp(message->payload,"close"))
{
printf("关闭播放n");
kill(pid,SIGKILL);
waitpid(pid,NULL,0);
playback_flags=0;
return 1;
}
if(playback_flags==0)
{
printf("设备空闲n");
playback_flags=1;
pid=fork();
}
else
{
printf("设备被打断,播放新的文件n");
kill(pid,SIGKILL);
waitpid(pid,NULL,0);
playback_flags=1;
pid=fork();
}
if(pid==0)
playback_cntr(topicName,message);
}
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
static void connlost(void *context, char *cause) {
printf("nConnection lostn");
printf(" cause: %sn", cause);
}
int main()
{
int rc;
MQTTClient client;
MQTTClient_connectOptions con_options=MQTTClient_connectOptions_initializer;
MQTTClient_willOptions will_options=MQTTClient_willOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
if(MQTTCLIENT_SUCCESS!=(rc=MQTTClient_create(&client,BROKER_ADDRESS,
CLIENTID,MQTTCLIENT_PERSISTENCE_NONE,NULL)))
{
printf("Create MQTTCLient error,return code is:%dn",rc);
rc = EXIT_FAILURE;
goto exit;
}
will_options.topicName=WILL_TOPIC;
will_options.message="Abnormally dropped";
will_options.retained=1;
will_options.qos=0;
con_options.keepAliveInterval=30;
con_options.cleansession=0;
con_options.will=&will_options;
if(MQTTClient_setCallbacks(client,NULL,connlost,msgarrvd,NULL)!=MQTTCLIENT_SUCCESS)
{
printf("Failed to set callbacks, return code %dn", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
if( MQTTCLIENT_SUCCESS!=(rc=MQTTClient_connect(client,&con_options)))
{
printf("Connect MQTT error,return code is:%dn",rc);
rc=EXIT_FAILURE;
goto destroy_exit;
}
printf("MQTT 服务器连接成功!n");
pubmsg.payload = "Online";
pubmsg.payloadlen = 6;
pubmsg.qos = 0;
pubmsg.retained = 1;
if(MQTTCLIENT_SUCCESS!=(rc=MQTTClient_publishMessage(client,WILL_TOPIC,&pubmsg,NULL)))
{
printf("Failed to publish message, return code %dn", rc);
rc = EXIT_FAILURE;
goto disconnect_exit;
}
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_subscribe(client, LED_TOPIC, 0)))
{
printf("Failed to subscribe, return code %dn", rc);
rc = EXIT_FAILURE;
goto disconnect_exit;
}
printf("订阅主题成功:%sn",LED_TOPIC);
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_subscribe(client, PLAPBACK_TOPIC, 0)))
{
printf("Failed to subscribe, return code %dn", rc);
rc = EXIT_FAILURE;
goto disconnect_exit;
}
printf("订阅主题成功:%sn",PLAPBACK_TOPIC);
for(;;)
{
if(waitpid(pid,NULL,WNOHANG))
playback_flags=0;
MQTTClient_message tempmsg = MQTTClient_message_initializer;
int fd;
char temp[10]={0};
fd=open("/sys/class/thermal/thermal_zone0/temp",O_RDONLY);
read(fd,temp,sizeof(temp));
close(fd);
tempmsg.payload=temp;
tempmsg.payloadlen=strlen(temp);
tempmsg.qos=0;
tempmsg.retained=1;
if(MQTTCLIENT_SUCCESS!=(rc=MQTTClient_publishMessage(client,TEMP_TOPIC,&tempmsg,NULL)))
{
printf("Failed to publish message, return code %dn", rc);
rc = EXIT_FAILURE;
goto disconnect_exit;
}
sleep(30);
}
disconnect_exit:
if (MQTTCLIENT_SUCCESS !=(rc = MQTTClient_disconnect(client, 10000)))
{
printf("Failed to disconnect, return code %dn", rc);
rc = EXIT_FAILURE;
}
destroy_exit:
MQTTClient_destroy(&client);
exit:
return rc;
}
最后
以上就是美好奇异果为你收集整理的MQTT实战小项目一.主要内容二.各个模块实现思路三.代码实现的全部内容,希望文章能够帮你解决MQTT实战小项目一.主要内容二.各个模块实现思路三.代码实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复