Linux信号量与互斥锁解决生产者与消费者问题

Source

先来看什么是生产者消费者问题:

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

如何解决?

如上百度百科所示,它是一个经典的多线程同步问题,然后解决方案就用我们标题所说的信号量和互斥锁共同解决:
 如上图所示,由主线程main创建子线程生产者以及消费者,生产者向缓冲区中写入数据,消费者向缓冲区读取数据,当缓冲区为满(full)时,生产者写入数据的操作必须堵塞,而当缓冲区为空(NULL)时,消费者向缓冲区读取数据的操作也必须堵塞

然后子线程中的生产者线程和消费者线程必定会产生竞争,如何让这若干生产者和若干消费者有序的进行呢,这时候就必须想到多线程的同步,最常用的就是信号量和互斥锁,关于信号量和互斥锁还有不太懂得朋友们可以去看看这篇:Linux多线程的同步-----信号量和互斥锁_神厨小福贵!的博客-CSDN博客

上面说到的缓冲区为NULL时,读取阻塞,如果缓冲区为满(full)时,写入数据进行阻塞,那么我们如何判断这个缓冲区是NULL或者是满(full)的呢?其实很简单,下标来追踪数据,定义两个下标in和out,in即是我们写入数据的下标,out是我们读取数据的下标,每写入一次in++,每读取一次,out++,然后这两下标的初始值都为0,因为刚开始的时候没有数据。

然后我们数据追踪的事情解决了,下面就来进行信号量初值的给定,假设我们这里缓冲区最大可以存储一个buff[10]的int整型数据,定义一个buff数组最大值为初始写入的信号量,表示还有空间,可以写入那么生产者写入操作的信号量初值为10

那么消费者读取数据的操作的信号量的初值代表的含义就是,缓冲区中有没有数据,刚开始缓冲区中肯定是没有数据的,所以读取操作的信号量的初值给0就好

 上图中的条件就是我们所说的生产者和消费者有序进行的条件,但是生产者消费者不可能只是单个的一条执行路线,可能会有多个生产者和多个消费者,一起共同运作,那么当生产者/消费者对缓冲区写入/读取数据的时候,怎么保证其他的生产者/消费者不会对共同下标的数据进行操作呢,也急速在某一时间内,只能有单个生产者或消费者对缓冲区进行操作,这是不是很像互斥锁的概念:

 对的,就是互斥锁,这块为了不让若干生产者/消费者共同对同一下标的数据进行操作,这块在信号量之后加上互斥锁,就可以保证同样一时间只有某个单一的操作对缓冲区进行操作:
下面来看代码:
 

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>

#define BUFF_MAX 10    //假设缓冲区buff最大存储数据为10
#define SC_NUM 2       //假设两个生产者
#define XF_NUM 3       //假设两个消费者

sem_t empty;           //信号量empty
sem_t full;            //信号量full
pthread_mutex_t mutex; //互斥锁的定义

int buff[BUFF_MAX];    //缓冲区存储10十个数据
int in = 0; //初始写入下标为0
int out = 0; //初始读取下标为0

void * sc_thread(void * arg)    //生产者线程函数 sc的意思就是生产的缩写
{
    int index = (int)arg;
    for(int i = 0 ; i < 30 ; i++)
    {
        sem_wait(&empty);   //对初始值为10的信号量进行减一的操作
        pthread_mutex_lock(&mutex);  //上锁
        buff[in] = rand() % 100;   //在缓冲区中存储的数据是多少我们不关心,只需要知道那玩意存进去就好了
        printf("第%d个生产者,产生了%d数据,在%d位置上\n",index,buff[in],in); //打印做个标记
        in = (in + 1) % BUFF_MAX;  //假设写满之后,重头开始覆盖前面的数据,不会产生越界
        pthread_mutex_unlock(&mutex);   //解锁
        sem_post(&full);   //对full为0的初始值信号量进行加一,告诉消费者函数,可以进行读取了
        int n = rand() % 3; 
        sleep(n);    //随机进行睡眠一到三秒
    }
}

void * xf_thread(void * arg)  //xf是消费者的缩写
{
    int index = (int)arg;  
    for(int i = 0 ; i < 20 ; i++)      //因为生产者有两个,各自写入30次总共就是60次
    {                                  //然后消费者有三个,各自读取20次,刚好可以读完
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        printf("第%d个消费者,读取了%d数据,在%d位置上\n",index,buff[out],out);
        out = (out + 1) % BUFF_MAX;
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);

        int n = rand() % 10;
        sleep(n);
   }        
}

int main()
{
    sem_init(&empty,0,BUFF_MAX);
    sem_init(&full,0,0);
    pthread_mutex_init(&mutex,NULL);

    srand(time(NULL));
    pthread_t sc_id[SC_NUM];
    pthread_t xf_id[XF_NUM];
    for(int i = 0 ; i < SC_NUM ; i++)
    {
        pthread_create(&sc_id[i],NULL,sc_thread,(void*)i);
    }

    for(int i = 0 ; i < XF_NUM ; i++)
    {
        pthread_create(&xf_id[i],NULL,xf_thread,(void*)i);
    }

    for(int i = 0 ; i < SC_NUM ; i++)
    {
         pthread_join(sc_id[i],NULL);
    }

    for(int i = 0 ; i < XF_NUM ; i++)
    {
        pthread_join(xf_id[i],NULL);
    }

    sem_destroy(&empty);
    sem_destroy(&full);
    pthread_mutex_destroy(&mutex);

    printf("main run over\n");
    exit(0);
}

下面来看运行截图:

 运行结果如上所示,需要注意的一点就是这道题的难点就在于,信号量里面嵌套一个互斥锁,来达到让若干生产者/消费者不会同时对某一块缓冲区进操作,代码理解的话,都不是很难,在前面一篇也都有讲到!

“加油   兄弟们!”