Problema del produttore/consumatore

In informatica, il problema del produttore-consumatore (conosciuto anche con il nome di problema del buffer limitato o bounded-buffer problem) è un esempio classico di sincronizzazione tra processi. Il problema descrive due processi, uno produttore (in inglese producer) ed uno consumatore (consumer), che condividono un buffer comune, di dimensione fissata. Compito del produttore è generare dati e depositarli nel buffer in continuo. Contemporaneamente, il consumatore utilizzerà i dati prodotti, rimuovendoli di volta in volta dal buffer. Il problema è assicurare che il produttore non elabori nuovi dati se il buffer è pieno, e che il consumatore non cerchi dati se il buffer è vuoto.

La soluzione per il produttore è sospendere la propria esecuzione se il buffer è pieno; non appena il consumatore avrà prelevato un elemento dal buffer, esso "sveglierà" il produttore, che ricomincerà quindi a riempire il buffer. Allo stesso modo, il consumatore si sospenderà se il buffer è vuoto; non appena il produttore avrà scaricato dati nel buffer, risveglierà il consumatore. Questa soluzione può essere implementata tramite delle strategie di comunicazione tra processi, tipicamente con dei semafori. Una soluzione errata potrebbe dar luogo ad un deadlock, in cui entrambi i processi aspettano di essere risvegliati.

Il problema può anche essere riscritto considerando più produttori e più consumatori distinti.

Implementazioni

Esempio di implementazione errata

La risoluzione del problema presenta una race condition. Un programmatore potrebbe perciò risolvere il problema in maniera incorretta, come mostrato nel codice seguente; in esso vengono usate due diverse procedure, sleep e wakeup. Quando viene invocata sleep da parte di un processo, questo si "addormenta", fino a che non interviene la procedura wakeup. La variabile itemCount è il numero di elementi contenuti nel buffer.

int itemCount procedure producer() {     while (true) {         item = produceItem()          if (itemCount == BUFFER_SIZE) {             sleep()         }          putItemIntoBuffer(item)         itemCount = itemCount + 1                  if (itemCount == 1) {             wakeup(consumer)         }     }}  procedure consumer() {     while (true) {          if (itemCount == 0) {             sleep()         }                  item = removeItemFromBuffer()         itemCount = itemCount - 1                  if (itemCount == BUFFER_SIZE - 1) {             wakeup(producer)         }                  consumeItem(item)     }}

Il problema con questa soluzione è la race condition che può portare ad un deadlock. Consideriamo il seguente scenario:

  1. Il consumatore ha appena letto la variabile itemCount, verificato che è uguale a zero e sta per entrare nel corpo dell'if.
  2. Prima di invocare sleep, il consumatore viene interrotto dal dispatcher del sistema operativo, che riattiva il produttore.
  3. Il produttore crea un nuovo dato da elaborare, lo mette nel buffer e incrementa itemCount.
  4. Poiché il buffer era vuoto prima di questa operazione, il produttore invoca wakeup per risvegliare i processi consumatori addormentati.
  5. Sfortunatamente, l'interruzione del consumatore è avvenuta prima che il consumatore invocasse sleep: wakeup non ha perciò alcun effetto. Non appena il controllo tornerà al consumatore, la procedura sleep verrà completata, impedendo l'esecuzione del consumatore.
  6. Il produttore andrà avanti fino a che non riempirà il buffer, dopodiché eseguirà anch'esso sleep.

Poiché entrambi i processi rimarranno per sempre addormentati, abbiamo raggiunto una situazione di stallo (deadlock), provando che questa soluzione è inadatta.

Una analisi possibile per questo errore è che ogni qual volta il linguaggio di programmazione non definisce la semantica con cui è necessario accedere alle variabili condivise (in questo caso itemCount), e non usa perciò elementi di sincronizzazione, la soluzione non sarà soddisfacente, anche senza dimostrare esplicitamente l'esistenza di una race condition.

Mediante semafori

