「前言」本文的大致内容是对进程间通信的基本介绍以及管道的介绍。

一、进程间通信介绍

1.1 进程间通信概念

进程间通信就是在不同进程之间传播或交换信息,进程间通信简称IPC(Interprocess communication)

1.2  为什么要有进程间通信

为什么要有进程间通信??

有时候我们是需要多进程协同的,去完成某种业务

1.3 进程间通信目的

  1. 数据传输:一个进程需要将它的数据发送给另一个进程
  2. 资源共享:多个进程之间共享同样的资源
  3. 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)
  4. 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变

1.4 进程间通信分类

(1)管道

  • 匿名管道
  • 命名管道

(2)System V IPC

  • System V 消息队列
  • System V 共享内存
  • System V 信号量

(3)POSIX IPC

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

管道:管道是基于文件系统的。System V IPC:聚焦在本地通信。POSIX IPC:让通信可以跨主机

1.5 进程间通信的本质

进程间通信的本质就是:让不同的进程看到同一份资源

两个进程间想要通信,就必须提供某一个资源,这个资源用于给两个进程之间进行通信。这个资源不能是进程的双方提供的,因为进程是具有独立性的,一个进程提供了资源,进行通信另一个进程必定会访问这个资源,这时就破坏了进程的独立性

因此,这个资源只能由第三方提供,这个第三方就是OS,OS需要直接或间接给通信双方的进程提供 “内存空间”

这个资源可以是OS中不同的模块提供,不同的模块提供的不同资源,造就了不同的通信种类(消息队列,共享内存,信号量...),因此出现了不同的通信方式

所以,进程间想要通信,首先要看到同一份资源,看到同一份资源才会有通信

二、管道

2.1 什么是管道

管道是Unix中最古老的进程间通信的形式,我们把从一个进程连接到另一个进程的一个数据流称为一个 “管道”

比如,我们执行的这条 cat file | grep hello 命令,其中 “|” 就是管道

其中,cat 命令和 grep 命令都是两个程序,当它们运行起来后就变成了两个进程,cat进程的数据传输到 “管道” 当中,grep进程再通过 “管道” 当中读取数据,至此便完成了数据的传输,两个进程就完成了通信

管道又分匿名管道和命名管道

2.2 匿名管道

匿名管道用于进程间通信,且仅限于本地父子进程之间通信

2.2.1 pipe函数

pipe函数用于创建匿名管道,man查看pipepipe函数是一个系统调用

man 2 pipe

pipe
头文件:#include <unistd.h>

函数原型
int pipe(int pipefd[2]);

返回值
成功时返回0,调用失败时返回-1且错误码被设置

pipe函数的参数pipefd[2] 是一个输出型参数,数组pipefd用于返回两个指向管道读端和写端的文件描述符

  • pipefd[0]是管道读端的文件描述符
  • pipef[1]是管道写端的文件描述符

帮助记忆:0可以想象成嘴(读),1可以想象成笔(写) 

因为匿名管道仅用于父子进程间通信,所以要使用匿名管道就要使用fork函数

2.2.2 匿名管道的原理

匿名管道用于进程间通信,且仅限于本地父子进程之间通信

  • 进程间通信的本质就是,让不同的进程看到同一份资源,使用匿名管道实现父子进程间通信的原理就是,让两个父子进程先看到同一份被打开的文件资源,然后父子进程就可以对该文件进行写入或是读取操作,进而实现父子进程间通信
  • 该文件资源是文件系统提供的,该文件资源就是匿名管道,该文件资源的操作方法与文件一致,也有自己的文件缓冲区

注意:父子进程对该文件进行写入操作时,该文件缓冲区当中的数据不会发生写时拷贝,该文件资源由文件系统维护

2.2.3 匿名管道的使用

管道只能单向通信,不能双向通信

比如,一端是写入了,另一端就必须是读取,反过来也是,一端进行读取,另一端必须进行写入

(1)父进程调用pipe函数创建管道

(2)父进程进行创建子进程

(3)父进程需要读取,父进程就需要关闭写端,子进程进行写入,子进程需要关闭读端

(4)父进程需要写入,父进程就需要关闭读端,子进程进行读取,子进程需要关写读端

注意:管道是单向通信的

2.2.4 以文件描述符的角度看待

站在文件描述符的角度看待匿名管道:

