
Thursday 20 August 2015

Producer Consumer Problem with pthreads using semaphores

The “Producer-Consumer problem” is one of the best illustrations of the issues that arise in inter-process communication. As a simple analogy, consider a restaurant with a client who eats what is served on the table and a cook who brings food to the table. We want to avoid two unpleasant situations. First, the cook might bring food when the table is already full, and the food would fall off. Second, we want to avoid the client trying to eat the table because there is no food on it.
In reality, the cook is a process known as the “producer”, the client is a process known as the “consumer”, the food is data, and the table is a buffer. The problem is also referred to as the “bounded-buffer problem”, because a buffer of limited size must be shared by two processes (possibly parts of the same program), one adding data to it and the other removing data from it.

The Issue:-


The main issue has been discussed above: the producer needs room in the buffer to add data, while the consumer needs the buffer to be non-empty.

Solutions:-

1. Sleep-wakeup (also known as the inadequate solution)
The main idea of this solution is to use a variable, itemCount, to count the elements stored in the buffer. To prevent overflow, the producer checks whether the buffer has reached maximum capacity. If itemCount == BUFFER_SIZE, the producer calls sleep() to wait until at least one element is consumed. If the buffer was empty before it added an element, the producer notifies the consumer that one element now exists in the buffer (namely the one it just added). The consumer works on the same principle: it checks whether there is an element to consume and calls sleep() to wait if there is none, and if the buffer was full and its cycle freed up a position, it notifies the producer. The solution is inadequate because the test of itemCount and the call to sleep() are not atomic: if the consumer reads itemCount == 0 and is preempted before it sleeps, the producer's wakeup() is lost, and eventually both processes sleep forever.

int itemCount

procedure producer() {
    while (true) {
        item = produceItem()

        if (itemCount == BUFFER_SIZE) {
            sleep()
        }

        putItemIntoBuffer(item)
        itemCount = itemCount + 1
       
        if (itemCount == 1) {
            wakeup(consumer)
        }
    }
}

procedure consumer() {
    while (true) {

        if (itemCount == 0) {
            sleep()
        }
       
        item = removeItemFromBuffer()
        itemCount = itemCount - 1
        
        if (itemCount == BUFFER_SIZE - 1) {
            wakeup(producer)
        }
       
        consumeItem(item)
    }
}

2. Semaphores

Using semaphores we can solve this problem better. In the solution below we use two semaphores, full and empty. When a new item is put into the buffer, empty is decremented and full is incremented. This works well for exactly one producer and one consumer. With several producers, however, the following sequence can occur:

Two producers decrement the semaphore empty. One of the producers determines the next empty slot in the buffer. The second producer determines the next empty slot and gets the same result as the first. Both producers write to the same slot.

semaphore full = 0
semaphore empty = BUFFER_SIZE

procedure producer() {
    while (true) {
        item = produceItem()
        down(empty)
        putItemIntoBuffer(item)
        up(full)
    }
}

procedure consumer() {
    while (true) {
        down(full)
        item = removeItemFromBuffer()
        up(empty)
        consumeItem(item)
    }
}
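
The pseudocode above maps fairly directly onto POSIX semaphores. The following is a minimal sketch, not a definitive implementation: the names run_single_pair, empty_slots and full_slots, and the capacity of 4, are assumptions made for illustration. It is only safe for exactly one producer and one consumer, because each side keeps its own private buffer index.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 4            /* hypothetical capacity for this sketch */

static int buffer[BUFFER_SIZE];
static sem_t empty_slots;        /* "empty" in the pseudocode */
static sem_t full_slots;         /* "full" in the pseudocode */

/* Producer thread: puts the values 1..n into the circular buffer. */
static void *producer(void *arg)
{
    int n = *(int *)arg;
    int in = 0;                  /* next slot to fill; private to the producer */
    for (int item = 1; item <= n; item++) {
        sem_wait(&empty_slots);  /* down(empty): block while the buffer is full */
        buffer[in] = item;
        in = (in + 1) % BUFFER_SIZE;
        sem_post(&full_slots);   /* up(full) */
    }
    return NULL;
}

/* Consumes n items in the calling thread and returns their sum. */
long run_single_pair(int n)
{
    pthread_t tid;
    long sum = 0;
    int out = 0;                 /* next slot to empty; private to the consumer */
    int rc;

    rc = sem_init(&empty_slots, 0, BUFFER_SIZE);
    assert(rc == 0);
    rc = sem_init(&full_slots, 0, 0);
    assert(rc == 0);
    rc = pthread_create(&tid, NULL, producer, &n);
    assert(rc == 0);
    (void)rc;

    for (int i = 0; i < n; i++) {
        sem_wait(&full_slots);   /* down(full): block while the buffer is empty */
        sum += buffer[out];
        out = (out + 1) % BUFFER_SIZE;
        sem_post(&empty_slots);  /* up(empty) */
    }

    pthread_join(tid, NULL);
    sem_destroy(&empty_slots);
    sem_destroy(&full_slots);
    return sum;
}
```

Calling run_single_pair(100) from main (compiled with -pthread) should return the sum of 1..100, since every produced item is consumed exactly once.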

3. Mutex
To solve this issue, we add a third semaphore that enforces mutual exclusion (no more than one process has access to the buffer at any given time); this semaphore is therefore named “mutex”.

When a process wants to enter the critical region, it locks mutex (mutex_lock(), written down(mutex) in the pseudocode below). If mutex is already locked, the process blocks until its turn comes. Because a blocked process sleeps instead of busy-waiting, other processes can use the processor in the meantime.

semaphore mutex = 1
semaphore full = 0
semaphore empty = BUFFER_SIZE

procedure producer() {
    while (true) {
        item = produceItem()
        down(empty)
        down(mutex)
        putItemIntoBuffer(item)
        up(mutex)
        up(full)
    }
}

procedure consumer() {
    while (true) {
        down(full)
        down(mutex)
        item = removeItemFromBuffer()
        up(mutex)
        up(empty)
        consumeItem(item)
    }
}
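
As a rough illustration of why the extra semaphore matters, the sketch below runs several producer threads against one consumer. The shared indices in_pos and out_pos are only touched while mutex is held. The function name run_many_producers and the limits BUFFER_SIZE = 4 and MAX_PRODUCERS = 8 are assumptions chosen for this example.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 4       /* hypothetical capacity */
#define MAX_PRODUCERS 8     /* hypothetical upper bound on producer threads */

static int buffer[BUFFER_SIZE];
static int in_pos, out_pos;                 /* shared indices, guarded by mutex */
static sem_t empty_slots, full_slots, mutex;

/* Each producer puts the values 1..count into the shared buffer. */
static void *producer(void *arg)
{
    int count = *(int *)arg;
    for (int item = 1; item <= count; item++) {
        sem_wait(&empty_slots);             /* down(empty) */
        sem_wait(&mutex);                   /* down(mutex): enter critical region */
        buffer[in_pos] = item;
        in_pos = (in_pos + 1) % BUFFER_SIZE;
        sem_post(&mutex);                   /* up(mutex) */
        sem_post(&full_slots);              /* up(full) */
    }
    return NULL;
}

/* Starts the producers, consumes everything in the calling thread,
 * and returns the sum of all consumed items (-1 on bad arguments). */
long run_many_producers(int producers, int per_producer)
{
    pthread_t tids[MAX_PRODUCERS];
    long sum = 0;
    int rc;

    if (producers < 1 || producers > MAX_PRODUCERS)
        return -1;

    in_pos = out_pos = 0;
    rc = sem_init(&empty_slots, 0, BUFFER_SIZE);
    assert(rc == 0);
    rc = sem_init(&full_slots, 0, 0);
    assert(rc == 0);
    rc = sem_init(&mutex, 0, 1);
    assert(rc == 0);

    for (int i = 0; i < producers; i++) {
        rc = pthread_create(&tids[i], NULL, producer, &per_producer);
        assert(rc == 0);
    }
    (void)rc;

    for (int i = 0; i < producers * per_producer; i++) {
        sem_wait(&full_slots);              /* down(full) */
        sem_wait(&mutex);
        sum += buffer[out_pos];
        out_pos = (out_pos + 1) % BUFFER_SIZE;
        sem_post(&mutex);
        sem_post(&empty_slots);             /* up(empty) */
    }

    for (int i = 0; i < producers; i++)
        pthread_join(tids[i], NULL);
    sem_destroy(&empty_slots);
    sem_destroy(&full_slots);
    sem_destroy(&mutex);
    return sum;
}
```

Note the order of the two down operations: a thread takes empty (or full) first and mutex second. Taking mutex first could leave a thread blocked on a full buffer while holding the lock the consumer needs.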

4. Monitors
Another solution is to use a monitor. A monitor is a collection of procedures, variables and data structures grouped into a package/module. Processes can call a monitor’s procedures at any time, but they do NOT have direct access to its internal data. Monitors use condition variables, which support two operations: wait and signal (called notify in the pseudocode below).
If a process can’t continue, a wait operation on a condition variable blocks it. A signal on the same condition variable makes it resume.
Since mutual exclusion is implicit with monitors, no extra effort is necessary to protect the critical section. Monitors provide a kind of high-level synchronization.

monitor ProducerConsumer {
   
    int itemCount
    condition full
    condition empty
   
    procedure add(item) {
        while (itemCount == BUFFER_SIZE) {
            wait(full)
        }
       
        putItemIntoBuffer(item)
        itemCount = itemCount + 1
       
        if (itemCount == 1) {
            notify(empty)
        }
    }
   
    procedure remove() {
        while (itemCount == 0) {
            wait(empty)
        }
       
        item = removeItemFromBuffer()
        itemCount = itemCount - 1
       
        if (itemCount == BUFFER_SIZE - 1) {
            notify(full)
        }
       
        return item;
    }
}

procedure producer() {
    while (true) {
        item = produceItem()
        ProducerConsumer.add(item)
    }
}

procedure consumer() {
    while (true) {
        item = ProducerConsumer.remove()
        consumeItem(item)
    }
}
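
C has no built-in monitors, but a pthread mutex together with condition variables can approximate one. The sketch below is one possible rendering under that assumption: the struct monitor type, the functions monitor_add, monitor_remove and run_monitor_demo, and the capacity of 4 are all hypothetical names chosen for illustration. Unlike the pseudocode, it signals after every add and remove rather than only on the empty/full transitions, which is a simpler and more conservative choice.

```c
#include <assert.h>
#include <pthread.h>

#define BUFFER_SIZE 4   /* hypothetical capacity */

/* The "monitor": the data plus the lock and conditions that guard it. */
struct monitor {
    int items[BUFFER_SIZE];
    int head, tail, count;
    pthread_mutex_t lock;
    pthread_cond_t not_full;    /* producers wait here ("full" in the pseudocode) */
    pthread_cond_t not_empty;   /* consumers wait here ("empty" in the pseudocode) */
};

static struct monitor mon = {
    .lock = PTHREAD_MUTEX_INITIALIZER,
    .not_full = PTHREAD_COND_INITIALIZER,
    .not_empty = PTHREAD_COND_INITIALIZER
};

static void monitor_add(struct monitor *m, int item)
{
    pthread_mutex_lock(&m->lock);           /* implicit in a real monitor */
    while (m->count == BUFFER_SIZE)         /* re-check after every wakeup */
        pthread_cond_wait(&m->not_full, &m->lock);
    m->items[m->tail] = item;
    m->tail = (m->tail + 1) % BUFFER_SIZE;
    m->count++;
    pthread_cond_signal(&m->not_empty);
    pthread_mutex_unlock(&m->lock);
}

static int monitor_remove(struct monitor *m)
{
    pthread_mutex_lock(&m->lock);
    while (m->count == 0)
        pthread_cond_wait(&m->not_empty, &m->lock);
    int item = m->items[m->head];
    m->head = (m->head + 1) % BUFFER_SIZE;
    m->count--;
    pthread_cond_signal(&m->not_full);
    pthread_mutex_unlock(&m->lock);
    return item;
}

/* Producer thread: adds the values 1..n. */
static void *producer(void *arg)
{
    int n = *(int *)arg;
    for (int item = 1; item <= n; item++)
        monitor_add(&mon, item);
    return NULL;
}

/* Removes n items in the calling thread and returns their sum. */
long run_monitor_demo(int n)
{
    pthread_t tid;
    long sum = 0;
    int rc = pthread_create(&tid, NULL, producer, &n);
    assert(rc == 0);
    (void)rc;
    for (int i = 0; i < n; i++)
        sum += monitor_remove(&mon);
    pthread_join(tid, NULL);
    return sum;
}
```

pthread_cond_wait releases the mutex while the thread sleeps and reacquires it before returning, which is what makes the check-then-wait sequence safe here, in contrast to the sleep-wakeup solution.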

5. Messages

Message passing between processes is yet another way to solve the producer-consumer problem. We create a special structure, called a mailbox, for each process, and messages are sent to these mailboxes rather than to memory addresses. The program is therefore safer, and it works at a higher level of abstraction.
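
A rough way to try message passing on Linux is to let an ordinary pipe stand in for the mailbox. This is an assumption of the sketch, not a mailbox API from the text: the function name run_mailbox_demo is hypothetical. The kernel's pipe buffer already behaves like a bounded buffer, since write() blocks when the pipe is full and read() blocks when it is empty.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* A child process (producer) sends the integers 1..n as messages through a
 * pipe; the parent (consumer) receives them and returns their sum.
 * Returns -1 if the pipe or the child could not be created. */
long run_mailbox_demo(int n)
{
    int mailbox[2];            /* mailbox[0]: receive end, mailbox[1]: send end */
    long sum = 0;
    int item;
    int status;

    if (pipe(mailbox) < 0)
        return -1;

    pid_t pid = fork();
    if (pid < 0) {
        close(mailbox[0]);
        close(mailbox[1]);
        return -1;
    }

    if (pid == 0) {            /* child: the producer */
        close(mailbox[0]);
        for (item = 1; item <= n; item++) {
            if (write(mailbox[1], &item, sizeof(item)) != (ssize_t)sizeof(item))
                _exit(1);
        }
        close(mailbox[1]);
        _exit(0);
    }

    /* parent: the consumer */
    close(mailbox[1]);         /* so read() sees end-of-file when the child is done */
    while (read(mailbox[0], &item, sizeof(item)) == (ssize_t)sizeof(item))
        sum += item;
    close(mailbox[0]);

    waitpid(pid, &status, 0);
    assert(WIFEXITED(status));
    return sum;
}
```

No semaphore or mutex appears in the code: the two processes share no memory, so all synchronization is done by the blocking read() and write() calls.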


Usage
The producer-consumer pattern addresses thread synchronization in operating systems, ensuring that a shared buffer is never overrun and that its data stays consistent.
Another possible application is in reliable network protocols, to make sure data is delivered reliably.


An Example:-

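#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

#define N 10                /* buffer capacity and number of items */

sem_t empty, full, mutex;
char buf[N];

/* Producer: puts the values 0..N-1 into the buffer, one per second. */
void *thread_fun1(void *arg)
{
    int i;
    (void)arg;
    for (i = 0; i < N; i++)
    {
        sem_wait(&empty);               /* wait for a free slot */
        sem_wait(&mutex);               /* enter the critical region */
        buf[i] = i;
        printf("item produced is %d\n", buf[i]);
        sem_post(&mutex);               /* leave the critical region */
        sem_post(&full);                /* one more item is available */
        sleep(1);
    }
    pthread_exit("producer\n");
}

/* Consumer: takes N items out of the buffer, one every five seconds. */
void *thread_fun2(void *arg)
{
    int j, item;
    (void)arg;
    printf("inside consumer\n");
    for (j = 0; j < N; j++)
    {
        sem_wait(&full);                /* wait for an item */
        sem_wait(&mutex);
        item = buf[j];                  /* use a separate variable so the loop index j stays intact */
        printf("consumed item is:%d\n", item);
        sem_post(&mutex);
        sem_post(&empty);               /* one more slot is free */
        sleep(5);
    }
    pthread_exit("consumer\n");
}

int main()
{
    pthread_t pid1, pid2;
    void *status;

    sem_init(&empty, 0, N);
    sem_init(&full, 0, 0);
    sem_init(&mutex, 0, 1);             /* pshared = 0: shared between threads of this process */

    pthread_create(&pid1, NULL, thread_fun1, NULL);
    pthread_create(&pid2, NULL, thread_fun2, NULL);

    pthread_join(pid1, &status);
    printf("the exited status of 1st is %s\n", (char *)status);
    pthread_join(pid2, &status);
    printf("the exited status %s\n", (char *)status);

    sem_destroy(&empty);
    sem_destroy(&full);
    sem_destroy(&mutex);
    return 0;
}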

Linux

Linux Socket Programming

File transfer application using Sockets



 Code for Client


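#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>

int main(void)
{
    char buffer[100];
    ssize_t n;

    /* Read up to 100 bytes of the file to send. */
    int fd = open("abc.txt", O_RDONLY);
    if (fd < 0)
    {
        perror("open");
        return 1;
    }
    n = read(fd, buffer, sizeof(buffer));
    close(fd);
    if (n < 0)
    {
        perror("read");
        return 1;
    }

    int sock_id = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_id < 0)
    {
        printf("Error in getting socket\n");
        return 1;
    }

    /* Address of the server to connect to. */
    struct sockaddr_in clientstruct = {0};
    clientstruct.sin_family = AF_INET;
    clientstruct.sin_addr.s_addr = inet_addr("127.0.0.1");
    clientstruct.sin_port = htons(10125);       /* port must be in network byte order */

    int ret = connect(sock_id, (struct sockaddr *)&clientstruct, sizeof(clientstruct));
    if (ret < 0)
    {
        perror("connect");
        close(sock_id);
        return 1;
    }

    write(sock_id, buffer, n);                  /* send only the bytes actually read */
    close(sock_id);
    return 0;
}

Code for Server

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>

int main(void)
{
    char buffer[100];
    int session_id;
    ssize_t n;

    int server_id = socket(AF_INET, SOCK_STREAM, 0);
    if (server_id < 0)
    {
        printf("Error in getting socket\n");
        return 1;
    }

    struct sockaddr_in serverstruct = {0}, clientstruct;
    serverstruct.sin_family = AF_INET;
    serverstruct.sin_addr.s_addr = inet_addr("127.0.0.1");
    serverstruct.sin_port = htons(10125);       /* same port as the client, network byte order */

    int i = bind(server_id, (struct sockaddr *)&serverstruct, sizeof(serverstruct));
    if (i < 0)
    {
        printf("Error in bind\n");
        return 1;
    }
    i = listen(server_id, 10);
    if (i < 0)
    {
        printf("Error in listening\n");
        return 1;
    }

    while (1)
    {
        socklen_t client_len = sizeof(clientstruct);
        printf("Waiting for the client\n");
        /* accept() fills in the client's address, so pass clientstruct here */
        session_id = accept(server_id, (struct sockaddr *)&clientstruct, &client_len);
        if (session_id < 0)
        {
            printf("Error in accept\n");
            continue;
        }
        n = read(session_id, buffer, sizeof(buffer));
        close(session_id);
        if (n <= 0)
            continue;

        /* Create target.txt if it does not exist yet. */
        int fd = open("target.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (fd < 0)
        {
            printf("file can't be opened\n");
            continue;
        }
        write(fd, buffer, n);                   /* write only the bytes received */
        close(fd);
        /* buffer is not NUL-terminated, so print exactly n bytes */
        printf("From CLIENT:%.*s\n", (int)n, buffer);
    }
    close(server_id);                           /* not reached */
    return 0;
}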

Linux

Linux Pipes
Linux (pronounced /ˈlɪnəks/ LIN-əks or, less frequently, /ˈlaɪnəks/ LYN-əks) is a Unix-like and mostly POSIX-compliant computer operating system (OS) assembled under the model of free and open-source software development and distribution.

UNIX domain sockets (UNIX Pipes) are typically used for communication between two processes running on the same UNIX machine. UNIX Pipes usually have very good throughput. They are also more secure than TCP/IP, since they can only be accessed by applications that run on the computer where the server executes.

When starting a server using UNIX Pipes, you must reserve a listening name that is unique on that machine, for instance 'soliddb'. Because UNIX Pipes handle UNIX domain sockets as standard file system entries, a corresponding file is always created for every listening pipe. In solidDB®'s case, the entries are created under the path /tmp. Our example listening name 'soliddb' creates the directory /tmp/solunp_SOLIDDB and shared files in that directory. /tmp/solunp_ is a constant prefix for all created objects, while the latter part ('SOLIDDB' in this case) is the listening name in upper case. Note that the program below uses anonymous pipes created with pipe(), which are a different mechanism from UNIX domain sockets.
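
To make the "file system entry" point concrete, here is a minimal sketch of a named UNIX domain socket in C. It is not solidDB code: the path /tmp/pc_demo.sock and the function name unix_socket_demo are assumptions chosen for illustration. The parent binds and listens on the path, a forked child connects and sends a short message, and the parent reads it.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical socket path for this sketch; not a solidDB listening name. */
#define SOCK_PATH "/tmp/pc_demo.sock"

/* Sends msg from a child process to the parent over a named UNIX domain
 * socket. Returns the number of bytes the parent received, or -1 on error. */
ssize_t unix_socket_demo(const char *msg, char *out, size_t outlen)
{
    struct sockaddr_un addr;

    assert(strlen(SOCK_PATH) < sizeof(addr.sun_path));
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCK_PATH, sizeof(addr.sun_path) - 1);

    int listener = socket(AF_UNIX, SOCK_STREAM, 0);
    if (listener < 0)
        return -1;
    unlink(SOCK_PATH);                      /* remove a stale entry, if any */
    if (bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0 ||
        listen(listener, 1) < 0) {
        close(listener);
        return -1;
    }

    pid_t pid = fork();
    if (pid < 0) {
        close(listener);
        unlink(SOCK_PATH);
        return -1;
    }

    if (pid == 0) {                         /* child: the client */
        close(listener);
        int c = socket(AF_UNIX, SOCK_STREAM, 0);
        if (c < 0 || connect(c, (struct sockaddr *)&addr, sizeof(addr)) < 0)
            _exit(1);
        ssize_t w = write(c, msg, strlen(msg));
        close(c);
        _exit(w < 0 ? 1 : 0);
    }

    /* parent: the server */
    ssize_t n = -1;
    int conn = accept(listener, NULL, NULL);
    if (conn >= 0) {
        n = read(conn, out, outlen);
        close(conn);
    }
    close(listener);
    unlink(SOCK_PATH);                      /* the socket file does not vanish on its own */
    waitpid(pid, NULL, 0);
    return n;
}
```

While the server is listening, the socket is visible as a file at the chosen path, and only processes on the same machine with permission to access that path can connect to it.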


/* Write a program that spawns a child process which inherits two UNIX pipes created by the parent. Based on a command line option, you will then measure the "round-trip" time required to exchange a byte of data using the pipes. */

#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>

int main()
{
    int p1[2], p2[2];
    struct timeval start, end;
    char buf[6];
    long usec;
    pid_t pid;

    if (pipe(p1) < 0 || pipe(p2) < 0)
    {
        perror("pipe");
        return 1;
    }
    pid = fork();
    if (pid < 0)
    {
        perror("fork");
        return 1;
    }
    if (pid > 0)
    {
        /* parent: echo whatever the child sends straight back to it */
        read(p1[0], buf, 5);
        write(p2[1], buf, 5);
        wait(NULL);                     /* reap the child before exiting */
    }
    if (pid == 0)
    {
        gettimeofday(&start, NULL);     /* record the time before writing */
        write(p1[1], "HELLO", 5);
        read(p2[0], buf, 5);
        gettimeofday(&end, NULL);       /* record the time after receiving */
        /* include tv_sec so the difference stays correct across a second boundary */
        usec = (end.tv_sec - start.tv_sec) * 1000000L + (end.tv_usec - start.tv_usec);
        printf("\n\nRound trip time = %ld usec\n", usec);
    }
    return 0;
}