高并发服务器整体框架
前置服务器
通常前置机从架构上解决两个问题
1,前端很多种类业务数据需要与后端核心业务数据交互,如果后端核心业务一个类型开发一个接口肯定是得不偿失的,所以利用前置机做数据收集清洗转换格式汇总给后端。访问路径是业务客户端到前置机伪服务端,伪服务端也就是真正的客户端然后最后才是业务服务器的服务端。
2,如果接口多了核心业务安全性也会下降,那有了前置机,大家先访问前置机,而不直接访问服务器,一般再配合审计的安全手段,很好的隐藏了后端服务器的存在。
前置服务器和后置服务器通信
这里我选用共享内存
共享内存
共享内存是在两个正在运行的进程之间共享和传递数据的一种非常有效的方式。不同进程之间共享的内存通常安排为同一段物理内存。进程可以将同一段共享内存连接到它们自己的地址空间中,所有进程都可以访问共享内存中的地址。
共享内存的特点
1、共享内存是以传输数据为目的
2、共享内存无同步无互斥
3、共享内存是所有进程间通信速度最快的。
4、共享内存的生命周期随内核
共享内存的优缺点
1、优点:使用共享内存进行进程间的通信非常方便,而且函数的接口也简单,数据的共享使进程间的数据不用传送,而是直接访问内存,加快了程序的效率。同时,它也不像匿名管道那样要求通信的进程有一定的父子关系。
2、缺点:共享内存没有提供同步的机制,这使得我们在使用共享内存进行进程间通信时,往往要借助其他的手段来进行进程间的同步工作。
由于其没有同步互斥的机制,且双方通过共享内存通信无法知晓什么时候写入,什么时候读取,修改了共享内存哪一段内容,因此我结合了信号量,以及消息队列来完成前后置服务器的消息互通
共享内存架构
我将共享内存划分为一个一个的数据块,并在开头加上索引区,通过索引区找到对应的数据,具体实现,是由于共享内存本身是内存,可以通过地址偏移来划分区域。
共享内存类封装
由于要让线程池的任务都访问同一块共享内存,所以我将共享内存封装成单例类,只能初始化一次,而且不同线程访问共享内存需要互斥访问,因此我还加入了一个共享内存锁.
CEnjoymemory.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28#include<sys/types.h> #include<sys/ipc.h> #include<stdio.h> #include <sys/shm.h> #include <sys/msg.h> #include <string.h> #include <pthread.h> #include "protocol.h" #define DATASIZE 50 //共享内存每块的大小 using namespace std; class CEnjoyMemory { //构造函数私有化 CEnjoyMemory(); int shmid; int msggid; int semid; void* shmaddr; //提供静态私有实例化对象指针 static CEnjoyMemory* Instance; public: //提供静态公有接口 static CEnjoyMemory* getInstance(); void write(void* arg,int type); static pthread_mutex_t memorymutex; };
信号量操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61union semun { int val; /* 设置值 */ struct semid_ds* buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short* array; /* 设置全部 获取全部 */ struct seminfo* __buf; /* Buffer for IPC_INFO (Linux-specific) */ }; typedef struct sendmsgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */ }MSG; int sem_create(key_t key, int nsems) { int res = semget(key, nsems, IPC_CREAT | 0777); if (res < 0) { perror("create error"); } return res; } int sem_setVal(int semid, int semidex, int val) { union semun arg; arg.val = val; int res = semctl(semid, semidex, SETVAL, arg); if (res < 0) { perror("set error"); } return res; } //信号量P操作 上锁-1 int sem_p(int semid, int semidex) { struct sembuf buf = { semidex,-1,SEM_UNDO }; int res = semop(semid, &buf, 1); if (res < 0) { perror("semop p error"); } return res; } //信号量V操作 解锁 +1 int sem_v(int semid, int semidex) { struct sembuf buf = { semidex,1,SEM_UNDO }; int res = semop(semid, &buf, 1); if (res < 0) { perror("semop v error"); } return res; }
CEnjoymemory.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63CEnjoyMemory::CEnjoyMemory() { this->shmid = shmget((key_t)10001, sizeof(int)*100 + DATASIZE * 100, IPC_CREAT | 0777); this->msggid = msgget((key_t)10086, IPC_CREAT | 0777); if ((shmid == -1) && (msggid == -1)) { perror("create error"); } this->semid = sem_create((key_t)10002, 1); sem_setVal(semid, 0, 1); shmaddr = shmat(shmid, NULL, 0); } CEnjoyMemory* CEnjoyMemory::getInstance() { if (CEnjoyMemory::Instance == NULL) { CEnjoyMemory::Instance = new CEnjoyMemory(); } return Instance; } void CEnjoyMemory::write(char* arg) { int flag[100] = { 0 }; int index = 0; MSG buf = { 0 }; cout << "写进程开始" << endl; sem_p(this->semid, 0); //读索引区 memcpy(flag, shmaddr, sizeof(flag)); for (int i = 0; i < (sizeof(flag) / sizeof(int)); i++) { //索引区为0,说明此块没数据 可以写 if (flag[i] == 0) { index = i; break; } } cout << "index=" << index << endl; flag[index] = 1; //先写索引区 memcpy((void*)(shmaddr + index * sizeof(int)), &flag[index], sizeof(int)); memcpy((void*)(shmaddr + sizeof(flag) + DATASIZE * index), arg, DATASIZE); buf.mtype = 1; //将第几块作为消息队列的信息发送,告诉读端读第几块内存 sprintf(buf.mtext, "%d", index); if (msgsnd(msggid, &buf, sizeof(MSG), 0) == -1) { perror("send error"); return; } bzero(&buf, sizeof(MSG)); cout << "写完数据了" << endl; sem_v(semid, 0); //sleep(3); }
线程写入内容部分代码
1
2
3
4
5
6mptr = CEnjoyMemory::getInstance(); //单例共享内存写入 pthread_mutex_lock(&CEnjoyMemory::memorymutex); mptr->write(buf); pthread_mutex_unlock(&CEnjoyMemory::memorymutex);
后置服务器读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78void Rmemory::run() { int flag[100] = { 0 };//用来读索引区 int index = 0; MSG rbuf = { 0 }; login LG = { 0 }; videoupload VD = { 0 }; videolist VL = { 0 }; videoplay VP = { 0 }; playrecord PR = { 0 }; finder FD = { 0 }; int id = 0; while (1) { cout << "读进程开始" << endl; if (msgrcv(msggid, &rbuf, sizeof(MSG), 1, 0) == -1) { perror("rcv error"); return ; } sem_p(semid, 0); int num = atoi(rbuf.mtext); cout << "读到的下标是" << num << endl; shmaddr = shmat(shmid, NULL, 0); memcpy(&(flag[num]), (void*)(shmaddr + (num * sizeof(int))), sizeof(int)); if (flag[num] == 1) { memcpy(&FD, (void*)(shmaddr + sizeof(flag) + num * DATASIZE), sizeof(FD)); if (FD.type == 1)//登录 { memcpy(&LG, (void*)(shmaddr + sizeof(flag) + num * DATASIZE + sizeof(FD)), sizeof(LG)); loginTask* tk = new loginTask(id++, FD.fd, LG); pool->addTask(tk); } else if (FD.type == 2)//注册 { memcpy(&LG, (void*)(shmaddr + sizeof(flag) + num * DATASIZE + sizeof(FD)), sizeof(LG)); registerTask* tk = new registerTask(id++, FD.fd, LG); pool->addTask(tk); } else if (FD.type == 3)//视频上传 { memcpy(&VD, (void*)(shmaddr + sizeof(flag) + num * DATASIZE + sizeof(FD)), sizeof(VD)); videouploadTask* tk = new videouploadTask(id++, FD.fd, VD); pool->addTask(tk); } else if (FD.type == 4)//视频列表 { memcpy(&VL, (void*)(shmaddr + sizeof(flag) + num * DATASIZE + sizeof(FD)), sizeof(VL)); videolistTask* tk = new videolistTask(id++, FD.fd, VL); pool->addTask(tk); } else if (FD.type == 5)//视频播放 { memcpy(&VP, (void*)(shmaddr + sizeof(flag) + num * DATASIZE + sizeof(FD)), sizeof(VP)); videoplayTask* tk = new videoplayTask(id++, FD.fd, VP); pool->addTask(tk); } else if (FD.type == 6)//视频播放记录上传 { memcpy(&VP, (void*)(shmaddr + sizeof(flag) + num * DATASIZE + sizeof(FD)), sizeof(VP)); videoplayTask* tk = new videoplayTask(id++, FD.fd, VP); pool->addTask(tk); } //清空数据区数据 memset((void*)(shmaddr + sizeof(flag) + num * DATASIZE), 0, DATASIZE); flag[num] = 0; memcpy((void*)(shmaddr + (num * sizeof(int))), &flag[num], sizeof(int)); /*cout << "id= " << LG.name << endl; cout << "passwd= " << LG.passwd << endl;*/ cout << "读完数据" << endl; } //shmdt(shmaddr); sem_v(semid, 0); } }
运行结果
客户端访问服务器做登录业务
最后
以上就是体贴小鸽子最近收集整理的关于360行车记录仪项目开发日志(服务端开发)(二)前后置服务器通信的全部内容,更多相关360行车记录仪项目开发日志(服务端开发)(二)前后置服务器通信内容请搜索靠谱客的其他文章。
发表评论 取消回复