Csharp/C Sharp/Thread/Producer Consumer
Producer and consumer in a synchronized buffer
<source lang="csharp"> /* Code revised from Book published by (C) Copyright 1992-2006 by Deitel & Associates, Inc. and Pearson Education, Inc. All Rights Reserved.
 */
using System;
using System.Threading;

/// <summary>
/// A one-element buffer shared between a producer thread and a consumer thread.
/// Writing blocks while the slot is still occupied; reading blocks while it is empty.
/// </summary>
public class SynchronizedBuffer
{
    // Dedicated lock object: all locking, waiting and pulsing goes through it,
    // so code outside this class cannot interfere by locking the buffer instance.
    private readonly object gate = new object();

    private int buffer = -1;              // value currently held in the slot
    private int occupiedBufferCount = 0;  // 0 = slot empty, 1 = slot full

    /// <summary>
    /// Getter: waits until the slot is full, empties it and returns the value.
    /// Setter: waits until the slot is empty, then stores the value.
    /// </summary>
    public int Buffer
    {
        get
        {
            // lock releases the monitor even if a statement inside throws.
            lock ( gate )
            {
                // The condition is re-tested after every wake-up: being pulsed
                // does not guarantee the slot is still full when this thread
                // re-acquires the lock.
                while ( occupiedBufferCount == 0 )
                {
                    Console.WriteLine( Thread.CurrentThread.Name + " tries to read." );
                    DisplayState( "Buffer empty. " + Thread.CurrentThread.Name + " waits." );
                    Monitor.Wait( gate );
                }

                --occupiedBufferCount;
                DisplayState( Thread.CurrentThread.Name + " reads " + buffer );

                // Readers and writers wait on the same lock, so wake all of
                // them; each one re-checks its own condition in its loop.
                Monitor.PulseAll( gate );

                return buffer;
            }
        }
        set
        {
            lock ( gate )
            {
                while ( occupiedBufferCount == 1 )
                {
                    Console.WriteLine( Thread.CurrentThread.Name + " tries to write." );
                    DisplayState( "Buffer full. " + Thread.CurrentThread.Name + " waits." );
                    Monitor.Wait( gate );
                }

                buffer = value;
                ++occupiedBufferCount;
                DisplayState( Thread.CurrentThread.Name + " writes " + buffer );
                Monitor.PulseAll( gate );
            }
        }
    }

    /// <summary>
    /// Writes one row of the trace table: the operation text, the slot value
    /// and the occupied count, followed by a blank line.
    /// </summary>
    /// <param name="operation">Text for the first column.</param>
    public void DisplayState( string operation )
    {
        Console.WriteLine( "{0,-35}{1,-9}{2}\n", operation, buffer, occupiedBufferCount );
    }

    /// <summary>
    /// Prints the table header and initial state, then starts one producer
    /// thread and one consumer thread that share a single buffer.
    /// </summary>
    static void Main( string[] args )
    {
        SynchronizedBuffer shared = new SynchronizedBuffer();
        Random random = new Random();

        Console.WriteLine( "{0,-35}{1,-9}{2}\n", "Operation", "Buffer", "Occupied Count" );
        shared.DisplayState( "Initial state" );

        Producer producer = new Producer( shared, random );
        Consumer consumer = new Consumer( shared, random );

        Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
        producerThread.Name = "Producer";

        Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
        consumerThread.Name = "Consumer";

        producerThread.Start();
        consumerThread.Start();
    }
}

/// <summary>
/// Reads ten values from a <see cref="SynchronizedBuffer"/>, sleeping a random
/// time before each read, and reports their sum.
/// </summary>
public class Consumer
{
    private readonly SynchronizedBuffer sharedLocation;
    private readonly Random randomSleepTime;

    /// <param name="shared">Buffer to read from.</param>
    /// <param name="random">Source of sleep durations; may be shared with other threads.</param>
    public Consumer( SynchronizedBuffer shared, Random random )
    {
        sharedLocation = shared;
        randomSleepTime = random;
    }

    /// <summary>Thread body: performs ten reads and prints the total.</summary>
    public void Consume()
    {
        int sum = 0;

        for ( int count = 1; count <= 10; count++ )
        {
            Thread.Sleep( NextSleepTime() );
            sum += sharedLocation.Buffer;
        }

        Console.WriteLine( "{0} read values totaling: {1}.\nTerminating {0}.",
            Thread.CurrentThread.Name, sum );
    }