(1)父进程调用pipe函数创建管道

(2)父进程进行创建子进程(右边是子进程,写错了)

 (3)父进程需要读取,父进程就需要关闭写端,子进程进行写入,子进程需要关闭读端(右边是子进程,写错了)

(4)父进程需要写入,父进程就需要关闭读端,子进程进行读取,子进程需要关写读端(右边是子进程,写错了)

2.2.5 匿名管道测试代码

以子进程写入,父进程读取为例

#include <iostream>
#include <unistd.h>
#include <cstdio>
#include <cassert>
#include <cstring> 
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;

//子进程写入,父进程读取

int main()
{
     // 第一步:创建管道文件,打开读写端
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);//否则创建管道失败,直接断言

    //创建子进程
    pid_t id = fork();
    assert(id >= 0);//否则创建子进程失败

    //子进程通信代码--子进程写入
    if(id == 0)
    {
        //关闭读端,写端打开
        close(fds[0]);
        const char* s = "我是子进程,我正在给你发消息";
        int cnt = 0;
        while(true)
        {
            ++cnt;
            char buffer[1024];//只能在子进程看到
            snprintf(buffer, sizeof buffer, "child -> parent say: %s[%d][子进程pid:%d]", s, cnt, getpid());
            write(fds[1], buffer, strlen(buffer));
            sleep(3);

            if(cnt >= 10)
                break;
        }

        close(fds[1]);
        cout << "子进程关闭自己的写端" << endl;
        exit(0);
    }

    //父进程通信代码--父进程读取
    close(fds[1]);
    while(true)
    {
        sleep(1);
        char buffer[1024];
        ssize_t s = read(fds[0], buffer, sizeof(buffer)-1);
        if(s > 0)//读取到数据
        {
            buffer[s] = '\0';//防止越界
            cout << "Get Message# " << buffer << " | 父进程pid: " << getpid() << endl;
        }
        else if(s == 0) //读到文件结尾
        {
            cout << "父进程读取完成" << endl;
            break;
        }
    }

    close(fds[0]);
    cout << "父进程的读端关闭" << endl;

    //等待子进程
    int status = 0;
    n = waitpid(id, &status, 0);
    cout << "等待子进程pid->" << n << " : 退出信号:" << (status & 0x7F) << endl;

    return 0;
}

运行结果

2.2.6 匿名管道读写规则

  1. 读快,写慢。如果管道中没有数据,读端进程再进行读取,会阻塞当前正在读取的进程;如果写端不进行写入,读端进程会一直阻塞
  2. 读慢,写快。如果写端把管道写满了,再写就会对该进程进行阻塞,需要等待对方对管道内数据进行读取;如果读端不读取数据,写端进程会一直阻塞
  3. 写关闭,读取到0。如果写入进程关闭了写入fd,读取端将管道内的数据读完后,程序结束
  4. **读关闭,写?**如果读关闭,操作系统会给写端发送13号信号SIGPIPE,终止写端

(1)读快,写慢

上面代码是读快,写慢这种情况

(2)读慢,写快

修改代码,修改sleep时间即可

运行结果

(3)写关闭,读取到0 

写入一条消息,直接关闭写端

运行结果

(4)读关闭,写?

读一次,直接把读端关闭 

运行结果

2.2.7 匿名管道的特征

  1. 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道
  2. 管道提供流式服务(网络)
  3. 一般而言,进程退出,管道释放,所以管道的生命周期随进程
  4. 一般而言,内核会对管道操作进行同步与互斥(多线程)
  5. 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道

2.2.8 基于匿名管道的进程池

实现思路:父进程控制写端进行写入,子进程进行读取,读取命令码后执行相应的任务,父进程创建多个子进程

代码: 

#include <iostream>
#include <string>
#include <vector>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

using namespace std;

#define makeSeed() srand((unsigned long)time(nullptr) ^ getpid() ^ 0x363 ^ rand() % 1234)
#define PROCESS_SUM 10
typedef void (*func_t)();//函数指针类型

//--------------------------------  模拟一下子进程要完成的某种任务  --------------------- 
void downloadTask()
{
    cout << getpid() << "执行下载任务\n" << endl;
    sleep(1);
}

void ioTask()
{
    cout << getpid() << "执行io任务\n" << endl;
    sleep(1);
}

