概述
下一篇文章里我们通过交叉编译生成了 libmosquitto.so.1
参看:MQTT再学习 -- 交叉编译与移植
之前有讲过MQTT客户端的测试,参看:MQTT再学习 -- 安装MQTT客户端及测试
那接下来,我们就开始先测试一下,看看交叉编译生成的客户端,是否能发送数据到服务器上。
一、生成交叉编译客户端
# ls
a.txt deviceCfg.xml libmosquitto.so.1 mosq.c mosquitto.h net_zslf.c net_zslf.h readme.txt
交叉编译,生成目标文件。arm-none-linux-gnueabi-gcc -c net_zslf.c mosq.c
链接共享库:
//查看文件属性
# file client
client: ELF 32-bit LSB executable, ARM, version 1 (SYSV), dynamically linked (uses shared libs),
for GNU/Linux 2.6.14, not stripped
//Ubuntu下 不能执行
# ./client
bash: ./client: cannot execute binary file
注意: 这个必须要链接 libssl.so.1.0.0 libcrypto.so.1.0.0,否则出现错误:
/opt/arm-2009q1-203/bin/../lib/gcc/arm-none-linux-gnueabi/4.3.3/../../../../arm-none-linux-gnueabi/bin/ld: cannot find -lmosquitto
collect2: ld returned 1 exit status
这部分参看文章:
mosquitto的交叉编译mips
mqtt的开源实现mosquitto的交叉编译与使用
尝试了一上午终于找到原因不容易啊。至此生成了Linux下可执行的客户端的二进制文件 client
二、在开发板上执行客户端测试
将这三个库文件 libcrypto.so.1.0.0 libmosquitto.so.1 libssl.so.1.0.0 拷贝到开发板 lib 目录下
然后拷贝 client 和 deviceCfg.xml 放在同一目录
执行client 之前需要修改权限:chmod 777 client
Ubuntu下 打开 MQTT 服务器
开发板下 执行 client
OK,终于完成了!!!
证明交叉编译生成的客户端测试代码是可行的。
三、客户单测试源码下载
下载:交叉编译 MQTT 客户端测试源码
四、结合MQTT和PMS7003开发程序
这才是本篇文章的核心,也是本项目最后的一步了。
#include <fcntl.h> //文件控制定义
#include <stdio.h> //标准输入输出定义
#include <stdlib.h> //标准函数库定义
#include <unistd.h> //Unix标准函数定义
#include <errno.h> //错误好定义
#include <termios.h> //POSIX终端控制定义
#include <sys/ioctl.h> //ioctl函数定义
#include <string.h> //字符操作
#include <sys/types.h>
#include <sys/stat.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/msg.h>
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include "mosquitto.h"
#include "net_zslf.h"
#include <sys/wait.h>
//message queue variable
int message_count = 0;
//const char *mqtt_broker_address = "192.168.2.52"; /* mqtt_broker ip address */
int mqtt_broker_port = 1883; /* mqtt_broker port number */
long msgtype = 10; /* pm sensor message type */
//int msgsize = 100; /* pm sensor message size */
int msgsize = sizeof (MSG_data_buf); /* pm sensor message size */
int fd_gpio;
struct termios newtio, oldtio;
typedef struct {
int pin_idx;
int pin_dir;
int pin_sta;
} davinci_gio_arg;
typedef enum {
AT91PIO_DIR_OUT = 0,
AT91PIO_DIR_INP
} davinci_gio_dir;
//驱动判断输入输出模式
davinci_gio_arg arg;
#define DEV_PIO_LED "/dev/pio"
// 需要手动添加设备号 mknod /dev/pio c 203 0
#define PIO_NUM 47
// 47pin 为控制输入输出方向引脚
#define DEV_UART "/dev/ttyS1"
// /dev/ttyS1 为串口设备
#define IOCTL_PIO_SETDIR 1 //set gpio direct
#define IOCTL_PIO_GETDIR 2 //get gpio direct
#define IOCTL_PIO_SETSTA 3 //set gpio status
#define IOCTL_PIO_GETSTA 4 //get gpio status
//保存信息
int log_init( const char *strFileName )
{
int fdLog = -1;
if( -1 == (fdLog = open( strFileName, O_CREAT|O_TRUNC ) ) )
{
}
close( fdLog );
}
int log_out( const char *strFileName, const char * szLog )
{
int fdLog = -1;
if( -1 == ( fdLog = open( strFileName, O_CREAT|O_WRONLY|O_APPEND ) ) )
{
printf( "LOG (%s) open error!n", strFileName );
return -1;
}
write( fdLog, szLog, strlen( szLog ) );
close( fdLog );
return 0;
}
//配置串口
/* 参数说明:fd 设备文件描述符,nspeed 波特率,nbits 数据位数(7位或8位),
parity 奇偶校验位('n'或'N'为无校验位,'o'或'O'为偶校验,'e'或'E'奇校验),
nstop 停止位(1位或2位)
成功返回1,失败返回-1。
*/
int set_com_opt( int fd, int nspeed, int nbits, char parity, int nstop )
{
char szTmp[128];
//打印配置信息
sprintf( szTmp, "set_com_opt - speed:%d,bits:%d,parity:%c,stop:%dn",
nspeed, nbits, parity, nstop );
log_out( "./485.log", szTmp );
//保存并测试现在有串口参数设置,在这里如果串口号等出错,会有相关的出错信息
if( tcgetattr( fd, &oldtio ) != 0 )
{
sprintf( szTmp, "SetupSerial 1" );
log_out( "./485.log", szTmp );
perror( "SetupSerial 1" );
return -1;
}
//修改输出模式,原始数据输出
bzero( &newtio, sizeof( newtio ));
newtio.c_cflag &=~(OPOST);
//屏蔽其他标志位
newtio.c_cflag |= (CLOCAL | CREAD );
newtio.c_cflag &= ~CSIZE;
//设置数据位
switch( nbits )
{
case 7:
newtio.c_cflag |= CS7;
break;
case 8:
newtio.c_cflag |= CS8;
break;
default:
perror("Unsupported date bit!n");
return -1;
}
//设置校验位
switch( parity )
{
case 'n':
case 'N': //无奇偶校验位
newtio.c_cflag &= ~PARENB;
newtio.c_iflag &= ~INPCK;
break;
case 'o':
case 'O': //设置为奇校验
newtio.c_cflag |= ( PARODD | PARENB );
newtio.c_iflag |= ( INPCK | ISTRIP );
break;
case 'e':
case 'E': //设置为偶校验
newtio.c_iflag |= ( INPCK |ISTRIP );
newtio.c_cflag |= PARENB;
newtio.c_cflag &= ~PARODD;
break;
default:
perror("unsupported parityn");
return -1;
}
//设置停止位
switch( nstop )
{
case 1:
newtio.c_cflag &= ~CSTOPB;
break;
case 2:
newtio.c_cflag |= CSTOPB;
break;
default :
perror("Unsupported stop bitn");
return -1;
}
//设置波特率
switch( nspeed )
{
case 2400:
cfsetispeed( &newtio, B2400 );
cfsetospeed( &newtio, B2400 );
break;
case 4800:
cfsetispeed( &newtio, B4800 );
cfsetospeed( &newtio, B4800 );
break;
case 9600:
cfsetispeed( &newtio, B9600 );
cfsetospeed( &newtio, B9600 );
break;
case 115200:
cfsetispeed( &newtio, B115200 );
cfsetospeed( &newtio, B115200 );
break;
case 460800:
cfsetispeed( &newtio, B460800 );
cfsetospeed( &newtio, B460800 );
break;
default:
cfsetispeed( &newtio, B9600 );
cfsetospeed( &newtio, B9600 );
break;
}
//设置等待时间和最小接收字符
newtio.c_cc[VTIME] = 0;
newtio.c_cc[VMIN] = 0;
//VTIME=0,VMIN=0,不管能否读取到数据,read都会立即返回。
//输入模式
newtio.c_lflag &= ~(ICANON|ECHO|ECHOE|ISIG);
//设置数据流控制
newtio.c_iflag &= ~(IXON|IXOFF|IXANY); //使用软件流控制
//如果发生数据溢出,接收数据,但是不再读取 刷新收到的数据但是不读
tcflush( fd, TCIFLUSH );
//激活配置 (将修改后的termios数据设置到串口中)
if( tcsetattr( fd, TCSANOW, &newtio ) != 0 )
{
sprintf( szTmp, "serial set error!n" );
log_out( "./485.log", szTmp );
perror( "serial set error!" );
return -1;
}
log_out( "./485.log", "serial set ok!n" );
return 1;
}
//打开串口并返回串口设备文件描述
int open_com_dev( char *dev_name )
{
int fd;
char szTmp[128];
log_init( "./485.log" );
if(( fd = open( dev_name, O_RDWR|O_NOCTTY|O_NDELAY)) == -1 )
{
perror("openn");
//printf("Can't open Serial %s Port!n", dev_name );
sprintf( szTmp, "Can't open Serial %s Port!n", dev_name );
log_out( "./485.log", szTmp );
return -1;
}
sprintf( szTmp, "open %s ok!n", dev_name );
log_out( "./485.log", szTmp );
if(fcntl(fd,F_SETFL,0)<0)
{
printf("fcntl failed!n");
}
//printf("Open %s okn",dev_name );
return fd;
}
int main (void)
{
int gflags;
int msgid;
key_t key;
pthread_t thread1, thread2;
int ret;
/* struct msqid_ds msg_ginfo, msg_sinfo; */
char *msgpath = "home/tarena/project/MQTT/test/a.txt";
//消息队列的 键
key = ftok(msgpath, 'a');
gflags = IPC_CREAT;
//创建消息队列
msgid = msgget(key, gflags | 00666);
if(msgid == -1)
{
DUG_PRINTF("msg create errorn");
return -1;
}
int error = 0, error1 = 0;
arg.pin_idx = PIO_NUM;
arg.pin_dir = AT91PIO_DIR_OUT;
//打开/dev/pio设备
fd_gpio = open(DEV_PIO_LED, O_RDWR);
if(fd_gpio < 0)
{
perror("fd_gpio open err");
exit (-1);
}
/* msg_stat(msgid,msg_ginfo); */
//创建消息队列发送线程
ret = pthread_create(&thread1, NULL, &start_thread_msgsend, (void *)&msgid);
if (ret != 0) {
perror("pthread msgsend create errorn");
return -1;
}
//创建消息队列接收线程
ret = pthread_create(&thread2, NULL, &start_thread_msgrcv, (void *)&msgid);
if (ret != 0){
perror("pthread msgrcv create errorn");
return -1;
}
//线程等待
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
//关闭设备
close (fd_gpio);
return 0;
}
//消息队列接收线程
void *start_thread_msgrcv(void *arg)
{
int rflags = 0;
int ret;
int msgid = *(int *)(arg);
MSG_data_buf msg_rbuf; //消息队列类型
struct mosquitto *mosq; //保存一个MQTT客户端连接的所有信息
//下面的代码是从xml文件中读取
FILE *fp;
char szFileBuff[1024] = {0};
char serverADDR[16] = {0},devID[15] = {0},devName[15] = {0};
char Longitude[15] = {0},Latitude[15] = {0},frequency[3] = {0};
char *lFirst, *lEnd;
char devInfo[70];
FILE *fp_re;
char buffer_re[4];
//打开xml文件
fp = fopen("deviceCfg.xml","r");
if (fp==NULL)
{
DUG_PRINTF("read XML file error!n");
}
//你只要知道while里面是获取xml信息的,至于这种操作有点6
while(fgets(szFileBuff, 1023, fp))
{
if ((lFirst = strstr(szFileBuff, "<serverADDR>")) != NULL)
{
lEnd = strstr(lFirst + 1, "</serverADDR>");
memcpy(serverADDR, lFirst + 12, lEnd - lFirst - 12);
}
if ((lFirst = strstr(szFileBuff, "<devID>")) != NULL)
{
lEnd = strstr(lFirst + 1, "</devID>");
memcpy(devID, lFirst + 7, lEnd - lFirst - 7);
}
if ((lFirst = strstr(szFileBuff, "<devName>")) != NULL)
{
lEnd = strstr(lFirst + 1, "</devName>");
memcpy(devName, lFirst + 9, lEnd - lFirst - 9);
}
if ((lFirst = strstr(szFileBuff, "<Longitude>")) != NULL)
{
lEnd = strstr(lFirst + 1, "</Longitude>");
memcpy(Longitude, lFirst + 11, lEnd - lFirst - 11);
}
if ((lFirst = strstr(szFileBuff, "<Latitude>")) != NULL)
{
lEnd = strstr(lFirst + 1, "</Latitude>");
memcpy(Latitude, lFirst + 10, lEnd - lFirst - 10);
}
//下面这个语句是用于分频率传送数据的
if ((lFirst = strstr(szFileBuff, "<frequency>")) != NULL)
{
lEnd = strstr(lFirst + 1, "</frequency>");
memcpy(frequency, lFirst + 11, lEnd - lFirst - 11);
}
if ((lFirst = strstr(szFileBuff, "</display>")) != NULL)
{
sprintf(devInfo, "&%s&%s&%s&%s&n",devID,devName,Longitude,Latitude);
}
}
fclose(fp);
// sleep(18);
//这里是关键了,MQTT协议 要上演了 这部分为 pub 发布内容
//MQTT 库初始化
mosquitto_lib_init();
//新建
mosq = mosquitto_new(devID, true, NULL);
//连接回调设置
mosquitto_connect_callback_set(mosq, my_connect_callback);
//断开回调设置
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
//发布回调设置
mosquitto_publish_callback_set(mosq, my_publish_callback);
//MQTT连接
if(mosquitto_connect(mosq, serverADDR, mqtt_broker_port, 600) != MOSQ_ERR_SUCCESS)
{
DUG_PRINTF("mosquitto connection errorn");
//销毁
mosquitto_destroy(mosq);
//清空
mosquitto_lib_cleanup();
}
// sleep(18);
//这里开始 消息队列接收消息
while (1)
{
//接收消息
ret=msgrcv(msgid, &msg_rbuf, msgsize, msgtype, rflags);
//sleep(1);
if (ret == -1)
{
DUG_PRINTF("read msg errorn");
}
DUG_PRINTF("%sn", msg_rbuf.mtext);
<span style="white-space:pre"> </span>//strcat(msg_rbuf.mtext,devInfo); 将其和其他信息合并
//将接收到的信息发布
mosquitto_publish(mosq, NULL, "pmsensor", ret, (void *)msg_rbuf.mtext, 0, 0);
//sleep(5);
<span style="white-space:pre"> </span>memset (msg_rbuf.mtext, 0, sizeof (msg_rbuf.mtext));
}
/* should never run below */
//销毁
mosquitto_destroy(mosq);
//清空
mosquitto_lib_cleanup();
return NULL;
}
//单片机数据收发
void* start_thread_msgsend(void* p)
{
char buf[64];
char frameBuf[64];
int detectOff = 0;
int res = 0, nread = 0;
int m_pm1_factory;
int m_pm25_factory;
int m_pm10_factory;
int m_pm1_outdoor;
int m_pm25_outdoor;
int m_pm10_outdoor;
int m_count03;
int m_count05;
int m_count1;
int m_count25;
int m_count5;
int m_count10;
unsigned short m_length;
unsigned short m_version;
unsigned short m_errorno;
char pm_f[30];
int retval;
MSG_data_buf msg_sbuf; //消息队列类型
int msgid = *(int *)(p);
int sflags=IPC_NOWAIT;
msg_sbuf.mtype = msgtype;
while (1) {
arg.pin_sta = 0; //设为低电平 接收态
ioctl(fd_gpio, IOCTL_PIO_SETSTA, &arg);
int fd_r=open_com_dev( DEV_UART );
if( fd_r < 0 )
{
printf( "open UART device error! %sn", DEV_UART );
}
else
set_com_opt(fd_r, 9600,8,'n',1);
//执行select
fd_set rd;
FD_ZERO(&rd);
FD_SET(fd_r, &rd);
if ((res = select (fd_r+1,&rd, NULL, NULL, NULL) )< 0)
{
perror ("read err");
exit (-1);
}
memset (buf, 0, sizeof (buf));
if (FD_ISSET (fd_r, &rd))
{
//接收数据 8 8 2
int res1 = 0;
int val = 0;
int calcChecksum = 0;
int checksum = 0;
int i = 0;
while ((nread = read(fd_r, buf, 1)) > 0)
{
//printf ("%02X ",*buf);
//frameBuf[detectOff] = buf[0];
memcpy (frameBuf+detectOff, buf, 1);
//calcChecksum += *buf;
detectOff++;
if (frameBuf[0] == 0x42 && frameBuf[1] == 0x4d)
{
// m_length = frameBuf[3]+(frameBuf[2]<<8);
m_pm1_factory = frameBuf[5]+(frameBuf[4]<<8);
m_pm25_factory = frameBuf[7]+(frameBuf[6]<<8);
m_pm10_factory = frameBuf[9]+(frameBuf[8]<<8);
m_pm1_outdoor = frameBuf[11]+(frameBuf[10]<<8);
m_pm25_outdoor = frameBuf[13]+(frameBuf[12]<<8);
m_pm10_outdoor = frameBuf[15]+(frameBuf[14]<<8);
// m_count03 = frameBuf[17]+(frameBuf[16]<<8);
// m_count05 = frameBuf[19]+(frameBuf[18]<<8);
// m_count1 = frameBuf[21]+(frameBuf[20]<<8);
// m_count25 = frameBuf[23]+(frameBuf[22]<<8);
// m_count5 = frameBuf[25]+(frameBuf[24]<<8);
// m_count10 = frameBuf[27]+(frameBuf[26]<<8);
// m_version = frameBuf[28];
// m_errorno = frameBuf[29];
// checksum = frameBuf[31]+(frameBuf[30]<<8);
// calcChecksum -= ((checksum>>8)+(checksum&0xFF));
}
//退出循环, 这里有点疑问
if (detectOff == 32)
{
//printf ("n");
//printf ("pm1_factory = %d ug/m3npm25_factory = %d ug/m3npm10_factory = %d ug/m3n", m_pm1_factory, m_pm25_factory, m_pm10_factory);
//printf ("pm1_outdoor = %d ug/m3npm25_outdoor = %d ug/m3npm10_outdoor = %d ug/m3n",m_pm1_outdoor,m_pm25_outdoor,m_pm10_outdoor);
// printf ("m_count03 = %02Xnm_count05 = %02Xnm_count1 = %02Xnm_count25 = %02Xnm_count5 = %02Xnm_count10 = %02Xn",
// m_count03, m_count05, m_count1, m_count25, m_count5, m_count10);
// printf ("m_length = %dn", m_length);
// printf ("m_version = %sn", m_version);
// printf ("m_errorno = %sn", m_errorno);
// printf("checksum = %02X %s calcChecksum = %02X",checksum, (calcChecksum == checksum ? "==" : "!="), calcChecksum);
// for (i = 0;i<32;i++)
// {
// printf ("%02X ", frameBuf[i]);
// }
// printf ("n");
sprintf(pm_f,"%d %d",m_pm25_factory,m_pm10_factory);
/*这里和 下面的 read 结合 就好了 */
strcpy (msg_sbuf.mtext,pm_f);
//strcpy (msg_sbuf.mtext,"hello world");
//消息队列发送
//retval = msgsnd(msgid, &msg_sbuf,msgsize, sflags);
retval = msgsnd(msgid, &msg_sbuf,msgsize, 0);
if(retval == -1)
{
DUG_PRINTF("message send errorn");
}
memset (frameBuf, 0, sizeof (frameBuf));
memset (pm_f, 0, sizeof (pm_f));
memset (msg_sbuf.mtext, 0, sizeof (msg_sbuf.mtext));
detectOff = 0;
break;
}
}
}
close (fd_r);
usleep (200000);
}
}
最后
以上就是昏睡乌龟为你收集整理的PM2.5检测 -- PMS7003 采集和 MQTT 传输一、生成交叉编译客户端二、在开发板上执行客户端测试三、客户单测试源码下载四、结合MQTT和PMS7003开发程序这才是本篇文章的核心,也是本项目最后的一步了。的全部内容,希望文章能够帮你解决PM2.5检测 -- PMS7003 采集和 MQTT 传输一、生成交叉编译客户端二、在开发板上执行客户端测试三、客户单测试源码下载四、结合MQTT和PMS7003开发程序这才是本篇文章的核心,也是本项目最后的一步了。所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复