Thursday, 20 August 2015

Producer Consumer Problem with pthreads using semaphores

PC

Producer Consumer Problem with pthreads using semaphores

 The “Producer-Consumer problem” is one of the best ways to make an example of issues found in Process Communication. To make a simple analogy of the problem , let’s consider a restaurant , with a client who eats what is served for him on the table and a cook that brings food to the table. The issue is that we want to avoid two unpleasant situations. First the cook might bring food to the table when the table is full and then it might fall off. Second we want to avoid the client trying to eat the table , because there is no food on it.
            In reality, the cook is a process known as a “producer” while the client is a process known as the “cosnumer”, the food is data, and the table is a buffer. The problem is also reffered to as the “bounded buffer problem” as we have a limited size buffer that must be shared by two processes (maybe part of the same program) that both add and remove data from it.

The Issue:-


The main issue in this situation has been discussed above, the producer needs room in the buffer to add data, while the consumer needs tha buffer to be non-empty. 

Solutions:-

1. Sleep-wakeup (also known as the inadequate soluiton)
The main ideea of this solution is to use a variable, counter, to count the elements stored in the buffer. To prevent overflows, the code checks if the buffer hasn’t reached maximum capacity. If the value of counter = N (where N=max) , the producer makes a system call, sleep() , to wait until at least one element is ‘consumed’ . If the buffer was empty , the producer notifyes the consumer of the existence of one element in the buffer (namely the one it just added). The consumer code functions on the same principle, it checks if it there is an element to consume, and if the buffer was full and its cycle freed up a position it notifyes the producer. It uses the sleep() system call to wait for an element to be added tot the buffer.

int itemCount

procedure producer() {
    while (true) {
        item = produceItem()

        if (itemCount == BUFFER_SIZE) {
            sleep()
        }

        putItemIntoBuffer(item)
        itemCount = itemCount + 1
       
        if (itemCount == 1) {
            wakeup(consumer)
        }
    }
}

procedure consumer() {
    while (true) {

        if (itemCount == 0) {
            sleep()
        }
       
        item = removeItemFromBuffer()
        itemCount = itemCount - 1
        
        if (itemCount == BUFFER_SIZE - 1) {
            wakeup(producer)
        }
       
        consumeItem(item)
    }
}

2. Semaphor

Using semaphors we can solve this problem better . In the solution below we use two semaphores, full and empty, to solve the problem. Full is incremented and empty decremented when a new item has been put into the buffer. This works great for only one producer and consumer.Given the following circumstances we may encounter and unsatisfactory solution:

Two producers decrement the semaphor empty.One of the prodecers dtermines the next empty slot in the buffer. The sencond producer determines the next empty slot and gets the same result as the first producer. Both producer write in the same spot.
semaphore full = 0
semaphore empty = BUFFER_SIZE

procedure producer() {
    while (true) {
        item = produceItem()
        down(empty)
        putItemIntoBuffer(item)
        up(full)
    }
 }

procedure consumer() {
    while (true) {
        down(full)
        item = removeItemFromBuffer()
        up(empty)
        consumeItem(item)
    }
}

3. Mutex
To solve this issue, we should create a new semaphory to solve the mutual exclusion problem (no more than one producer has access to the buffer at one given time.) , thus the variable semaphory is named “mutex”.

When a process wants to enter critical region , it uses the mutex_lock() to lock the the variable mutex. If mutex is locked it waits it’s turn. The mutex solution works well because it offers another processes to accesss the processor , simply because it avoids the busy waiting solution.

semaphore mutex = 1
semaphore full = 0
semaphore empty = BUFFER_SIZE

procedure producer() {
    while (true) {
        item = produceItem()
        down(empty)
        down(mutex)
        putItemIntoBuffer(item)
        up(mutex)
        up(full)
    }
 }

procedure consumer() {
    while (true) {
        down(full)
        down(mutex)
        item = removeItemFromBuffer()
        up(mutex)
        up(empty)
        consumeItem(item)
    }
}