void flushTask()
{
    cout << getpid() << "执行刷新任务\n" << endl;
    sleep(1);
}

void loadTaskFunc(vector<func_t>* out)
{
    assert(out);
    out->push_back(downloadTask);
    out->push_back(ioTask);
    out->push_back(flushTask);
}

//--------------------------------  以下代码是多进程代码  --------------------- 
class subEP //sub end point
{
public:
    subEP(pid_t subId, int writeFd)
        :_subId(subId)
        ,_writeFd(writeFd)
    {
        char nameBuffer[1024];
        snprintf(nameBuffer, sizeof(nameBuffer), "preocess - %d [pid(%d) - fd(%d)]", _num++, _subId, _writeFd);
        _name = nameBuffer;
    }

public:
    static int _num;
    string _name;
    pid_t _subId;
    int _writeFd;
};
int subEP::_num = 0;

int recvTask(int readFd)
{
    int code = 0;
    ssize_t s = read(readFd, &code, sizeof code);
    if(s == sizeof(code))//读取正常
    {
        return code;
    }
    else if(s <= 0)//读取出错
    {
        return -1;
    }
    else
    {
        return 0;
    }
}

void createSubProcess(vector<subEP>* subs, vector<func_t>& funcMap)
{
    //vector<int> deleteFd;//第一种方法:解决下一个子进程拷贝父进程读写端的问题

    for(int i = 0; i < PROCESS_SUM; i++)
    {
        int fds[2];
        int n = pipe(fds);
        assert(n == 0);
        (void)n;

        pid_t id = fork();
        //子进程
        if(id == 0)
        {
            // for(int i = 0; i < deleteFd.size(); i++)
            //     close(deleteFd[i]);

            close(fds[1]);
            while(true)
            {
                //1.获取父进程发送的命令码,没有收到命令码,进行阻塞等待
                int commandCode = recvTask(fds[0]);
                //2.执行任务
                if(commandCode >= 0 && commandCode < funcMap.size())
                {
                    funcMap[commandCode]();
                }
                else if(commandCode == -1)//读取失败返回-1
                {
                    break;
                }
            }
            //子进程退出
            exit(0);
        }

        //父进程
        close(fds[0]);
        subEP sub(id, fds[1]);
        subs->push_back(sub);//
        //deleteFd.push_back(fds[1]);

    }
}

void sendTask(const subEP& process, int taskNum)
{
    cout << "send tak num: " << taskNum << " send to -> " << process._name << endl;
    int n = write(process._writeFd, &taskNum, sizeof(taskNum));
    assert(n == sizeof(int));
    (void)n;
}

void loadBlanceContrl(vector<subEP>& subs,  vector<func_t>& funcMap, int count)
{
    int processSum = subs.size();
    int taskSum = funcMap.size();

    bool forever = (count == 0 ? true : false);
    while(true)
    {
         // 1. 随机选择一个子进程
        int subIdx = rand() % processSum;
        // 2. 随机选择一个任务
        int taskIdx = rand() % taskSum;
        // 3. 任务发送给选择的进程
        sendTask(subs[subIdx], taskIdx);
        sleep(1);

        if(!forever)
        {
            count--;
            if(count == 0)
                break;
        }
    }

    //第二种方法:解决下一个子进程拷贝父进程读写端的问题
    //写端退出,关闭读
    for(int i = 0; i < processSum; i++)
    {
        close(subs[i]._writeFd);
    }
}

void waitProcess(vector<subEP> process)
{
    int processSum = process.size();
    for(int i = 0; i < processSum; i++)
    {
        waitpid(process[i]._subId, nullptr, 0);
        cout << "wait sub process success ..." << process[i]._subId << endl;
    }
}

int main()
{
    //创建随机数
    makeSeed();

    // 1.建立子进程并建立和子进程通信的信道
        // 1.1 加载方法任务表
    vector<func_t> funcMap;
    loadTaskFunc(&funcMap);
        // 1.2 创建子进程,并且维护父子通信信道
    vector<subEP> subs;
    createSubProcess(&subs, funcMap);

    // 2.父进程,控制子进程,负载均衡的向子进程发送命令码
    int taskCnt = 5;//执行任务次数,为0时永远执行任务
    loadBlanceContrl(subs, funcMap, taskCnt);

    // 3.回收子进程
    waitProcess(subs);

    return 0;
}

