Csharp/C Sharp/Thread/Producer Consumer
Producer and consumer in a synchronized buffer
/*
Code revised from Book published by
(C) Copyright 1992-2006 by Deitel & Associates, Inc. and
Pearson Education, Inc. All Rights Reserved.
*/
using System;
using System.Threading;
/// <summary>
/// A single-slot integer buffer shared between producer and consumer threads.
/// Reading blocks while the slot is empty and writing blocks while it is full;
/// every access logs the operation and the buffer state to the console.
/// </summary>
public class SynchronizedBuffer
{
   // Private lock object: code outside this class cannot acquire or pulse it,
   // so external locking on the buffer instance cannot disturb the hand-off.
   private readonly object gate = new object();

   // The stored value; -1 is the initial placeholder before any write.
   private int buffer = -1;

   // 0 when the slot is empty, 1 when it holds an unread value.
   private int occupiedBufferCount = 0;

   /// <summary>
   /// Gets the stored value (blocking until one is available) or sets a new
   /// value (blocking until the previous one has been read).
   /// </summary>
   public int Buffer
   {
      get
      {
         // lock releases the monitor even if an exception is thrown,
         // unlike a bare Monitor.Enter/Monitor.Exit pair.
         lock ( gate )
         {
            // Re-check the condition after every wake-up: another consumer may
            // have emptied the slot between the pulse and this thread resuming.
            while ( occupiedBufferCount == 0 )
            {
               Console.WriteLine( Thread.CurrentThread.Name + " tries to read." );
               DisplayState( "Buffer empty. " + Thread.CurrentThread.Name + " waits." );
               Monitor.Wait( gate );
            }

            --occupiedBufferCount;
            DisplayState( Thread.CurrentThread.Name + " reads " + buffer );

            // Producers and consumers wait on the same monitor, so wake all of
            // them; each re-tests its own condition in its while loop.
            Monitor.PulseAll( gate );
            return buffer;
         }
      }
      set
      {
         lock ( gate )
         {
            // Re-check after every wake-up for the same reason as in get.
            while ( occupiedBufferCount == 1 )
            {
               Console.WriteLine( Thread.CurrentThread.Name + " tries to write." );
               DisplayState( "Buffer full. " + Thread.CurrentThread.Name + " waits." );
               Monitor.Wait( gate );
            }

            buffer = value;
            ++occupiedBufferCount;
            DisplayState( Thread.CurrentThread.Name + " writes " + buffer );
            Monitor.PulseAll( gate );
         }
      }
   }

   /// <summary>
   /// Writes one table row to the console: the operation description, the
   /// current buffer value and the occupied count, followed by a blank line.
   /// </summary>
   /// <param name="operation">Text describing what just happened.</param>
   public void DisplayState( string operation )
   {
      Console.WriteLine( "{0,-35}{1,-9}{2}\n",operation, buffer, occupiedBufferCount );
   }

   /// <summary>
   /// Prints the table header and initial state, then starts one producer
   /// thread and one consumer thread that share a single buffer.
   /// </summary>
   static void Main( string[] args )
   {
      SynchronizedBuffer shared = new SynchronizedBuffer();

      // System.Random is not thread-safe, so each worker gets its own
      // instance; seeding from a common source keeps their sequences distinct.
      Random seedSource = new Random();

      Console.WriteLine( "{0,-35}{1,-9}{2}\n","Operation", "Buffer", "Occupied Count" );
      shared.DisplayState( "Initial state" );

      Producer producer = new Producer( shared, new Random( seedSource.Next() ) );
      Consumer consumer = new Consumer( shared, new Random( seedSource.Next() ) );

      Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
      producerThread.Name = "Producer";
      Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
      consumerThread.Name = "Consumer";

      producerThread.Start();
      consumerThread.Start();
   }
}
/// <summary>
/// Reads ten values from a shared <c>SynchronizedBuffer</c>, pausing for a
/// random interval before each read, and reports the total of what it read.
/// </summary>
public class Consumer
{
   private SynchronizedBuffer source;
   private Random delayGenerator;

   /// <summary>Stores the buffer to read from and the generator used for pauses.</summary>
   /// <param name="shared">Buffer the values are taken from.</param>
   /// <param name="random">Generator supplying the pause before each read.</param>
   public Consumer( SynchronizedBuffer shared, Random random )
   {
      source = shared;
      delayGenerator = random;
   }

