LINUX 信号量

信号量

需求(JUST FOR FUN)

生产者一个进程,一个进程是循环放(for循环)

消费者四个进程,每个进程是循环取(for循环)

缓冲区为一个数组,大小为10

要求:生产者放向BUFF[0]一个,4个消费者进程的去BUFF[0]各取一次,消费了打印结果,

​ 生产者再向BUFF[1]放一个,4个消费者进程再去BUFF[1]消费,消费完打印结果,

​ 生产者再向BUFF[2]放一个,4个消费者进程再去BUFF[2]消费,消费完打印结果,

​ ……

​ ……

例如

PRODUCER—->1

​ CONSUMER1 FEATCH 1

​ CONSUMER2 FEATCH 1

​ CONSUMER2 FEATCH 1

​ CONSUMER3 FEATCH 1

PRODUCER—->2

​ CONSUMER1 FEATCH 2

​ CONSUMER2 FEATCH 2

​ CONSUMER2 FEATCH 2

​ CONSUMER3 FEATCH 2

代码

pc.c文件

#define __LIBRARY__
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <errno.h>

#define BUFFER_SIZE 10
#define NUM 25
#define __NR_sem_open 87
#define __NR_sem_wait 88
#define __NR_sem_post 89
#define __NR_sem_unlink 90

//typedef void sem_t;gcc pc.c  -o pc -lpthread
void printSig(sem_t *sem,char *s);
void printChidlSig(sem_t *sem,char *s,int i,int pid);
void printSigPid(sem_t *sem,char *s,int pid);
int main()
{
    int i = 0, j = 0;
    int consumeNum = 0;
    int produceNum = 0;
    int consume_pos = 0;
    int produce_pos = 0;

    sem_t *empty, *full, *mutex, *mutex_produce, *mutex_consume;
    FILE *fp = NULL;
    pid_t producer_pid;
    pid_t consumer_pid[4] = {0, 0, 0, 0};

    sem_unlink("empty");
    sem_unlink("full");
    sem_unlink("mutex");
    sem_unlink("mutex_produce");
    sem_unlink("mutex_consume");

    sem_unlink("child1");
    sem_unlink("child2");
    sem_unlink("child3");
    sem_unlink("child4");
    sem_unlink("waitNumSem");

    empty = (sem_t *)sem_open("empty", O_CREAT, 0666, 10);
    full = (sem_t *)sem_open("full",O_CREAT, 0666, 0);
    mutex = (sem_t *)sem_open("mutex",O_CREAT, 0666, 1);
    mutex_produce = (sem_t *)sem_open("mutex_produce",O_CREAT, 0666, 1);
    mutex_consume = (sem_t *)sem_open("mutex_consume", O_CREAT, 0666,1);

    fp = fopen("filebuffer.txt", "wb+");

   sem_t *sem[4];
   sem_t * child1 = (sem_t *)sem_open("child1",O_CREAT, 0666, 1);
   sem_t * child2 = (sem_t *)sem_open("child2",O_CREAT, 0666, 0);
   sem_t * child3 = (sem_t *)sem_open("child3",O_CREAT, 0666, 0);
   sem_t * child4 = (sem_t *)sem_open("child4",O_CREAT, 0666, 0);

    //代表4个子进程有没有消费完
    sem_t * waitNumSem = (sem_t *)sem_open("waitNumSem",O_CREAT, 0666, 0);
    sem[0]=child1;
    sem[1]=child2;
    sem[2]=child3;
    sem[3]=child4;
    

    for (int i = 0; i < 1; i++)
    {
        if (!fork())
        {
            
            producer_pid = getpid();
            printf("Producer pid=%d create success!\n", producer_pid);
            for (int i = 0; i < NUM; i++)
            {

                //等4个子进程消费完毕才能往下,否则一直空转
                int value = 0;
                sem_getvalue(waitNumSem, &value);
                while(value>0){
                    sem_getvalue(waitNumSem, &value);
                    //printf("value = %d\n",value);
                    sleep(2);
                }
                sem_init(waitNumSem,1,4);
                //减一
                sem_wait(empty);

                //生产代码
                sem_wait(mutex_produce);
                produceNum = i;
                fseek(fp, produce_pos * sizeof(int), SEEK_SET);
                fwrite(&produceNum, sizeof(int), 1, fp);
                fflush(fp);

                //信号量的值加一
                sem_post(mutex_produce);
                sem_post(full);
                sem_post(full);
                sem_post(full);
                sem_post(full);

                printf("Producer pid=%d : %02d at %d\n", producer_pid, produceNum, produce_pos);
                fflush(stdout);
    
                produce_pos = (produce_pos + 1) % BUFFER_SIZE;

                sleep(1);
            }
            exit(0);
        }
    }
    for (int i = 0; i < 4; i++)
    {
        if (!fork())
        {   
            sleep(i);
            consumer_pid[i] = getpid();
           
            int local_consume_pos = 0;
            
            printf("\t\t\t\tConsumer pid=%d create success!\n", consumer_pid[i]);
            for (int j = 0; j < NUM; j++)
            {   
                //是否该自己消费
                 sem_wait(sem[i]); // 等待信号量

                sem_wait(full);

                //消费代码
                sem_wait(mutex_consume);
                fseek(fp, local_consume_pos * sizeof(int), SEEK_SET);
                fread(&consumeNum, sizeof(int), 1, fp);
                fflush(fp);
                printf("\t\t\tConsumer pid=%d: %02d at %d\n", consumer_pid[i], consumeNum, local_consume_pos);
                fflush(stdout);
                sem_post(mutex_consume);
                
            
                local_consume_pos = (local_consume_pos + 1) % BUFFER_SIZE;

        
                //通知下一个子进程消费
                sem_post(sem[(i + 1) % 4]);   

                //一个子进程消费完,
                sem_wait(waitNumSem);  
                int value = 0;
                 
                sem_getvalue(waitNumSem, &value);
                
                //四个子进程都消费完了才产生一个empty
                if (value==0)
                {
                    sem_post(empty); 
                }              
            }
            exit(0);
        }
    }

    waitpid(producer_pid, NULL, 0);
    for (int i = 0; i < 4; i++)
    {
        waitpid(consumer_pid[i], NULL, 0);
    }

    sem_unlink("empty");
    sem_unlink("full");
    sem_unlink("mutex");
    sem_unlink("mutex_produce");
    sem_unlink("mutex_consume");

    fclose(fp);

    return 0;
}