    // Random instances are not thread-safe and Main hands the same instance to
    // both worker objects, so every draw is serialized on that instance.
    private int NextSleepTime()
    {
        lock ( randomSleepTime )
        {
            return randomSleepTime.Next( 1, 1001 );
        }
    }
}

/// <summary>
/// Writes the values 1 through 10 into a <see cref="SynchronizedBuffer"/>,
/// sleeping a random time before each write.
/// </summary>
public class Producer
{
    private readonly SynchronizedBuffer sharedLocation;
    private readonly Random randomSleepTime;

    /// <param name="shared">Buffer to write to.</param>
    /// <param name="random">Source of sleep durations; may be shared with other threads.</param>
    public Producer( SynchronizedBuffer shared, Random random )
    {
        sharedLocation = shared;
        randomSleepTime = random;
    }

    /// <summary>Thread body: performs ten writes and prints a completion message.</summary>
    public void Produce()
    {
        for ( int count = 1; count <= 10; count++ )
        {
            Thread.Sleep( NextSleepTime() );
            sharedLocation.Buffer = count;
        }

        Console.WriteLine( "{0} done producing.\nTerminating {0}.",
            Thread.CurrentThread.Name );
    }

    // Random instances are not thread-safe and Main hands the same instance to
    // both worker objects, so every draw is serialized on that instance.
    private int NextSleepTime()
    {
        lock ( randomSleepTime )
        {
            return randomSleepTime.Next( 1, 1001 );
        }
    }
}
</source>
Producer and consumer with a Circular Buffer
<source lang="csharp"> /* Code revised from Book published by (C) Copyright 1992-2006 by Deitel & Associates, Inc. and Pearson Education, Inc. All Rights Reserved.
 */
using System;
using System.Threading;

/// <summary>
/// Writes the values 1 through 10 into a <see cref="CircularBuffer"/>,
/// sleeping a random time before each write.
/// </summary>
public class Producer
{
    private readonly CircularBuffer sharedLocation;
    private readonly Random randomSleepTime;

    /// <param name="shared">Buffer to write to.</param>
    /// <param name="random">Source of sleep durations; may be shared with other threads.</param>
    public Producer( CircularBuffer shared, Random random )
    {
        sharedLocation = shared;
        randomSleepTime = random;
    }

    /// <summary>Thread body: performs ten writes and prints a completion message.</summary>
    public void Produce()
    {
        for ( int count = 1; count <= 10; count++ )
        {
            Thread.Sleep( NextSleepTime() );
            sharedLocation.Buffer = count;
        }

        Console.WriteLine( "{0} done producing.\nTerminating {0}.",
            Thread.CurrentThread.Name );
    }

    // Random instances are not thread-safe; when the same instance is handed
    // to several worker objects, every draw is serialized on that instance.
    private int NextSleepTime()
    {
        lock ( randomSleepTime )
        {
            return randomSleepTime.Next( 1, 3001 );
        }
    }
}

/// <summary>
/// Reads ten values from a <see cref="CircularBuffer"/>, sleeping a random
/// time before each read, and reports their sum.
/// </summary>
public class Consumer
{
    private readonly CircularBuffer sharedLocation;
    private readonly Random randomSleepTime;

    /// <param name="shared">Buffer to read from.</param>
    /// <param name="random">Source of sleep durations; may be shared with other threads.</param>
    public Consumer( CircularBuffer shared, Random random )
    {
        sharedLocation = shared;
        randomSleepTime = random;
    }

    /// <summary>Thread body: performs ten reads and prints the total.</summary>
    public void Consume()
    {
        int sum = 0;

        for ( int count = 1; count <= 10; count++ )
        {
            Thread.Sleep( NextSleepTime() );
            sum += sharedLocation.Buffer;
        }

        Console.WriteLine( "{0} read values totaling: {1}.\nTerminating {0}.",
            Thread.CurrentThread.Name, sum );
    }