I semafori risolvono il problema della perdita delle notifiche di risveglio dei processi. Nella soluzione seguente utilizzeremo due semafori, fillCount e emptyCount: il primo è incrementato ed il secondo decrementato quando un nuovo dato viene immesso nel buffer. Se il produttore prova a decrementare emptyCount mentre questo è zero, la sua esecuzione viene sospesa; non appena un elemento del buffer viene consumato, emptyCount viene incrementato e il produttore si riattiverà. Il consumatore funzionerà in maniera analoga.

 semaphore fillCount = 0 semaphore emptyCount = DIMENSIONE_DEL_BUFFER  procedure producer() {     while (true) {         item = produceItem()         down(emptyCount)         putItemIntoBuffer(item)         up(fillCount)     }}  procedure consumer() {     while (true) {         down(fillCount)         item = removeItemFromBuffer()         up(emptyCount)         consumeItem(item)     }}

Questa soluzione lavora egregiamente quando esistono solo un produttore ed un consumatore. Al contrario, con più istanze di produttori e di consumatori, essa produce una rilevante race condition che può generare in due o più processi che leggono o scrivono lo stesso elemento contemporaneamente. Per capire come questo potrebbe accadere, immaginiamo come la procedura putItemIntoBuffer(), che mette un risultato nel buffer, potrebbe essere implementata: essa dovrebbe contenere due azioni, una che determini uno spazio disponibile del buffer e l'altra che vi scrive il dato. Se la procedura può essere eseguita concorrentemente da più produttori, allora diviene possibile il seguente scenario:

  1. Due produttori decrementano emptyCount
  2. Uno dei produttori determina il successivo spazio nel buffer, ma prima di passare alla fase di scrittura, viene interrotto dal sistema operativo.
  3. Un secondo produttore, nel determinare lo spazio successivo, individua il medesimo del precedente.
  4. Entrambi i produttori scrivono sul medesimo spazio.

Per superare questa situazione, dobbiamo assicurarci che solo un produttore per volta esegua putItemIntoBuffer(); in altre parole, abbiamo bisogno di una sezione critica con Mutex. Per raggiungere questo scopo, introduciamo un semaforo binario, chiamato Mutex. Poiché il valore di un semaforo binario può essere solamente uno o zero, solo un processo può essere in esecuzione tra i due cambi di valore del semaforo (algoritmo del biglietto). Questa soluzione, adatta anche a più produttori e più consumatori, avrà un codice di questo tipo:

 semaphore mutex = 1 semaphore fillCount = 0 semaphore emptyCount = DIMENSIONE_DEL_BUFFER  procedure producer() {     while (true) {         item = produceItem()         down(emptyCount)         down(mutex)         putItemIntoBuffer(item)         up(mutex)         up(fillCount)     }}  procedure consumer() {     while (true) {         down(fillCount)         down(mutex)         item = removeItemFromBuffer()         up(mutex)         up(emptyCount)         consumeItem(item)     }}

Notiamo che l'ordine in cui i semafori vengono incrementati e decrementati è essenziale: cambiando quest'ordine si potrebbe generare un deadlock.

Mediante test and set

/* Risolvere con un programma in pseudo-codice il problema produttore consumatore, con un buffer condiviso di lunghezza finita N ( un array b[M] adoperato come buffer circolare).Utilizzando per la mutua esclusione l'istruzione test-and-set e l'attesa attiva (busy wait). */// !! NO SEMAFORI  !!int i = 0;int nq = 0;queue q;P(){while(true){x = produce();while (nq == bufferSize) { /* non fare niente */ }        while (!testAndSet(i)) { /* non fare niente */ }    /* INIZIA SEZIONE CRITICA */q.append();nq++;i = 0;/* FINISCE LA SEZIONE CRITICA *///...altre operazioni...    }}C(){while (true) {while (qn == 0) { /* non fare niente */ }        while (!testAndSet(i)) { /* non fare niente */ }    /* INIZIA SEZIONE CRITICA */y = q.take();nq--;i = 0;/* FINISCE SEZIONE CRITICA */consume(y);//...altre operazioni...}}