   /// <summary>
   /// Performs ten reads, each preceded by a pause of 1 to 1000 milliseconds,
   /// then prints the sum of the values read together with the thread name.
   /// </summary>
   public void Consume()
   {
      int total = 0;
      int remaining = 10;

      while ( remaining > 0 )
      {
         int pause = delayGenerator.Next( 1, 1001 );
         Thread.Sleep( pause );
         total = total + source.Buffer;
         remaining--;
      }

      string who = Thread.CurrentThread.Name;
      Console.WriteLine("{0} read values totaling: {1}.\nTerminating {0}.", who, total );
   }
}
/// <summary>
/// Writes the values 1 through 10 into a shared <c>SynchronizedBuffer</c>,
/// pausing for a random interval before each write.
/// </summary>
public class Producer
{
   private SynchronizedBuffer target;
   private Random delayGenerator;

   /// <summary>Stores the buffer to write to and the generator used for pauses.</summary>
   /// <param name="shared">Buffer the values are placed into.</param>
   /// <param name="random">Generator supplying the pause before each write.</param>
   public Producer( SynchronizedBuffer shared, Random random )
   {
      target = shared;
      delayGenerator = random;
   }

   /// <summary>
   /// Writes 1, 2, ..., 10 in order, each write preceded by a pause of
   /// 1 to 1000 milliseconds, then prints a completion message.
   /// </summary>
   public void Produce()
   {
      int next = 1;

      while ( next <= 10 )
      {
         int pause = delayGenerator.Next( 1, 1001 );
         Thread.Sleep( pause );
         target.Buffer = next;
         next++;
      }

      string who = Thread.CurrentThread.Name;
      Console.WriteLine( "{0} done producing.\nTerminating {0}.", who );
   }
}
Producer and consumer with a Circular Buffer
/*
Code revised from Book published by
(C) Copyright 1992-2006 by Deitel & Associates, Inc. and
Pearson Education, Inc. All Rights Reserved.
*/
using System;
using System.Threading;
/// <summary>
/// Writes the values 1 through 10 into a shared <c>CircularBuffer</c>,
/// pausing for a random interval before each write.
/// </summary>
public class Producer
{
   private CircularBuffer target;
   private Random delayGenerator;

   /// <summary>Stores the buffer to write to and the generator used for pauses.</summary>
   /// <param name="shared">Buffer the values are placed into.</param>
   /// <param name="random">Generator supplying the pause before each write.</param>
   public Producer( CircularBuffer shared, Random random )
   {
      target = shared;
      delayGenerator = random;
   }

   /// <summary>
   /// Writes 1, 2, ..., 10 in order, each write preceded by a pause of
   /// 1 to 3000 milliseconds, then prints a completion message.
   /// </summary>
   public void Produce()
   {
      int next = 1;

      while ( next <= 10 )
      {
         int pause = delayGenerator.Next( 1, 3001 );
         Thread.Sleep( pause );
         target.Buffer = next;
         next++;
      }

      string who = Thread.CurrentThread.Name;
      Console.WriteLine( "{0} done producing.\nTerminating {0}.", who );
   }
}
/// <summary>
/// Reads ten values from a shared <c>CircularBuffer</c>, pausing for a random
/// interval before each read, and reports the total of what it read.
/// </summary>
public class Consumer
{
   private CircularBuffer source;
   private Random delayGenerator;

   /// <summary>Stores the buffer to read from and the generator used for pauses.</summary>
   /// <param name="shared">Buffer the values are taken from.</param>
   /// <param name="random">Generator supplying the pause before each read.</param>
   public Consumer( CircularBuffer shared, Random random )
   {
      source = shared;
      delayGenerator = random;
   }

   /// <summary>
   /// Performs ten reads, each preceded by a pause of 1 to 3000 milliseconds,
   /// then prints the sum of the values read together with the thread name.
   /// </summary>
   public void Consume()
   {
      int total = 0;
      int remaining = 10;

      while ( remaining > 0 )
      {
         int pause = delayGenerator.Next( 1, 3001 );
         Thread.Sleep( pause );
         total = total + source.Buffer;
         remaining--;
      }

      string who = Thread.CurrentThread.Name;
      Console.WriteLine("{0} read values totaling: {1}.\nTerminating {0}.", who, total );
   }
}
/// <summary>
/// A three-slot circular integer buffer shared between producer and consumer
/// threads. Reading blocks while every slot is empty and writing blocks while
/// every slot is full; each access prints the operation and the buffer state.
/// </summary>
public class CircularBuffer
{
   // Private lock object: code outside this class cannot acquire or pulse it,
   // so external locking on the buffer instance cannot disturb the hand-off.
   private readonly object gate = new object();