4. Monitors
A solution to this problem is implementing monitors. A monitor is a collection of data , procedure , variables and data structures grouped into a packed/module. Although processes can always call a monitor’s procedures they DO NOT have access to a monitor’s internal structure. Monitors use a conditional variable , and perform two operations upon it : wait and signal .
            If a proccess can’t continue , a wait operation on the conditional variable blocks the current process. A signal will make it resume operation.
Since mutual exclusion is implicit with monitors, no extra effort is necessary to protect critical section. Monitors provide a kind of high-level synchronization.

monitor ProducerConsumer {
   
    int itemCount
    condition full
    condition empty
   
    procedure add(item) {
        while (itemCount == BUFFER_SIZE) {
            wait(full)
        }
       
        putItemIntoBuffer(item)
        itemCount = itemCount + 1
       
        if (itemCount == 1) {
            notify(empty)
        }
    }
   
    procedure remove() {
        while (itemCount == 0) {
            wait(empty)
        }
       
        item = removeItemFromBuffer()
        itemCount = itemCount - 1
       
        if (itemCount == BUFFER_SIZE - 1) {
            notify(full)
        }
       
        return item;
    }
}

procedure producer() {
    while (true) {
        item = produceItem()
        ProducerConsumer.add(item)
    }
}

procedure consumer() {
    while (true) {
        item = ProducerConsumer.remove()
        consumeItem(item)
    }
}
l_ lass=MsoNormal> 

        if (itemCount == 0) {
            sleep()
        }
       
        item = removeItemFromBuffer()
        itemCount = itemCount - 1
        
        if (itemCount == BUFFER_SIZE - 1) {
            wakeup(producer)
        }
       
        consumeItem(item)
    }
}




            The “Producer-Consumer problem” is one of the best ways to make an example of issues found in Process Communication. To make a simple analogy of the problem , let’s consider a restaurant , with a client who eats what is served for him on the table and a cook that brings food to the table. The issue is that we want to avoid two unpleasant situations. First the cook might bring food to the table when the table is full and then it might fall off. Second we want to avoid the client trying to eat the table , because there is no food on it.

            In reality, the cook is a process known as a “producer” while the client is a process known as the “cosnumer”, the food is data, and the table is a buffer. The problem is also reffered to as the “bounded buffer problem” as we have a limited size buffer that must be shared by two processes (maybe part of the same program) that both add and remove data from it.



5. Messages

Messages sent between proccess is yet another way to solve the producer-consumer problem. We create a special structure , named a mailbox for both processes and messages are sent to these structures rather to memeory address . Thus the program is safer , with a high-end approach.


Usage
            The producer-cosnumer solves the issue of thread synchronization in operating systems, thus making sure there are no problems with stack overflows or inconsistent data addressing.
            Also, another possible application could be in reliable network protocols, to make sure data is delivered reliabley .


An Example:-

#include<stdio.h>
#include<pthread.h>
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<semaphore.h>

sem_t empty,full,mutex;
char buf[10];
void* thread_fun1(void* arg)
{
int i;
//printf("inside producer\n");
for(i=0;i<10;i++)
{
sem_wait(&empty);
sem_wait(&mutex);
buf[i]=i;
printf("item produced is %d\n",buf[i]);
sem_post(&mutex);
sem_post(&full);
sleep(1);

}
pthread_exit("producer\n");

}
void * thread_fun2(void* arg)
{
int j;

printf("inside consumer\n");
for(j=0;j<10;j++)10
{
sem_wait(&full);
sem_wait(&mutex);
      // sleep(1);

j=buf[j];
printf("consumed item is:%d\n",buf[j]);
sem_post(&mutex);
sem_post(&empty);
sleep(5);


}

pthread_exit("consumer\n");
}



int main()
{
pthread_t pid1,pid2;

sem_init(&empty,0,10);
sem_init(&full,0,0);
sem_init(&mutex,1,1);

void *status;
pthread_create(&pid1,NULL,thread_fun1,NULL);
pthread_create(&pid2,NULL,thread_fun2,NULL);


pthread_join(pid1,&status);
printf("the exited status of 1st is %s\n",(char*)status);
pthread_join(pid2,&status);
printf("the exited status %s\n",(char*)status);



return 0;
}

No comments:

Post a Comment