    // Random instances are not thread-safe; when the same instance is handed
    // to several worker objects, every draw is serialized on that instance.
    private int NextSleepTime()
    {
        lock ( randomSleepTime )
        {
            return randomSleepTime.Next( 1, 3001 );
        }
    }
}
/// <summary>
/// A three-slot ring buffer shared between producer and consumer threads.
/// Writing blocks while every slot is occupied; reading blocks while every
/// slot is empty. Each access prints a trace of the buffer state.
/// </summary>
public class CircularBuffer
{
    // Indent for the second and third diagram rows: the width of "buffers: ".
    private const string RowIndent = "         ";

    // Dedicated lock object: all locking, waiting and pulsing goes through it,
    // so code outside this class cannot interfere by locking the buffer instance.
    private readonly object gate = new object();

    private int[] buffers = { -1, -1, -1 };
    private int occupiedBufferCount = 0;  // number of slots holding unread values
    private int readLocation = 0;         // index of the next slot to read
    private int writeLocation = 0;        // index of the next slot to write

    /// <summary>
    /// Getter: waits until at least one slot is occupied, then returns the
    /// value at the read position and advances that position.
    /// Setter: waits until at least one slot is free, then stores the value
    /// at the write position and advances that position.
    /// </summary>
    public int Buffer
    {
        get
        {
            lock ( gate )
            {
                // The condition is re-tested after every wake-up: being pulsed
                // does not guarantee a value is still available when this
                // thread re-acquires the lock.
                while ( occupiedBufferCount == 0 )
                {
                    Console.Write( "\nAll buffers empty. {0} waits.",
                        Thread.CurrentThread.Name );
                    Monitor.Wait( gate );
                }

                int readValue = buffers[ readLocation ];
                Console.Write( "\n{0} reads {1} ",
                    Thread.CurrentThread.Name, buffers[ readLocation ] );

                --occupiedBufferCount;
                readLocation = ( readLocation + 1 ) % buffers.Length;
                Console.Write( CreateStateOutput() );

                // Readers and writers wait on the same lock, so wake all of
                // them; each one re-checks its own condition in its loop.
                Monitor.PulseAll( gate );

                return readValue;
            }
        }
        set
        {
            lock ( gate )
            {
                while ( occupiedBufferCount == buffers.Length )
                {
                    Console.Write( "\nAll buffers full. {0} waits.",
                        Thread.CurrentThread.Name );
                    Monitor.Wait( gate );
                }

                buffers[ writeLocation ] = value;
                Console.Write( "\n{0} writes {1} ",
                    Thread.CurrentThread.Name, buffers[ writeLocation ] );

                ++occupiedBufferCount;
                writeLocation = ( writeLocation + 1 ) % buffers.Length;
                Console.Write( CreateStateOutput() );
                Monitor.PulseAll( gate );
            }
        }
    }

    /// <summary>
    /// Builds a three-row text diagram of the buffer: the slot values, a row
    /// of dashes, and a row marking the write (W) and read (R) positions.
    /// Every cell is five characters wide and the lower rows are indented by
    /// the width of the "buffers: " label, so markers sit under their slot.
    /// </summary>
    /// <returns>The occupied count followed by the diagram, ending in a newline.</returns>
    public string CreateStateOutput()
    {
        string output = "(buffers occupied: " + occupiedBufferCount + ")\nbuffers: ";

        // Row 1: slot values, right-aligned in two columns inside a 5-char cell.
        for ( int i = 0; i < buffers.Length; i++ )
        {
            output += " " + string.Format( "{0,2}", buffers[ i ] ) + "  ";
        }

        // Row 2: one underline per slot.
        output += "\n" + RowIndent;
        for ( int i = 0; i < buffers.Length; i++ )
        {
            output += "---- ";
        }

        // Row 3: W in the first value column, R in the second, so a combined
        // "WR" and the separate markers occupy consistent columns.
        output += "\n" + RowIndent;
        for ( int i = 0; i < buffers.Length; i++ )
        {
            if ( i == writeLocation && writeLocation == readLocation )
            {
                output += " WR  ";
            }
            else if ( i == writeLocation )
            {
                output += " W   ";
            }
            else if ( i == readLocation )
            {
                output += "  R  ";
            }
            else
            {
                output += "     ";
            }
        }

        output += "\n";
        return output;
    }