编译

gcc pc.c  -o pc -lpthread

运行

./pc

运行结果

image-20240417164729603

image-20240417164745410

java实现

public static void main(String[] args) throws InterruptedException {
        int BUFFER_SIZE = 10;
        Lock produceLock = new ReentrantLock();
        Lock consumerLock = new ReentrantLock();
        CountDownLatch mainLatch = new CountDownLatch(1);

        Semaphore fullS = new Semaphore(0);
        Semaphore emptyS = new Semaphore(BUFFER_SIZE);


        Semaphore c1 = new Semaphore(1);
        Semaphore c2 = new Semaphore(0);
        Semaphore c3 = new Semaphore(0);
        Semaphore c4 = new Semaphore(0);
        Semaphore[] cList = new Semaphore[4];
        cList[0] = c1;
        cList[1] = c2;
        cList[2] = c3;
        cList[3] = c4;

        int[] arr = new int[BUFFER_SIZE];
        AtomicInteger waitNum = new AtomicInteger(0);

        final AtomicInteger produce_pos = new AtomicInteger(0);
        for (int i = 0; i < 1; i++) {
            Thread produceThread = new Thread(() -> {
                for (int j = 0; j < 24; j++) {
                    try {
                        produceLock.lock();
                        while (waitNum.get() > 0) {

                        }
                        waitNum.set(4);
                        emptyS.acquire();
                        arr[produce_pos.get()] = j;
                        System.out.printf("Producer pid=%d : %02d at %d\n", Thread.currentThread().getId(), j, produce_pos.get());
                        produce_pos.set((produce_pos.get() + 1) % BUFFER_SIZE);
                        fullS.release(4);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } finally {
                        produceLock.unlock();

                    }
                }
                mainLatch.countDown();
            });
            produceThread.start();
            System.out.printf("Producer pid=%d create success!\n", produceThread.getId());
        }


        for (int i = 0; i < 4; i++) {

            int finalI = i;
            Thread subThread = new Thread(() -> {
                AtomicInteger local_consume_pos = new AtomicInteger(0);
                for (int j = 0; j < 24; j++) {
                    try {
                        cList[finalI].acquire();
                        fullS.acquire();
                        consumerLock.lock();
                        System.out.printf("\t\t\tConsumer pid=%d: %02d at %d\n", Thread.currentThread().getId(), j, local_consume_pos.get());
                        waitNum.decrementAndGet();
                        Semaphore semaphore = cList[(finalI + 1) % 4];
                        semaphore.release();
                        if (waitNum.get() == 0) {
                            emptyS.release();
                        }
                        local_consume_pos.set((local_consume_pos.get() + 1) % BUFFER_SIZE);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } finally {
                        consumerLock.unlock();
                    }
                }
            });
            subThread.start();
            System.out.printf("Consumer pid=%d create success!\n", subThread.getId());
        }

        mainLatch.await();
        System.out.println("END");
    }

运行结果

image-20240418114152839

image-image-20240418114142619