LINUX 信号量
信号量
需求(JUST FOR FUN)
生产者一个进程,一个进程是循环放(for循环)
消费者四个进程,每个进程是循环取(for循环)
缓冲区为一个数组,大小为10
要求:生产者放向BUFF[0]一个,4个消费者进程的去BUFF[0]各取一次,消费了打印结果,
生产者再向BUFF[1]放一个,4个消费者进程再去BUFF[1]消费,消费完打印结果,
生产者再向BUFF[2]放一个,4个消费者进程再去BUFF[2]消费,消费完打印结果,
……
……
例如
PRODUCER—->1
CONSUMER1 FEATCH 1
CONSUMER2 FEATCH 1
CONSUMER2 FEATCH 1
CONSUMER3 FEATCH 1
PRODUCER—->2
CONSUMER1 FEATCH 2
CONSUMER2 FEATCH 2
CONSUMER2 FEATCH 2
CONSUMER3 FEATCH 2
代码
pc.c文件
#define __LIBRARY__
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <errno.h>
#define BUFFER_SIZE 10
#define NUM 25
#define __NR_sem_open 87
#define __NR_sem_wait 88
#define __NR_sem_post 89
#define __NR_sem_unlink 90
//typedef void sem_t;gcc pc.c -o pc -lpthread
void printSig(sem_t *sem,char *s);
void printChidlSig(sem_t *sem,char *s,int i,int pid);
void printSigPid(sem_t *sem,char *s,int pid);
int main()
{
int i = 0, j = 0;
int consumeNum = 0;
int produceNum = 0;
int consume_pos = 0;
int produce_pos = 0;
sem_t *empty, *full, *mutex, *mutex_produce, *mutex_consume;
FILE *fp = NULL;
pid_t producer_pid;
pid_t consumer_pid[4] = {0, 0, 0, 0};
sem_unlink("empty");
sem_unlink("full");
sem_unlink("mutex");
sem_unlink("mutex_produce");
sem_unlink("mutex_consume");
sem_unlink("child1");
sem_unlink("child2");
sem_unlink("child3");
sem_unlink("child4");
sem_unlink("waitNumSem");
empty = (sem_t *)sem_open("empty", O_CREAT, 0666, 10);
full = (sem_t *)sem_open("full",O_CREAT, 0666, 0);
mutex = (sem_t *)sem_open("mutex",O_CREAT, 0666, 1);
mutex_produce = (sem_t *)sem_open("mutex_produce",O_CREAT, 0666, 1);
mutex_consume = (sem_t *)sem_open("mutex_consume", O_CREAT, 0666,1);
fp = fopen("filebuffer.txt", "wb+");
sem_t *sem[4];
sem_t * child1 = (sem_t *)sem_open("child1",O_CREAT, 0666, 1);
sem_t * child2 = (sem_t *)sem_open("child2",O_CREAT, 0666, 0);
sem_t * child3 = (sem_t *)sem_open("child3",O_CREAT, 0666, 0);
sem_t * child4 = (sem_t *)sem_open("child4",O_CREAT, 0666, 0);
//代表4个子进程有没有消费完
sem_t * waitNumSem = (sem_t *)sem_open("waitNumSem",O_CREAT, 0666, 0);
sem[0]=child1;
sem[1]=child2;
sem[2]=child3;
sem[3]=child4;
for (int i = 0; i < 1; i++)
{
if (!fork())
{
producer_pid = getpid();
printf("Producer pid=%d create success!\n", producer_pid);
for (int i = 0; i < NUM; i++)
{
//等4个子进程消费完毕才能往下,否则一直空转
int value = 0;
sem_getvalue(waitNumSem, &value);
while(value>0){
sem_getvalue(waitNumSem, &value);
//printf("value = %d\n",value);
sleep(2);
}
sem_init(waitNumSem,1,4);
//减一
sem_wait(empty);
//生产代码
sem_wait(mutex_produce);
produceNum = i;
fseek(fp, produce_pos * sizeof(int), SEEK_SET);
fwrite(&produceNum, sizeof(int), 1, fp);
fflush(fp);
//信号量的值加一
sem_post(mutex_produce);
sem_post(full);
sem_post(full);
sem_post(full);
sem_post(full);
printf("Producer pid=%d : %02d at %d\n", producer_pid, produceNum, produce_pos);
fflush(stdout);
produce_pos = (produce_pos + 1) % BUFFER_SIZE;
sleep(1);
}
exit(0);
}
}
for (int i = 0; i < 4; i++)
{
if (!fork())
{
sleep(i);
consumer_pid[i] = getpid();
int local_consume_pos = 0;
printf("\t\t\t\tConsumer pid=%d create success!\n", consumer_pid[i]);
for (int j = 0; j < NUM; j++)
{
//是否该自己消费
sem_wait(sem[i]); // 等待信号量
sem_wait(full);
//消费代码
sem_wait(mutex_consume);
fseek(fp, local_consume_pos * sizeof(int), SEEK_SET);
fread(&consumeNum, sizeof(int), 1, fp);
fflush(fp);
printf("\t\t\tConsumer pid=%d: %02d at %d\n", consumer_pid[i], consumeNum, local_consume_pos);
fflush(stdout);
sem_post(mutex_consume);
local_consume_pos = (local_consume_pos + 1) % BUFFER_SIZE;
//通知下一个子进程消费
sem_post(sem[(i + 1) % 4]);
//一个子进程消费完,
sem_wait(waitNumSem);
int value = 0;
sem_getvalue(waitNumSem, &value);
//四个子进程都消费完了才产生一个empty
if (value==0)
{
sem_post(empty);
}
}
exit(0);
}
}
waitpid(producer_pid, NULL, 0);
for (int i = 0; i < 4; i++)
{
waitpid(consumer_pid[i], NULL, 0);
}
sem_unlink("empty");
sem_unlink("full");
sem_unlink("mutex");
sem_unlink("mutex_produce");
sem_unlink("mutex_consume");
fclose(fp);
return 0;
}
编译
gcc pc.c -o pc -lpthread
运行
./pc
运行结果


