概述
linux进程间/线程间通讯
linux下的进程通信手段基本上是从Unix平台上的进程通信手段继承而来的。而对Unix发展做出重大贡献的两大主力AT&T的贝尔实验室及BSD(加州大学伯克利分校的伯克利软件发布中心)在进程间通信方面的侧重点有所不同。前者对Unix早期的进程间通信手段进行了系统的改进和扩充,形成了“system V IPC”,通信进程局限在单个计算机内;后者则跳过了该限制,形成了基于套接口(socket)的进程间通信机制。Linux则把两者继承了下来,如图示
Posix,全称“Portable operating System Interface”,可移植操作系统接口。 是IEEE开发的一套编码标准, 它已经是ISO/IEC采纳的国际标准。
一般System V IPC,都需要 ftok() 使用路径名生成key值, 而Posix IPC 直接使用路径名,且Posix IPC接口函数里面都有 "_" 连接符, 例如: mq_open/sem_open() 等。
linux下进程间通信的几种主要手段简介:
- 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,管道只能承载无格式字节流。
- 有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信;
- 信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数);
- 报文(Message)队列(消息队列):消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。
- 信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。
- 共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
- 套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。
1.管道
管道的主要局限性正体现在它的特点上:只支持单向数据流;
只能用于具有亲缘关系的进程之间;
没有名字;
管道的缓冲区是有限的(管道制存在于内存中,在管道创建时,为缓冲区分配一个页面大小);
管道所传送的是无格式字节流,这就要求管道的读出方和写入方必须事先约定好数据的格式,比如多少字节算作一个消息(或命令、或记录)等等;
接口:
pipe()/ fork()
2. 有名管道
FIFO可以说是管道的推广,克服了管道无名字的限制,使得无亲缘关系的进程同样可以采用先进先出的通信机制进行通信。FIFO的打开规则:
如果当前打开操作是为读而打开FIFO时,若已经有相应进程为写而打开该FIFO,则当前打开操作将成功返回;否则,可能阻塞直到有相应进程为写而打开该FIFO(当前打开操作设置了阻塞标志);或者,成功返回(当前打开操作没有设置阻塞标志)。
如果当前打开操作是为写而打开FIFO时,如果已经有相应进程为读而打开该FIFO,则当前打开操作将成功返回;否则,可能阻塞直到有相应进程为读而打开该FIFO(当前打开操作设置了阻塞标志);或者,返回ENXIO错误(当前打开操作没有设置阻塞标志)。
注意点:
不管写打开的阻塞标志是否设置,在请求写入的字节数大于4096时,都不保证写入的原子性。但二者有本质区别:
对于阻塞写来说,写操作在写满FIFO的空闲区域后,会一直等待,直到写完所有数据为止,请求写入的数据最终都会写入FIFO;
而非阻塞写则在写满FIFO的空闲区域后,就返回(实际写入的字节数),所以有些数据最终不能够写入。
接口:
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char * pathname, mode_t mode) //pathname 是一个普通的unix路径名,例如 /home/sundh/111.txt; 不同于,posix接口要求的路径名里面不能出现两次“/” 符号。
int open(const char *path, int oflag, ... ); //oflag 模式是BLOCK的, 如果显示的加上O_NONBLOCK,则不会出现open()阻塞等待另外一个线程调用open()函数。
int close()
int unlink(const char *path); //对FIFO文件进行删除
区别:
有名管道是作为一个特殊的设备文件存在于磁盘当中,而管道存在于内存当中,通信结束后,有名管道的文件本身依然存在(除非调用unlink()函数对fifo文件进行删除),但是管道已经释放了。
有名管道与文件也是有区别的,文件的话,当读取其中的内容之后,信息依然存在,但是有名管道中,通信结束之后,信息就会丢失。
3. 信号
kill -l 可以看到系统支持哪些信号。
信号是在软件层次上对中断机制的一种模拟,在原理上,一个进程收到一个信号与处理器收到一个中断请求可以说是一样的。
信号是异步的,一个进程不必通过任何操作来等待信号的到达,事实上,进程也不知道信号到底什么时候到达。
信号是进程间通信机制中唯一的异步通信机制,可以看作是异步通知,通知接收信号的进程有哪些事情发生了。
信号来源:
信号事件的发生有两个来源:硬件来源(比如我们按下了键盘或者其它硬件故障);
软件来源,最常用发送信号的系统函数是kill, raise, about, alarm和setitimer以及sigqueue函数,软件来源还包括一些非法运算等操作。
信号值小于SIGRTMIN的信号都是不可靠信息,可能会丢失(不支持排队,即不允许队列中有多个相同的信息,例如如果进程队列中有SIGINT,再来一个SIGINT消息,就不会插入到队列中了);
信号值位于SIGRTMIN和SIGRTMAX之间的信号都是可靠信号,它们支持排队,且允许队列中有多个相同的信息。
函数接口:
Linux在支持新版本的信号安装函数sigation()以及信号发送函数sigqueue()的同时,仍然支持早期的signal()信号安装函数,支持信号发送函数kill()。
最常用发送信号的系统函数是kill, raise, about, alarm和setitimer以及sigqueue函数,软件来源还包括一些非法运算等操作。
4. 消息队列
4.1 System V
消息队列就是一个消息的链表。可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向中按照一定的规则添加新消息;
对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核持续的。函数接口:
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok (char*pathname, char proj); //convert a pathname and a project identifier to a System V IPC key. 如果pathname所指的文件不存在,则放回-1. msgget(-1, IPC_CREAT) 这个函数会随机返回一个唯一值
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg) //将创建一个新的消息队列或取得一个已有信号量
int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long msgtyp, int msgflg); //读取一个消息,并把消息存储在msgp指向的msgbuf结构中。
int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg); //向msgid代表的消息队列发送一个消息,即将发送的消息存储在msgp指向的msgbuf结构中,消息的大小由msgze指定。
int msgctl(int msqid, int cmd, struct msqid_ds *buf); //该系统调用对由msqid标识的消息队列执行cmd操作,共有三种cmd操作:IPC_STAT、IPC_SET 、IPC_RMID。
比较:
消息队列与管道以及有名管道相比,具有更大的灵活性,首先,它提供有格式字节流,有利于减少开发人员的工作量;其次,消息具有类型,在实际应用中,可作为优先级使用。这两点是管道以及有名管道所不能比的。
同样,消息队列可以在几个进程间复用,而不管这几个进程是否具有亲缘关系,这一点与有名管道很相似;但消息队列是随内核持续的,与有名管道(随进程持续)相比,生命力更强,应用空间更大。
4.2 posix 消息队列
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
//name 字符串格式有固定的要求 “/somename". 必须以/开始,somename里面不允许再有/符号。
//POSIX 消息队列生命期是内核生命期的;如果没有使用mq_unlink(3) 删除的话,一个消息队列会一直存在,直到系统关闭。
//创建的消息队列在一个虚拟的文件系统里,我们可以把挂载这个虚拟的消息队列文件系统在 Linux 系统中。 这样就可以看到const char *name 文件夹。
# mkdir /dev/mqueue
# mount -t mqueue none /dev/mqueue
mq_open.c 代码如下:
1 #include <unistd.h>
2 #include <fcntl.h> /* For O_* constants */
3 #include <sys/stat.h> /* For mode constants */
4 #include <mqueue.h>
5 #include <stdio.h>
6 #include <errno.h>
7
8 #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
9
10 int main(int argc, char**argv)
11 {
12 int c, flags;
13 mqd_t mqd;
14
15 printf("*************argv[0]:%s, %sn", argv[0], argv[1]);
16 flags = O_RDWR|O_CREAT;
17 while( (c=getopt(argc, argv, "e"))!= -1)
18 {
19 if(c =='e')
20 {
21 printf("have option e n");
22 flags = flags|O_EXCL;
23 break;
24 }
25 break;
26 }
27
28 mqd = mq_open(argv[optind], flags, FILE_MODE, NULL);
29 printf("*************argv[optind]:%s, %dn", argv[optind], optind);
30 if(mqd == -1)
31 printf("create error n");
32 printf(" %dn", errno);
33 perror("result:");
34 mq_close(mqd);
35 return 0;
36
37 }
运行这个可执行文件 ./mq_open /test, 执行 ls /dev/mqueue就可以在看到 test文件。
sundh@linux:~/temp$ ls /dev/mqueue
test
sundh@linux:~/temp$ cat /dev/mqueue/test
QSIZE:0 NOTIFY:0 SIGNO:0 NOTIFY_PID:0
int mq_close(mqd_t mqdes);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
int mq_unlink(const char *name);
5. 信号量
5.1 Posix
Posix 信号量分为两种: 有名信号量和基于内存的信号量(也就是无名信号量)。
先看有名信号量:
sem_t *sem_open(const char *name, int oflag); ://creates a new POSIX semaphore or opens an existingsemaphore. 创建一个有名的信号量,它可以用于线程也可以用于进程间的同步。name变量不同于ftok (char*pathname, …)中的pathname, 只要两个进程的name字符串是一致且只有第一个字符是/,不能出现两个/字符,就可以使用这个信号量进行同步。 (对于POSIX信号量和共享内存的name参数,会在/dev/shm下建立对应的路径名。 可参考 http://blog.csdn.net/anonymalias/article/details/9938865)
sem_close() : 只是关闭信号量,并未从系统中删除
sem_unlink(): 删除该信号量
sem_close() 和 sem_unlink()并不是一定要先调用sem_close(),然后调用sem_unlink(), 没有这样的调用要求. 可以先sem_unlink()从系统内核中删除该信号量的名字,既从/dev/shm 中删除该信号量对应的文件, 然后还能调用sem_post() 和sem_wait()进行信号量操作,最后调用 sem_close(). 可参考 后面第6章的例子1。
《unix网络编程-进程间通信》这样解释sem_close() 和 sem_unlink():
sem_close()不是强制要求调用的,进程退出时,如果有信号量被这个进程打开,没有被关闭,退出时会自动关闭该信号量。
sem_wait()/sem_trywait():当所指定的信号量的值为0时,后者并不将调用者投入睡眠,而是立刻返回EAGAIN,即重试。
按照功能来分有两种, 二进制信号量和记数信号量。
//process 1
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/mman.h>
using namespace std;
#define SHM_NAME "/memmap"
#define SHM_NAME_SEM "/memmap_sem"
char sharedMem[10];
int main()
{
int fd;
sem_t *sem;
fd = shm_open(SHM_NAME, O_RDWR | O_CREAT, 0666);
sem = sem_open(SHM_NAME_SEM, O_CREAT, 0666, 0);
if (fd < 0 || sem == SEM_FAILED)
{
cout<<"shm_open or sem_open failed...";
cout<<strerror(errno)<<endl;
return -1;
}
ftruncate(fd, sizeof(sharedMem));
char *memPtr;
memPtr = (char *)mmap(NULL, sizeof(sharedMem), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
char msg[] = "yuki...";
memmove(memPtr, msg, sizeof(msg));
cout<<"process:"<<getpid()<<" send:"<<memPtr<<endl;
sem_post(sem);
sem_close(sem);
return 0;
}
//process 2
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/mman.h>
using namespace std;
#define SHM_NAME "/memmap"
#define SHM_NAME_SEM "/memmap_sem"
int main()
{
int fd;
sem_t *sem;
fd = shm_open(SHM_NAME, O_RDWR, 0);
sem = sem_open(SHM_NAME_SEM, 0);
if (fd < 0 || sem == SEM_FAILED)
{
cout<<"shm_open or sem_open failed...";
cout<<strerror(errno)<<endl;
return -1;
}
struct stat fileStat;
fstat(fd, &fileStat);
char *memPtr;
memPtr = (char *)mmap(NULL, fileStat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
sem_wait(sem);
cout<<"process:"<<getpid()<<" recv:"<<memPtr<<endl;
sem_close(sem);
return 0;
}
程序执行结果:
# ./send
process:13719 send:yuki...
# ./recv
process:13720 recv:yuki...
对于POSIX信号量和共享内存的名字会在/dev/shm下建立对应的路径名,例如上面的测试代码,会生成如下的路径名:
# ll /dev/shm/
total 8
-rw-r--r-- 1 root root 10 Aug 13 00:28 memmap
-rw-r--r-- 1 root root 32 Aug 13 00:28 sem.memmap_sem
5.1.1 有名信号量
sem_open() 用于创建一个新的有名信号量或者打开一个已经存在的有名信号量,它可用于线程间同步或者进程间同步。 有名信号量是由内核维护的。不需要程序分配信号量的内存空间。
一般主线程或者主进程调用 sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value) 来创建一个新的信号量, mode_t mode 设置为O_CREAT。另外一个线程或者经常使用sem_t *sem_open(const char *name, int oflag)来打开一个已经存在的信号量。
5.1.2 无名信号量
即基于内存的信号量,随进程结束也释放。 它是有进程分配信号量的内存空间,然后调用sem_init初始化。
sem_t temp; //如果这个temp是分配在共享内存区,则可以用于进程间通信;否则只能用于线程间通信。
int sem_init(sem_t *sem, int pshared, unsigned int value); //pshared为0, 线程间通信; 不为0时,sem_t temp必须在共享内存分配内存空间。
int sem_destroy(sem_t *sem);
很少有人会使用无名信号量用于进程间通信,所以也就不细说了。 用的较多的方法是先声明一个结构体:
struct shareMemory { sem_t sem; char *buffer[MAX]}; 在共享内存区上面分配一个这样大小的内存,然后调用 sem_init(&shareMemory.sem, 1, 1). 这里需要注意的是sem_init()只能被调用一次。对一个已经初始化的信号量调用sem_init(),其结果是未定义的。 也就是说 主线程或者主进程调用 sem_init()后,另外的线程或者进程不允许再次调用sem_init(),这个另外的线程或者进程可以直接使用 shareMemory.sem进行同步。
如果pshared 的值为0,并且sem_t 这个信号量是对所有线程都可见,例如:声明为全局变量,或者用new/malloc 分配的堆内存,或者信号量是局部变量,通过需要通过pthread_create(pthread_t* thread, pthread_attr_t, void *(*Func), void *arg)函数中的 void* arg形参在创建新线程的时候传入,这时这个信号量就可以线程间同步了。
- /*
- * simple_sem_app.c
- */
- #include "all.h"
- /* 每个字符输出的间隔时间 */
- #define TEN_MILLION 5000000L
- #define BUFSIZE 1024
- void *threadout(void *args);
- int main(int argc, char *argv[])
- {
- int error;
- int i;
- int n;
- sem_t semlock;
- pthread_t *tids;
- if (argc != 2) {
- fprintf (stderr, "Usage: %s numthreadsn", argv[0]);
- return 1;
- }
- n = atoi(argv[1]);
- tids = (pthread_t *)calloc(n, sizeof(pthread_t));
- if (tids == NULL) {
- perror("Failed to allocate memory for thread IDs");
- return 1;
- }
- if (sem_init(&semlock, 0, 1) == -1) {
- perror("Failed to initialize semaphore");
- return 1;
- }
- for (i = 0; i < n; i++) {
- if (error = pthread_create(tids + i, NULL, threadout, &semlock)) {
- fprintf(stderr, "Failed to create thread:%sn", strerror(error));
- return 1;
- }
- }
- for (i = 0; i < n; i++) {
- if (error = pthread_join(tids[i], NULL)) {
- fprintf(stderr, "Failed to join thread:%sn", strerror(error));
- return 1;
- }
- }
- return 0;
- }
- void *threadout(void *args)
- {
- char buffer[BUFSIZE];
- char *c;
- sem_t *semlockp;
- struct timespec sleeptime;
- semlockp = (sem_t *)args;
- sleeptime.tv_sec = 0;
- sleeptime.tv_nsec = TEN_MILLION;
- snprintf(buffer, BUFSIZE, "This is thread from process %ldn",
- (long)getpid());
- c = buffer;
- /****************** entry section *******************************/
- while (sem_wait(semlockp) == -1)
- if(errno != EINTR) {
- fprintf(stderr, "Thread failed to lock semaphoren");
- return NULL;
- }
- /****************** start of critical section *******************/
- while (*c != '