   // Slot storage; -1 marks a slot that has never been written.
   private int[] buffers = { -1, -1, -1 };

   // Number of slots holding a value that has not been read yet.
   private int occupiedBufferCount = 0;

   // Index of the next slot to read and the next slot to write.
   private int readLocation = 0;
   private int writeLocation = 0;

   /// <summary>
   /// Gets the oldest unread value (blocking until one exists) or stores a new
   /// value in the next free slot (blocking until a slot is free).
   /// </summary>
   public int Buffer
   {
      get
      {
         lock ( gate )
         {
            // Re-check the condition after every wake-up: another consumer may
            // have drained the buffer between the pulse and this thread resuming.
            while ( occupiedBufferCount == 0 )
            {
               Console.Write( "\nAll buffers empty. {0} waits.",Thread.CurrentThread.Name );
               Monitor.Wait( gate );
            }

            int readValue = buffers[ readLocation ];
            Console.Write( "\n{0} reads {1} ",Thread.CurrentThread.Name, buffers[ readLocation ] );

            --occupiedBufferCount;
            readLocation = ( readLocation + 1 ) % buffers.Length; // wrap around
            Console.Write( CreateStateOutput() );

            // Producers and consumers wait on the same monitor, so wake all of
            // them; each re-tests its own condition in its while loop.
            Monitor.PulseAll( gate );
            return readValue;
         }
      }
      set
      {
         lock ( gate )
         {
            // Re-check after every wake-up for the same reason as in get.
            while ( occupiedBufferCount == buffers.Length )
            {
               Console.Write( "\nAll buffers full. {0} waits.",Thread.CurrentThread.Name );
               Monitor.Wait( gate );
            }

            buffers[ writeLocation ] = value;
            Console.Write( "\n{0} writes {1} ",Thread.CurrentThread.Name, buffers[ writeLocation ] );

            ++occupiedBufferCount;
            writeLocation = ( writeLocation + 1 ) % buffers.Length; // wrap around
            Console.Write( CreateStateOutput() );
            Monitor.PulseAll( gate );
         }
      }
   }

   /// <summary>
   /// Builds a three-line text picture of the buffer: the occupied count and
   /// slot values, a separator row, and a row marking the write (W) and read
   /// (R) positions, or WR where both coincide.
   /// </summary>
   /// <remarks>
   /// Takes no lock itself; the accessors call it while holding the lock.
   /// </remarks>
   /// <returns>The formatted state, ending with a newline.</returns>
   public string CreateStateOutput()
   {
      string output = "(buffers occupied: " + occupiedBufferCount + ")\nbuffers: ";

      // Row 1: each slot value, right-aligned in two columns.
      for ( int i = 0; i < buffers.Length; i++ )
      {
         output += " " + string.Format( "{0,2}", buffers[ i ] ) + " ";
      }

      output += "\n";
      output += " ";

      // Row 2: one separator per slot.
      for ( int i = 0; i < buffers.Length; i++ )
      {
         output += "---- ";
      }

      output += "\n";
      output += " ";

      // Row 3: position markers under the slots.
      for ( int i = 0; i < buffers.Length; i++ )
      {
         bool isWrite = ( i == writeLocation );
         bool isRead = ( i == readLocation );

         if ( isWrite && isRead )
            output += " WR ";
         else if ( isWrite )
            output += " W ";
         else if ( isRead )
            output += " R ";
         else
            output += " ";
      }

      output += "\n";
      return output;
   }