java实现
public static void main(String[] args) throws InterruptedException {
int BUFFER_SIZE = 10;
Lock produceLock = new ReentrantLock();
Lock consumerLock = new ReentrantLock();
CountDownLatch mainLatch = new CountDownLatch(1);
Semaphore fullS = new Semaphore(0);
Semaphore emptyS = new Semaphore(BUFFER_SIZE);
Semaphore c1 = new Semaphore(1);
Semaphore c2 = new Semaphore(0);
Semaphore c3 = new Semaphore(0);
Semaphore c4 = new Semaphore(0);
Semaphore[] cList = new Semaphore[4];
cList[0] = c1;
cList[1] = c2;
cList[2] = c3;
cList[3] = c4;
int[] arr = new int[BUFFER_SIZE];
AtomicInteger waitNum = new AtomicInteger(0);
final AtomicInteger produce_pos = new AtomicInteger(0);
for (int i = 0; i < 1; i++) {
Thread produceThread = new Thread(() -> {
for (int j = 0; j < 24; j++) {
try {
produceLock.lock();
while (waitNum.get() > 0) {
}
waitNum.set(4);
emptyS.acquire();
arr[produce_pos.get()] = j;
System.out.printf("Producer pid=%d : %02d at %d\n", Thread.currentThread().getId(), j, produce_pos.get());
produce_pos.set((produce_pos.get() + 1) % BUFFER_SIZE);
fullS.release(4);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
produceLock.unlock();
}
}
mainLatch.countDown();
});
produceThread.start();
System.out.printf("Producer pid=%d create success!\n", produceThread.getId());
}
for (int i = 0; i < 4; i++) {
int finalI = i;
Thread subThread = new Thread(() -> {
AtomicInteger local_consume_pos = new AtomicInteger(0);
for (int j = 0; j < 24; j++) {
try {
cList[finalI].acquire();
fullS.acquire();
consumerLock.lock();
System.out.printf("\t\t\tConsumer pid=%d: %02d at %d\n", Thread.currentThread().getId(), j, local_consume_pos.get());
waitNum.decrementAndGet();
Semaphore semaphore = cList[(finalI + 1) % 4];
semaphore.release();
if (waitNum.get() == 0) {
emptyS.release();
}
local_consume_pos.set((local_consume_pos.get() + 1) % BUFFER_SIZE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
consumerLock.unlock();
}
}
});
subThread.start();
System.out.printf("Consumer pid=%d create success!\n", subThread.getId());
}
mainLatch.await();
System.out.println("END");
}
运行结果