    /// <summary>
    /// Prints the initial buffer state, then starts one producer thread and
    /// one consumer thread that share a single buffer.
    /// </summary>
    static void Main( string[] args )
    {
        CircularBuffer shared = new CircularBuffer();
        Random random = new Random();

        Console.Write( shared.CreateStateOutput() );

        Producer producer = new Producer( shared, random );
        Consumer consumer = new Consumer( shared, random );

        Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
        producerThread.Name = "Producer";

        Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
        consumerThread.Name = "Consumer";

        producerThread.Start();
        consumerThread.Start();
    }
}
</source>
Uses Wait and Pulse to enable producer and consumer threads to cooperate in using a buffer
<source lang="csharp"> /* Revised from code in "Computing with C# and the .NET Framework"
- Paperback: 753 pages
- Publisher: Jones and Bartlett Publishers, Inc.; Bk&CD-Rom edition (February 2003)
- Language: English
- ISBN: 0763723398
- Product Dimensions: 8.9 x 7.7 x 1.1 inches
 */
using System;
using System.Runtime.CompilerServices;
using System.Threading;

/// <summary>
/// Fixed-size ring buffer of ints shared between producer and consumer
/// threads. Put blocks while the buffer is full; Get blocks while it is empty.
/// </summary>
class Buffer
{
    public const int size = 3;

    private int[] buffer = new int [size];
    private int putpos = 0;   // index of the next slot to write
    private int getpos = 0;   // index of the next slot to read
    private int number = 0;   // count of values currently stored

    /// <summary>Stores a value, waiting first if the buffer is full.</summary>
    /// <param name="value">Value to store.</param>
    // Synchronized makes the whole method hold the lock on this instance,
    // which is why Monitor.Wait/PulseAll below operate on "this".
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void Put(int value)
    {
        // Re-test after every wake-up: a pulse does not guarantee that a slot
        // is still free once this thread re-acquires the lock.
        while (number == size)
        {
            Console.WriteLine("Cannot put -- Buffer full");
            Monitor.Wait(this);
        }

        number++;
        buffer[putpos] = value;
        Console.WriteLine("Put "+value);
        putpos = (putpos + 1) % size;

        // Wake every waiter; each re-checks its own condition in its loop, so
        // no waiter is stranded when several threads share the buffer.
        Monitor.PulseAll(this);
    }

    /// <summary>Removes and returns the oldest value, waiting first if the buffer is empty.</summary>
    /// <returns>The value at the current read position.</returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public int Get()
    {
        while (number == 0)
        {
            Console.WriteLine("Buffer empty");
            Monitor.Wait(this);
        }

        number--;
        int n = buffer[getpos];
        Console.WriteLine("Get "+n);
        getpos = (getpos + 1) % size;
        Monitor.PulseAll(this);
        return n;
    }
}

/// <summary>
/// Starts a thread on construction that puts the value 0 into the buffer
/// in an endless loop.
/// </summary>
class Producer
{
    private Buffer buf;

    /// <param name="b">Buffer to fill.</param>
    public Producer(Buffer b)
    {
        buf = b;
        Thread thread = new Thread(new ThreadStart(Run));
        thread.Start();
    }

    /// <summary>Thread body: never returns.</summary>
    public void Run()
    {
        while (true)
        {
            buf.Put(0);
        }
    }
}

/// <summary>
/// Starts a thread on construction that takes values from the buffer
/// in an endless loop.
/// </summary>
class Consumer
{
    private Buffer buf;

    /// <param name="b">Buffer to drain.</param>
    public Consumer(Buffer b)
    {
        buf = b;
        Thread thread = new Thread(new ThreadStart(Run));
        thread.Start();
    }

    /// <summary>Thread body: never returns.</summary>
    public void Run()
    {
        while (true)
        {
            buf.Get();
        }
    }
}
/// <summary>
/// Program entry point: wires one Producer and one Consumer to a common Buffer.
/// </summary>
public class PutGet
{
    public static void Main(String[] args)
    {
        Buffer sharedBuffer = new Buffer();

        // Each constructor starts its own worker thread, so the created
        // objects do not need to be kept in variables.
        new Producer(sharedBuffer);
        new Consumer(sharedBuffer);
    }
}
</source>