   /// <summary>
   /// Prints the initial buffer state, then starts one producer thread and
   /// one consumer thread that share a single buffer.
   /// </summary>
   static void Main( string[] args )
   {
      CircularBuffer shared = new CircularBuffer();

      // System.Random is not thread-safe, so each worker gets its own
      // instance; seeding from a common source keeps their sequences distinct.
      Random seedSource = new Random();

      Console.Write( shared.CreateStateOutput() );

      Producer producer = new Producer( shared, new Random( seedSource.Next() ) );
      Consumer consumer = new Consumer( shared, new Random( seedSource.Next() ) );

      Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
      producerThread.Name = "Producer";
      Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
      consumerThread.Name = "Consumer";

      producerThread.Start();
      consumerThread.Start();
   }
}
Uses Wait and Pulse to enable producer and consumer threads to cooperate in using a buffer
/*
Revised from code in "Computing with C# and the .NET Framework"
# Paperback: 753 pages
# Publisher: Jones and Bartlett Publishers, Inc.; Bk&CD-Rom edition (February 2003)
# Language: English
# ISBN: 0763723398
# Product Dimensions: 8.9 x 7.7 x 1.1 inches
*/
using System;
using System.Runtime.CompilerServices;
using System.Threading;
/// <summary>
/// A fixed-capacity circular queue of integers shared between threads.
/// <see cref="Put"/> blocks while the queue is full and <see cref="Get"/>
/// blocks while it is empty; both log their activity to the console.
/// </summary>
class Buffer {
    /// <summary>Number of slots in the queue.</summary>
    public const int size = 3;

    // Private lock object: code outside this class cannot acquire or pulse it,
    // so external locking on the buffer instance cannot disturb the hand-off.
    readonly object gate = new object();

    int[] buffer = new int [size];
    int putpos=0;   // index of the next slot to write
    int getpos=0;   // index of the next slot to read
    int number=0;   // count of values currently stored

    /// <summary>
    /// Appends a value, waiting while the queue is full.
    /// </summary>
    /// <param name="value">The value to store.</param>
    public void Put(int value) {
        lock (gate) {
            // Re-check the condition after every wake-up: another producer may
            // have filled the freed slot before this thread resumed.
            while (number == size) {
                Console.WriteLine("Cannot put -- Buffer full");
                Monitor.Wait(gate);
            }
            number++;
            buffer[putpos] = value;
            Console.WriteLine("Put "+value);
            putpos = (putpos + 1) % size;
            // Producers and consumers wait on the same monitor, so wake all
            // waiters on every change; each re-tests its own condition.
            Monitor.PulseAll(gate);
        }
    }

    /// <summary>
    /// Removes and returns the oldest value, waiting while the queue is empty.
    /// </summary>
    /// <returns>The value that was stored earliest.</returns>
    public int Get() {
        lock (gate) {
            // Re-check after every wake-up for the same reason as in Put.
            while (number == 0) {
                Console.WriteLine("Buffer empty");
                Monitor.Wait(gate);
            }
            number--;
            int n = buffer[getpos];
            Console.WriteLine("Get "+n);
            getpos = (getpos + 1) % size;
            Monitor.PulseAll(gate);
            return n;
        }
    }
}
/// <summary>
/// Endlessly puts the value 0 into a <c>Buffer</c> on its own thread, which
/// is started as soon as the object is constructed.
/// </summary>
class Producer {
    Buffer buf;

    /// <summary>Stores the buffer and immediately starts a thread executing <see cref="Run"/>.</summary>
    /// <param name="b">Buffer the values are put into.</param>
    public Producer(Buffer b) {
        buf = b;
        ThreadStart work = new ThreadStart(Run);
        new Thread(work).Start();
    }

    /// <summary>Puts 0 into the buffer forever; this method never returns.</summary>
    public void Run() {
        while (true) {
            buf.Put(0);
        }
    }
}
/// <summary>
/// Endlessly takes values from a <c>Buffer</c> on its own thread, which is
/// started as soon as the object is constructed.
/// </summary>
class Consumer {
    Buffer buf;

    /// <summary>Stores the buffer and immediately starts a thread executing <see cref="Run"/>.</summary>
    /// <param name="b">Buffer the values are taken from.</param>
    public Consumer(Buffer b) {
        buf = b;
        ThreadStart work = new ThreadStart(Run);
        new Thread(work).Start();
    }

    /// <summary>Gets values from the buffer forever, discarding them; this method never returns.</summary>
    public void Run() {
        while (true) {
            buf.Get();
        }
    }
}
/// <summary>
/// Entry point for the Put/Get demonstration: wires one producer and one
/// consumer to the same buffer.
/// </summary>
public class PutGet {
    /// <summary>
    /// Creates the shared buffer, then the producer and the consumer; each of
    /// those starts its own worker thread from its constructor.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    public static void Main(String[] args) {
        Buffer shared = new Buffer();
        new Producer(shared);
        new Consumer(shared);
    }
}