运行结果

小提示:以 .cpp .cxx .cc 结尾的都是C++的源文件 

2.3 命名管道

匿名管道只能用于具有共同祖先的进程(具有亲缘关系的进程)之间的通信,通常,一个管道由一个进程创建,然后该进程调用fork创建子进程,父子进程通过匿名管道进行通信。如果要实现两个毫不相关进程之间的通信,可以使用命名管道来做到

2.3.1 使用命令创建命名管道

使用 mkfifo 命令创建一个命名管道

mkfifo 文件名
ps: mkfifo named_pipe

可以看到,创建出来的文件的类型是 p ,代表该文件是命名管道文件

命名管道也有自己的 inode,说明命名管道就是一个独立的文件

使用这个命名管道文件,就能实现两个进程之间的通信了。我们在一个进程(进程A)中用shell脚本每秒向命名管道写入一个字符串,在另一个进程(进程B)当中用 cat命令从命名管道当中进行读取

现象:当进程A启动后,进程B会每秒从命名管道中读取一个字符串打印到显示器上

这就证明了这两个毫不相关的进程可以通过命名管道进行数据传输,即通信

先测试往显示器上打印 (shell脚本语言)

cnt=0; while :; do echo "hello world -> $cnt"; let cnt++; sleep 2; done

运行结果 

 输出重定向到管道里

cnt=0; while :; do echo "hello world -> $cnt"; let cnt++; sleep 2; done > named_pipe

注:脚本语言是一个进程,cat也是一个进程,两个进程毫无关系

cat进行输入重定向 ,向管道named\_pipe读取数据

cat < named_pipe

运行结果

之前我们说过,当管道的读端进程退出后,写端进程再向管道写入数据就没有意义了,此时写端进程会被操作系统杀掉,在这里就可以很好的得到验证:当我们终止掉读端进程后,因为写端执行的循环脚本是由命令行解释器bash执行的,所以此时bash就会被操作系统杀掉,我们的云服务器也就退出了

注意:命名管道的大小是不会改变的,都为0,因为数据都是在文件缓冲区 

 2.3.2 命名管道的原理

命令管道用于实现两个毫不相关进程之间的通信

  • 进程间通信的本质就是,让不同的进程看到同一份资源,使用命名管道实现进程间通信的原理是:也是让两个进程先看到同一份被打开的文件资源,这个文件资源就是我们创建的命名管道
  • 两个毫不相关进程打开了同一个命名管道,此时这两个进程也就看到了同一份资源,进而就可以进行通信了,通信的数据依旧是在文件缓冲区里面,并且不会刷新到磁盘
  • 命名管道可以通过路径+名字标定唯一性,匿名管道是通过地址来标定唯一性的,这个地址没有名字,所以叫匿名管道

2.3.3 在程序中创建命名管道

在程序中创建命名管道使用也是使用mkfifomkfifo是命令,也是一个函数

man 3 mkfifo查看一下

mkfifo函数的函数原型如下:

int mkfifo(const char *pathname, mode_t mode);

 解释:

头文件:
#include <sys/types.h>
#include <sys/stat.h>

声明:
int mkfifo(const char *pathname, mode_t mode);

参数:
    (1)pathname
    mkfifo函数的第一个参数是pathname,表示要创建的命名管道文件
    注意:
        若pathname以路径的方式给出,则将命名管道文件创建在pathname路径下
        若pathname以文件名的方式给出,则将命名管道文件默认创建在当前路径下

    (2)mode
    mkfifo函数的第二个参数是mode,表示创建命名管道文件的默认权限

返回值:
命名管道创建成功,返回0
命名管道创建失败,返回-1,错误码被设置

注意:若想创建出来命名管道文件的权限值不受影响,则需要在创建文件前使用 umask函数将文件默认掩码设置为0

代码示例:

#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>

#define FILE_NAME "named_pipe"

int main()
{
	umask(0); //将文件默认掩码设置为0
    //使用mkfifo创建命名管道文件
    int n = mkfifo(FILE_NAME, 0666);
	if (n < 0)
    { 
		perror("mkfifo");
		return -1;
	}

	return 0;
}

运行结果

2.3.4 unlink函数

上面的程序再次运行就会报错