Esempi con numero di produttori, scrittori e buffer variabile

/* Un produttore P produce dati che mette in tre code FIFO, identificate con qa,qb e qc, in modo circolare fra le tre. Tre consumatori CA, CB e CC, prelevanoun dato per volta, rispettivamente dalle code qa, qb e qc, e li elaborano.I tempi di ogni singola produzione ed elaborazione sono molto irregolari, alpunto che un consumatore può trovare la propria coda vuota, nel qual casopuò prelevare un dato dalla coda successiva in senso circolare.Scrivere lo pseudo codice corrispondente ai processi P, CA, CB e CC,utilizzando i semafori per la sincronizzazione, le code qa, qb e qc sono dilunghezza uguale e finita. Evitare assolutamente l'attesa attiva dei treprocessi. N CONSUMATORI*/inizializzazioni() {queue qa, qb, qc; //inizializzo le codeSem vuoti_a = size;Sem vuoti_b = size;Sem vuoti_c = size; //servono per verificare la corretta produzioneSem dati = 0; //serve per vedere se abbiamo prodotto qualcosaSem mutex_a = 1;Sem mutex_b = 1; Sem mutex_c = 1; //servono per la mutua esclusioneint dati_a = 0;int dati_b = 0;int dati_c = 0; //controlla quanti dati abbiamo prodotto per quel consumatore}void P(){while (true){v = produce();semWait(vuoti_a);semWait(mutex_a);qa.append(v);dati_a++;semSignal(mutex_a);semSignal(dati);w = produce();semWait(vuoti_b);semWait(mutex_b);qb.append(w);dati_b++;semSignal(mutex_b);semSignal(dati);z = produce();semWait(vuoti_c);semWait(mutex_c);qc.append();dati_c++;semSignal(mutex_c);semSignal(dati);}}void CA(){while(true){semWait(dati);if (dati_a==0) {if (dati_b > 0) {//vado a "consumare" la coda bsemWait(mutex_b);qb.take();dati_b--;semSignal(mutex_b);semSignal(vuoti_b);} else {//vado a "consumare" la coda csemWait(mutex_c);qc.take();dati_c--;semSignal(mutex_c);semSignal(vuoti_c);}} else {//vado a "consumare" la coda del consumatore in esamesemWait(mutex_a);qa.take();dati_a--;semSignal(mutex_a);semSignal(vuoti_a);}consume();}}void CB(){while(true){semWait(dati);if (dati_b==0) {if (dati_a > 0) {//vado a "consumare" la coda asemWait(mutex_a);qa.take();dati_a--;semSignal(mutex_a);semSignal(vuoti_a);} else {//vado a "consumare" la coda csemWait(mutex_c);qc.take();dati_c--;semSignal(mutex_c);semSignal(vuoti_c);}} else {//vado a "consumare" la coda del consumatore in esamesemWait(mutex_b);qb.take();dati_b--;semSignal(mutex_b);semSignal(vuoti_b);}consume();}}void CC(){while(true){semWait(dati);if (dati_c==0) {if (dati_a > 0) {//vado a "consumare" la coda asemWait(mutex_a);qa.take();dati_a--;semSignal(mutex_a);semSignal(vuoti_a);} else {//vado a "consumare" la coda bsemWait(mutex_b);qb.take();dati_b--;semSignal(mutex_b);semSignal(vuoti_b);}} else {//vado a "consumare" la coda del consumatore in esamesemWait(mutex_c);qc.take();dati_c--;semSignal(mutex_c);semSignal(vuoti_c);}consume();}}// #################################################################// N PRODUTTORIinizializzazioni() {queue qa, qb; //inizializzo le codeSem vuoti_a = size;Sem vuoti_b = size; //servono per verificare la corretta produzioneSem dati = 0; //serve per vedere se abbiamo prodotto qualcosaSem mutex_a = 1;Sem mutex_b = 1;  //servono per la mutua esclusioneint dati_a = 0;int dati_b = 0; //controlla quanti dati abbiamo prodotto per quel consumatore}void P(){while (true){v = produce();semWait(vuoti_a);semWait(mutex_a);qa.append(v);dati_a++;semSignal(mutex_a);semSignal(dati);w = produce();semWait(vuoti_b);semWait(mutex_b);qb.append(w);dati_b++;semSignal(mutex_b);semSignal(dati);}}void C(){while(true){semWait(dati);if (dati_a==0) {// vado a consumare la coda qbsemWait(mutex_b);qb.take();dati_b--;semSignal(mutex_b);semSignal(vuoti_b);} else {//vado a "consumare" la coda del consumatore in esamesemWait(mutex_a);qa.take();dati_a--;semSignal(mutex_a);semSignal(vuoti_a);}consume();}}/* Il processo P1 produce dei dati e li invia al processo P2 tramite un sistema a doppio buffer, costituito da due buffer (ciascuno di lunghezza unitaria) condivisi tra P1 e P2. Mentre P2 legge da un buffer, P1 può scrivere nell'altro.Scrivere i codici dei processi P1 e P2, utilizzando i semafori per la sincronizzazione. N BUFFER*/Sem w1 = 1;Sem w2 = 1;Sem r1 = 0;Sem r2 = 0; dati x, y // inizializzazione dati lettivoid P1(){while (true) {k = produce();semWait(w1);append(buffer1, k);semSignal(r1);k = produce();semWait(w2);append(buffer2, k);semSignal(r2);}}void P2(){while(true){semWait(r1);y = take(buffer1);semSignal(w1);consume(y);semWait(r2);y = take(buffer2);semSignal(w2);consume(y);}}/* Si consideri un sistema costituito da soli due processi P1 e P2. Il processo P1 produce dei dati e li invia al processo P2 tramite una struttura a triplo buffer, costituita da tre buffer B1,B2 e B3 (ciascuno di lunghezza unitaria) condivisi fra P1 e P2. Mentre P2 legge da un buffer, P1 può scrivere in uno degli altri.Scrivere i codici dei processi P1 e P2, utilizzando i semafori per la sincronizzazione. Evitare l'attesa attiva dei processi P1 e P2. */Sem w1 = 1;Sem w2 = 1;Sem w3 = 1;Sem r1 = 0;Sem r2 = 0;Sem r3 = 0; var x, y // inizializzazione dati lettivoid P1(){while (true) {k = produce();semWait(w1);append(buffer1, k);semSignal(r1);k = produce();semWait(w2);append(buffer2, k);semSignal(r2);k = produce();semWait(w3);append(buffer3, k);semSignal(r3);}}void P2(){while(true){semWait(r1);y = take(buffer1);semSignal(w1);consume(y);semWait(r2);y = take(buffer2);semSignal(w2);consume(y);semWait(r3);y = take(buffer3);semSignal(w3);consume(y);}}

Uso di monitor

Mostriamo ora una soluzione tramite l'uso di monitor. Dal momento che essi garantiscono la mutua esclusione, non è necessaria nessuna implementazione aggiuntiva per proteggere le sezioni critiche nel caso di più produttori/consumatori. È inoltre degno di nota come questa implementazione renda più semplice prevedere ed evitare il verificarsi di race condition.

 monitor ProducerConsumer {          int itemCount     condition full     condition empty          procedure add(item) {         while (itemCount == BUFFER_SIZE) {             wait(full)         }                  putItemIntoBuffer(item)         itemCount = itemCount + 1                  if (itemCount == 1) {             notify(empty)         }     }          procedure remove() {         while (itemCount == 0) {             wait(empty)         }                  item = removeItemFromBuffer()         itemCount = itemCount - 1                  if (itemCount == BUFFER_SIZE - 1) {             notify(full)         }                  return item;     }}  procedure producer() {     while (true) {         item = produceItem()         ProducerConsumer.add(item)     } }  procedure consumer() {     while (true) {         item = ProducerConsumer.remove()         consumeItem(item)     } }

Bibliografia

Voci correlate

Portale Informatica: accedi alle voci di Wikipedia che trattano di informatica