进程间通信的几种方式及基本实现

进程间通信(又叫InterProcess Communication, IPC)。一般来说,进程间通信方式有6种:管道、命名管道(FIFO)、消息队列、信号量、共享存储以及套接字。

管道

管道只能在具有公共祖先的两个进程间使用,通常用于父子进程间通信。历史上的管道是半双工的,也就是数据只能在一个方向上流动。

进程先调用pipe(fd),创建管道; 接着,调用fork,形成父子进程;

fork之后,具体数据流向可以灵活设定。其中,fd[0]表示读端,fd[1]表示写端,具体而言,

  • 数据从 父进程 流到 子进程 ——父进程关闭fd[0],子进程关闭fd[1];
  • 数据从子进程流到父进程——父进程关闭fd[1],子进程关闭fd[0]。

管道通信实现,[https://github.com/qingdujun/basic/blob/master/fork/pipe.cpp]

#include <iostream>
#include <unistd.h> //fork, pipe
#include <string>
using namespace std;

int main(int argc, char const *argv[]) {
int fd[2];
int ret = pipe(fd);
if (ret < 0) {
cerr << "pipe error" << endl;
}
pid_t pid = fork();
//you can exchange, let pid > 0 in here
if (pid == 0) { //child process
close(fd[0]);
const char* hello = "hello, pipe!";
for (int i = 0; i < 5; ++i) {
write(fd[1], hello, strlen(hello) + 1);
sleep(1);
}
close(fd[1]);
//and here, you can let pid == 0
} else if (pid > 0) { //father process
close(fd[1]);
char buffer[128];
for (int i = 0; i < 5; ++i) {
memset(buffer, 0, sizeof(buffer));
ssize_t length = read(fd[0], buffer, sizeof(buffer));
if (length > 0) {
buffer[length - 1] = '\0';
}
cout << buffer << endl;
}
close(fd[0]);
} else {
cerr << "fork error" << endl;
}

return 0;
}

命名管道(FIFO)

FIFO(First in, First out)有时候也叫命名管道,它是一种特殊的文件类型,在文件系统中有对应的路径。

通过FIFO,不相关的进程也能交换数据。以下两种用途,

  • shell命令使用FIFO将数据从一条管道传送到另一条管道时,无需创建中间临时文件;
  • 客户进程-服务进程应用程序中,FIFO用作汇聚点,在客户端进程和服务端进程之间传递数据。

消息队列

消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。原来诞生的目的是提供一种高于一般速度的IPC,但是现阶段和其他方式相比,速度方面已经没什么差别了。考虑到使用消息队列的遇到的问题,

  • 没有引用计数,不会自行删除;
  • 不使用文件系统命名空间,并且使用长整型key_t类型作为标识,经常被诟病。

而支持消息队列着说的优点是:它们可靠的、流控制以及面向记录,它们可以用非FIFO次序处理。

但是,事实上它需要获取消息标识符,大部分人认为它并不是无连接的。其次,消息队列是一台主机上的,不牵涉到可靠性。至于流控制,套接字表现的很优秀。

所以,得出的结论是:在新的应用程序中不应当再使用它们。

信号量

它是一个计数器,用于为多个进程提供对共享数据对象的访问。大名鼎鼎的PV操作就是由此而来。

信号量通常是在内核中实现的。二元信号量它控制单个资源,其初始值为1。但是,一般而言,信号量的初值可以是任意正值,该值表明有多少个共享资源单位可共享应用。

PV操作实现,[https://github.com/qingdujun/basic/blob/master/fork/pv.cpp]

  • 实现简单的生产者,消费者;
  • 在semun中,指定 生产者信号量的初始值为1 和消费者为0。
#include <sys/sem.h>//semun, semop, semget, semctl
#include <unistd.h> //fork
#include <iostream>
using namespace std;

void p(int& sem_id) {
struct sembuf sem_buffer = {0, -1, SEM_UNDO};
if (semop(sem_id, &sem_buffer, 1) < 0) {
cerr << "semop error" << endl;
exit(0);
}
}

void v(int& sem_id) {
struct sembuf sem_buffer = {0, 1, SEM_UNDO};
if (semop(sem_id, &sem_buffer, 1) < 0) {
cerr << "semop error" << endl;
exit(0);
}
}

int main(int argc, char const *argv[]) {
int producer = semget((key_t)2019, 1, 0666 | IPC_CREAT);
int consumer = semget((key_t)2020, 1, 0666 | IPC_CREAT);
union semun producer_init_value = {1}, consumer_init_value = {0};
if (semctl(producer, 0, SETVAL, producer_init_value) < 0 || semctl(consumer, 0, SETVAL, consumer_init_value) < 0) {
cerr << "semctl error" << endl;
exit(0);
}

pid_t pid = fork();
if (pid == 0) { //child process
for (int i = 0; i < 5; ++i) {
p(consumer);
cout << "eating apple" << endl;
v(producer);
}
} else if (pid > 0) { //father process
for (int i = 0; i < 5; ++i) {
p(producer);
cout << "making apple" << endl;
v(consumer);
}
} else {
cerr << "fork error" << endl;
}
wait(nullptr);

return 0;
}

共享存储

它是最快的一种IPC。共享存储允许两个或多个进程共享一个给定的存储区,数据不需要在客户进程和服务进程之间复制。

文件映射 将一个文件的指定区域映射进内存,作为共享区域。好处是内容会被存在文件当中,即使重启电脑也不会丢失。但是也有坏处,映射区域有限制——不能超过文件大小。

另外还有一点需要注意,映射都是整页分配的,如果文件大小为2.6页,那么会分配3个页。在内存中数据交换可以使用3个页面大小,但是超出文件大小部分数据不会被存入文件。关机后会丢失。同时需要注意,一旦访问超过3页,将会出错。

文件映射实现,[https://github.com/qingdujun/basic/blob/master/fork/mmap-file.cpp]

#include <iostream>
#include <unistd.h>
#include <sys/mman.h> //PROT_READ
#include <fcntl.h> //O_CREAT
#include <string>
using namespace std;

typedef struct {
char name[4];
int age;
} people;

int main(int argc, char const *argv[]) {
int fd = open("mmap.txt", O_CREAT | O_RDWR | O_TRUNC, 00777);
if (fd < 0) {
cerr << "open error" << endl;
exit(0);
}
lseek(fd, sizeof(people)*5-1, SEEK_SET);
write(fd, " ", 1); //set file length
people* p_map = (people*)mmap(nullptr, sizeof(people) * 10, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
if (p_map == nullptr) {
cerr << "mmap error" << endl;
exit(0);
}

pid_t pid = fork();
if (pid == 0) { //child process
for (int i = 0; i < 10; ++i) {
cout << "name: " << (*(p_map+i)).name << " age: "<<(*(p_map+i)).age<<endl;
}
munmap(p_map, sizeof(people) * 10);
} else if (pid > 0) { //father process
char c = 'a';
for (int i = 0; i < 10; ++i) {
c += 1;
memcpy((*(p_map+i)).name, &c, 2);
(*(p_map+i)).age = 20+i;
}
munmap(p_map, sizeof(people)*10);
} else {
cerr << "fork error" << endl;
}
wait(nullptr);

return 0;
}

匿名映射 匿名映射可以不显示指定文件名,只需要在调用mmap时指定MAP_ANON标志,并将文件描述符指定为-1。好处就是不存在文件大小的限制。坏处就是一旦关机,数据全部丢失。

匿名映射实现,[https://github.com/qingdujun/basic/blob/master/fork/mmap-1.cpp]

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <iostream>
using namespace std;

typedef struct {
char name[4];
int age;
} people;

int main(int argc, char const * argv[]) {//big memory
people* p_map = (people*)mmap(nullptr, sizeof(people) * 10000000, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (p_map == nullptr) {
cerr << "mmap error" << endl;
exit(0);
}
pid_t pid = fork();
if (pid == 0) {
sleep(2);
for (int i = 0; i < 5; i++) {
cout << (*(p_map + i)).age << endl;
}
(*(p_map+10000000)).age = 100;
munmap(p_map, sizeof(people) * 10);
} else if (pid > 0) {
char c = 'a';
for (int i = 0; i < 5; i++) {
c += 1;
memcpy((*(p_map + i)).name, &c, 2);
(*(p_map + i)).age = 20 + i;
}
sleep(5);
cout << (*(p_map+10000000)).age << endl;
munmap(p_map, sizeof(people) * 10);
} else {
cerr << "fork error" << endl;
}
wait(nullptr);

return 0;
}

系统V共享存储 系统调用mmap()通过映射一个普通文件实现共享内存。系统V则是通过映射特殊文件系统shm中的文件实现进程间的共享内存通信。也就是说,每个共享内存区域对应特殊文件系统shm中的一个文件。

一些要点及与mmap区别记录,

  • 系统V共享内存中的数据,从来不写入到实际磁盘文件中去;
  • 系统V共享内存是随内核持续的,即使所有访问共享内存的进程都已经正常终止,共享内存区仍然存在,除非重启(将丢失);
  • 通过调用mmap()映射普通文件进行进程间通信时,一定要注意考虑进程何时终止对通信的影响。而通过系统V共享内存实现通信的进程则不然。

系统V共享存储实现,[https://github.com/qingdujun/basic/blob/master/fork/shm.cpp]

#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#include <iostream>
using namespace std;

typedef struct {
char name[4];
int age;
} people;

int main(int argc, char const* argv[]) {
int shm_id = shmget((key_t)2022, 4096, IPC_CREAT | 0666);
if (shm_id < 0) {
cerr << "shmget error" << endl;
exit(0);
}
people* p_map = (people*)shmat(shm_id, nullptr, 0);
if (p_map == (people*)-1) {
cerr << "shamt error" << endl;
exit(0);
}
pid_t pid = fork();
if (pid == 0) {
sleep(2);
for (int i = 0; i < 10; ++i) {
cout << "name: "<<(*(p_map+i)).name <<" age: " << (*(p_map+i)).age <<endl;
}
if (shmdt(p_map) < 0) {
cout << "shmdt error" << endl;
}
} else if (pid > 0) {
char c = 'a';
for (int i = 0; i < 10; ++i) {
c += 1;
memcpy((*(p_map + i)).name, &c, 2);
(*(p_map + i)).age = 20 + i;
}
if (shmdt(p_map) < 0) {
cerr << "shmdt error" << endl;
}
} else {
cerr << "fork error" << endl;
}
wait(nullptr);

return 0;
}

套接字

该部分,请参考我撰写的这几篇文章:《Linux网络篇:独有的I/O多路复用模型epoll(适用于大规模fd)》《Unix网络篇(4)与epoll类似的kqueue模型——Unix-like系统》《Unix网络篇(3)poll模型突破fd上限》《Unix网络篇(2)使用select复用I/O》以及《Unix网络篇(1)一个典型的TCP Socket通信例子》


References:

[1] W.Richard Stevens, 《UNIX环境高级编程(第三版)》

[2] IBM 郑彦兴,Linux环境进程间通信(五)内存共享(上)

[3] IBM 郑彦兴,Linux环境进程间通信(五)内存共享(下)