这是因为mkfifo函数创建管道是,如果管道已经存在,就不会创建,直接报错:文件已经存在

如果我们想让程序运行结束,创建的管道也被删除,就要使用unlink函数

man 3 unlink 查看一下

unlink

头文件:
#include <unistd.h>

函数声明:
int unlink(const char *path);

参数:
传入要被删除文件的名字

返回值:
删除成功返回 0
失败返回 -1 ,错误码被设置

 测试代码

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#define FILE_NAME "named_pipe"

int main()
{
	umask(0); //将文件默认掩码设置为0
    //使用mkfifo创建命名管道文件
    int n = mkfifo(FILE_NAME, 0666);
	if (n < 0)
    { 
		perror("mkfifo");
		return -1;
	}

    //删除管道文件
    n = unlink(FILE_NAME);
    if(n < 0)
    {
        perror("unlink");
		return -1;
    }
    else
    {
        printf("管道文件删除成功\n");
    }

	return 0;
}

运行结果

小提示:assert不要乱使用,意料之中使用assert,意料之外使用if判断 

2.3.5 使用命名管道实现serve&client通信

实现服务端(server)和客户端(client)之间的通信之前,我们需要先让服务端运行起来,我们需要让服务端运行后创建一个命名管道文件,然后再以读的方式打开该命名管道文件,之后服务端就可以从该命名管道当中读取客户端发来的通信信息了

共同的头文件:comm.hpp

客户端和服务端共用一个头文件

#pragma once

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <cassert>
#include <cstring>

using namespace std;

#define NAMED_PIPE "named_pipe"

//创建命名管道
bool createFifo(const string& path)
{
    umask(0);
    int n = mkfifo(path.c_str(), 0600);
    if(n == 0)//创建成功
    {
        return true;
    }
    else//创建失败
    {
        cout << "errno: " << "errno string: " << strerror(errno) << endl;
        return false;
    }
}

//删除命名管道
void removeFifo(const string& path)
{
    int n = unlink(path.c_str());
    assert(n == 0);//release下就没有了
    (void)n;
}

服务端的代码如下:(server.cc

#include "comm.hpp"

int main()
{
    //创建命名管道
    bool r = createFifo(NAMED_PIPE);
    assert(r);
    (void)r;

    cout << "server begin" << endl;
    int rfd = open(NAMED_PIPE, O_RDONLY);//打开命名管道,服务端以读方式打开
    if(rfd < 0)
        exit(-1);

    //read
    char buffer[1024];
    while(true)
    {
        ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
        if(s > 0)//读取正常
        {
            buffer[s] = '\0';
            cout << "client -> server# " << buffer << endl;
        }
        else if(s == 0)//client退出,server也退出
        {
            cout << "client quit, me too!" << endl;
            break;
        }
        else//读取错误
        {
            cout << "error string: " << strerror(errno) << endl;
            break;
        }
    }
    //关闭文件描述符
    close(rfd);
    //程序退出删除命名管道
    removeFifo(NAMED_PIPE);
    cout << "server end" << endl;

    return 0;
}

服务端代码:(client.cc)

#include "comm.hpp"

int main()
{
    cout << "client begin" << endl;
    int wfd = open(NAMED_PIPE, O_WRONLY);//打开命名管道,客户端以写的方式打开
    if(wfd < 0)
        exit(-1);

    //write
    char buffer[1024];
    while(true)
    {
        cout << "Please Say# ";
        fgets(buffer, sizeof(buffer), stdin);//输入信息
        if(strlen(buffer) > 0)
            buffer[strlen(buffer) - 1] = 0;//去掉输入多余的 \n
        
        ssize_t n = write(wfd, buffer, strlen(buffer));
        assert(n == strlen(buffer));
        (void)n;
    }

    close(wfd);
    cout << "client end" << endl;
    return 0;
}

运行的时候,服务端先运行,然后客户端再运行,客户端不输入数据,服务端会一直阻塞等待

2.3.6 匿名管道与命名管道的区别

  1. 匿名管道由pipe函数创建并打开。
  2. 命名管道由mkfifo函数创建,打开用open
  3. FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义

--------------- END ---------------

「 作者 」 枫叶先生
「 更新 」 2023.3.29
「 声明 」 余之才疏学浅,故所撰文疏漏难免,
          或有谬误或不准确之处,敬请读者批评指正。