概述
上一篇多线程介绍的是,QT多线程处理机制,这篇,将对接收数据,实时处理进行分析。
QT通过socket通信,从接收缓冲区中读取数据,交给线程进行处理,那么问题来了,如果线程还没有处理完数据,则线程就没有办法继续从缓冲区中取数,那么当数据量过大的时候,缓冲区会满,继而被覆盖,从而造成数据的丢失。那么如何将数据存储在某个特定的空间内,并且让其他线程进行读取。这个是线程通信的问题,这个问题有多种方式,一种是操作系统课本上,通过线程同步、互斥、信号量这三种机制实现线程通信,或者就是通过循环队列的方式,完成线程通信。
这篇主要介绍的是第二种方式,即循环队列的方式,进行通信。
循环队列的实现方式,通过全局变量定义一个大的数组,同时,定义两个读写指针,这个指针不是语言中指针这个类型,可以理解成,两个标志位,记录读和写的位置,通过这种方式,可以实现一个循环队列的基本模型。如下图:
write表示写指针,read表示读指针。我们将从socket缓冲区接收到的数据,缓存到队列中,将写指针向后移动,另外一个线程,操作读指针,不断跟随写指针,将数据取出,处理。下面把代码贴上来供大家参考:
MyThread.h
#ifndef MYTHREAD_H
#define MYTHREAD_H
#include <QThread>
#include <QDebug>
#include "basetype.h"
#include "mythreadstore.h"
/**
* @brief The MyThreadRecv class
* 该类负责接收从tcp读取到的数据。并将数据存储到缓冲区中
*/
class MyThreadRecv
{
public:
MyThreadRecv();
~MyThreadRecv(){};
void RecvMsg(const BYTE *data, int len); ///< 接收数据,存入循环队列
};
/**
* @brief The MyThread class
* 处理数据的线程
*/
class MyThread: public QThread
{
public:
public:
MyThread();
~MyThread(){};
void init();
void run(); ///< 任务执行
private:
volatile bool stopped;
int Flag;
MythreadStore *mythreadstore;
};
#endif // MYTHREAD_H
MyThread.cpp
#include "mythread.h"
BYTE Queue1[(1024 * 500)] = {0}; ///< 循环消息队列
int wReadPoint = 0; ///< 读指针
int wWritePoint = 0; ///< 写指针
MyThreadRecv::MyThreadRecv()
{
}
void MyThreadRecv::RecvMsg(const BYTE *data, int len)
{
qDebug()<<"I will gointo for";
for(int iNum = 0; iNum < len; iNum++) {
/**
* @brief tempWReadPoint
* 存储,到程序执行到此处的时候,wReadPoint的值,因为线程一直在执行,很有可能执行到这步的时候,wReadPoint的值被改变。
*/
int tempWReadPoint = wReadPoint;
if((wWritePoint + 1) % (1024 * 500) == tempWReadPoint) {
/**
* 队列已满
*/
continue;
}
/**
* 处理队列不满的情况
*/
Queue1[wWritePoint % (1024 * 500)] = data[iNum];
wWritePoint = (wWritePoint +1) % (1024 * 500);
}
qDebug()<<"After for";
}
void MyThread::init()
{
start();
}
MyThread::MyThread()
{
stopped = false;
Flag = 0;
mythreadstore = new MythreadStore();
}
void MyThread::run()
{
qDebug()<<"In run";
int iFlag = 0; ///< 标志位
int iNum = 0;
BYTE NeedDealdata[200] = {0};
while(!stopped) {
/**
* @brief itempWritePoint
* 存储,到程序执行到此处的时候,wWritePoint的值,因为线程一直在执行,很有可能执行到这步的时候,wWritePoint的值被改变。
*/
int itempWritePoint = wWritePoint;
if((wReadPoint) % (1024 * 500) != itempWritePoint) {
/**
* 队列不空
*/
if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0x54 == Queue1[(wReadPoint) % (1024 * 500)])) {
/**
* 找帧头
*/
iNum = 0;
NeedDealdata[iNum++] = Queue1[(wReadPoint -1) % (1024 * 500)];
NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
wReadPoint = (wReadPoint + 1) % (1024 * 500);
iFlag = 1;
}
if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0xFE == Queue1[(wReadPoint) % (1024 * 500)]) && (1 == iFlag)) {
NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
wReadPoint = (wReadPoint + 1) % (1024 * 500);
qDebug()<<"I will store msg || iNum = " + QString::number(iNum);
/**
* 找到需要处理的数据,处理数据
*/
mythreadstore->StoreMsgDeal(NeedDealdata, iNum);
memset(NeedDealdata, '