Csharp/C Sharp/Thread/Producer Consumer

Материал из .Net Framework эксперт
Перейти к: навигация, поиск

Producer and consumer in a synchronized buffer

/*
Code revised from Book published by 
(C) Copyright 1992-2006 by Deitel & Associates, Inc. and
Pearson Education, Inc. All Rights Reserved.  
*/

using System;
using System.Threading;
/// <summary>
/// Single-slot integer buffer shared between producer and consumer threads.
/// A write blocks while the slot is occupied and a read blocks while it is
/// empty; every operation logs the resulting state to the console.
/// </summary>
public class SynchronizedBuffer
{
   // Private lock object so that code outside this class cannot interfere
   // with the buffer's locking or its Wait/Pulse signalling.
   private readonly object gate = new object();
   private int buffer = -1; // last value written; -1 until the first write
   private int occupiedBufferCount = 0; // 0 = slot empty, 1 = slot full

   /// <summary>
   /// Getting the value blocks until a value is available, consumes it and
   /// returns it. Setting the value blocks until the slot is free and then
   /// stores the new value.
   /// </summary>
   public int Buffer
   {
      get
      {
         // lock releases the monitor even when the body throws, which the
         // manual Monitor.Enter/Monitor.Exit pairing did not guarantee.
         lock ( gate )
         {
            // Re-test the condition after every wake-up: another consumer
            // may have emptied the slot before this thread re-acquired
            // the lock.
            while ( occupiedBufferCount == 0 )
            {
               Console.WriteLine( Thread.CurrentThread.Name + " tries to read." );
               DisplayState( "Buffer empty. " + Thread.CurrentThread.Name + " waits." );
               Monitor.Wait( gate );
            }
            --occupiedBufferCount;

            DisplayState( Thread.CurrentThread.Name + " reads " + buffer );
            int bufferCopy = buffer;
            // Producers and consumers wait on the same object, so wake all
            // waiters; those whose condition still fails simply wait again.
            Monitor.PulseAll( gate );
            return bufferCopy;
         }
      }
      set
      {
         lock ( gate )
         {
            // Same re-test rule as the getter, for competing producers.
            while ( occupiedBufferCount == 1 )
            {
               Console.WriteLine( Thread.CurrentThread.Name + " tries to write." );
               DisplayState( "Buffer full. " + Thread.CurrentThread.Name + " waits." );
               Monitor.Wait( gate );
            }
            buffer = value;
            ++occupiedBufferCount;
            DisplayState( Thread.CurrentThread.Name + " writes " + buffer );
            Monitor.PulseAll( gate );
         }
      }
   }

   /// <summary>
   /// Writes one table row to the console: the operation text, the current
   /// buffer value and the occupied count, followed by a blank line.
   /// </summary>
   /// <param name="operation">Text describing the operation being logged.</param>
   public void DisplayState( string operation )
   {
      Console.WriteLine( "{0,-35}{1,-9}{2}\n",operation, buffer, occupiedBufferCount );
   }

   /// <summary>
   /// Prints the table header and initial state, then starts one producer
   /// thread and one consumer thread working on the same buffer.
   /// </summary>
   static void Main( string[] args )
   {
      SynchronizedBuffer shared = new SynchronizedBuffer();
      // System.Random is not thread-safe, so each worker gets its own
      // instance; seeding from a master generator keeps the two sleep
      // sequences different from each other.
      Random seedSource = new Random();
      Console.WriteLine( "{0,-35}{1,-9}{2}\n","Operation", "Buffer", "Occupied Count" );
      shared.DisplayState( "Initial state" );
      Producer producer = new Producer( shared, new Random( seedSource.Next() ) );
      Consumer consumer = new Consumer( shared, new Random( seedSource.Next() ) );
      Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
      producerThread.Name = "Producer";
      Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
      consumerThread.Name = "Consumer";
      producerThread.Start();
      consumerThread.Start();
   }
}
/// <summary>
/// Worker that reads ten values from a shared <see cref="SynchronizedBuffer"/>,
/// sleeping for a random interval before each read, and prints their total.
/// </summary>
public class Consumer
{
   private SynchronizedBuffer sharedLocation;
   private Random randomSleepTime;

   /// <summary>Stores the buffer to read from and the sleep-time generator.</summary>
   /// <param name="shared">Buffer the values are read from.</param>
   /// <param name="random">Generator used to pick each sleep duration.</param>
   public Consumer( SynchronizedBuffer shared, Random random )
   {
      this.randomSleepTime = random;
      this.sharedLocation = shared;
   }

   /// <summary>
   /// Performs ten sleep-then-read rounds and reports the accumulated total
   /// together with the current thread's name.
   /// </summary>
   public void Consume()
   {
      int total = 0;
      int remainingReads = 10;
      while ( remainingReads > 0 )
      {
         // Upper bound of Next is exclusive, so the pause is 1..1000 ms.
         int pauseMilliseconds = randomSleepTime.Next( 1, 1001 );
         Thread.Sleep( pauseMilliseconds );
         total = total + sharedLocation.Buffer;
         remainingReads--;
      }
      string readerName = Thread.CurrentThread.Name;
      Console.WriteLine( "{0} read values totaling: {1}.\nTerminating {0}.", readerName, total );
   }
}
/// <summary>
/// Worker that writes the values 1 through 10 into a shared
/// <see cref="SynchronizedBuffer"/>, sleeping for a random interval before
/// each write.
/// </summary>
public class Producer
{
   private SynchronizedBuffer sharedLocation;
   private Random randomSleepTime;

   /// <summary>Stores the buffer to write to and the sleep-time generator.</summary>
   /// <param name="shared">Buffer the values are written to.</param>
   /// <param name="random">Generator used to pick each sleep duration.</param>
   public Producer( SynchronizedBuffer shared, Random random )
   {
      this.randomSleepTime = random;
      this.sharedLocation = shared;
   }

   /// <summary>
   /// Performs ten sleep-then-write rounds and then announces termination
   /// using the current thread's name.
   /// </summary>
   public void Produce()
   {
      int nextValue = 1;
      while ( nextValue <= 10 )
      {
         // Upper bound of Next is exclusive, so the pause is 1..1000 ms.
         int pauseMilliseconds = randomSleepTime.Next( 1, 1001 );
         Thread.Sleep( pauseMilliseconds );
         sharedLocation.Buffer = nextValue;
         nextValue++;
      }
      string writerName = Thread.CurrentThread.Name;
      Console.WriteLine( "{0} done producing.\nTerminating {0}.", writerName );
   }
}


Producer and consumer with a Circular Buffer

/*
Code revised from Book published by 
(C) Copyright 1992-2006 by Deitel & Associates, Inc. and
Pearson Education, Inc. All Rights Reserved.  
*/
using System;
using System.Threading;
/// <summary>
/// Worker that writes the values 1 through 10 into a shared
/// <see cref="CircularBuffer"/>, sleeping for a random interval before each
/// write.
/// </summary>
public class Producer
{
   private CircularBuffer sharedLocation;
   private Random randomSleepTime;

   /// <summary>Stores the buffer to write to and the sleep-time generator.</summary>
   /// <param name="shared">Buffer the values are written to.</param>
   /// <param name="random">Generator used to pick each sleep duration.</param>
   public Producer( CircularBuffer shared, Random random )
   {
      this.randomSleepTime = random;
      this.sharedLocation = shared;
   }

   /// <summary>
   /// Performs ten sleep-then-write rounds and then announces termination
   /// using the current thread's name.
   /// </summary>
   public void Produce()
   {
      int nextValue = 1;
      while ( nextValue <= 10 )
      {
         // Upper bound of Next is exclusive, so the pause is 1..3000 ms.
         int pauseMilliseconds = randomSleepTime.Next( 1, 3001 );
         Thread.Sleep( pauseMilliseconds );
         sharedLocation.Buffer = nextValue;
         nextValue++;
      }
      string writerName = Thread.CurrentThread.Name;
      Console.WriteLine( "{0} done producing.\nTerminating {0}.", writerName );
   }
}
/// <summary>
/// Worker that reads ten values from a shared <see cref="CircularBuffer"/>,
/// sleeping for a random interval before each read, and prints their total.
/// </summary>
public class Consumer
{
   private CircularBuffer sharedLocation;
   private Random randomSleepTime;

   /// <summary>Stores the buffer to read from and the sleep-time generator.</summary>
   /// <param name="shared">Buffer the values are read from.</param>
   /// <param name="random">Generator used to pick each sleep duration.</param>
   public Consumer( CircularBuffer shared, Random random )
   {
      this.randomSleepTime = random;
      this.sharedLocation = shared;
   }

   /// <summary>
   /// Performs ten sleep-then-read rounds and reports the accumulated total
   /// together with the current thread's name.
   /// </summary>
   public void Consume()
   {
      int total = 0;
      int remainingReads = 10;
      while ( remainingReads > 0 )
      {
         // Upper bound of Next is exclusive, so the pause is 1..3000 ms.
         int pauseMilliseconds = randomSleepTime.Next( 1, 3001 );
         Thread.Sleep( pauseMilliseconds );
         total = total + sharedLocation.Buffer;
         remainingReads--;
      }
      string readerName = Thread.CurrentThread.Name;
      Console.WriteLine( "{0} read values totaling: {1}.\nTerminating {0}.", readerName, total );
   }
}

/// <summary>
/// Three-slot circular integer buffer shared between producer and consumer
/// threads. A write blocks while every slot is occupied and a read blocks
/// while every slot is empty; each operation prints the buffer layout.
/// </summary>
public class CircularBuffer
{
   // Private lock object so that code outside this class cannot interfere
   // with the buffer's locking or its Wait/Pulse signalling.
   private readonly object gate = new object();
   private int[] buffers = { -1, -1, -1 }; // slots; -1 until first written
   private int occupiedBufferCount = 0; // number of slots holding unread values
   private int readLocation = 0; // index of the next slot to read
   private int writeLocation = 0; // index of the next slot to write

   /// <summary>
   /// Getting the value blocks until a slot is occupied and returns the
   /// oldest unread value. Setting the value blocks until a slot is free
   /// and stores the value at the current write position.
   /// </summary>
   public int Buffer
   {
      get
      {
         lock ( gate )
         {
            // Re-test the condition after every wake-up: another consumer
            // may have drained the buffer before this thread re-acquired
            // the lock.
            while ( occupiedBufferCount == 0 )
            {
               Console.Write( "\nAll buffers empty. {0} waits.",Thread.CurrentThread.Name );
               Monitor.Wait( gate );
            }
            int readValue = buffers[ readLocation ];
            Console.Write( "\n{0} reads {1} ",Thread.CurrentThread.Name, buffers[ readLocation ] );
            --occupiedBufferCount;
            readLocation = ( readLocation + 1 ) % buffers.Length; // wrap around
            Console.Write( CreateStateOutput() );
            // Producers and consumers wait on the same object, so wake all
            // waiters; those whose condition still fails simply wait again.
            Monitor.PulseAll( gate );
            return readValue;
         }
      }
      set
      {
         lock ( gate )
         {
            // Same re-test rule as the getter, for competing producers.
            while ( occupiedBufferCount == buffers.Length )
            {
               Console.Write( "\nAll buffers full. {0} waits.",Thread.CurrentThread.Name );
               Monitor.Wait( gate );
            }
            buffers[ writeLocation ] = value;
            Console.Write( "\n{0} writes {1} ",Thread.CurrentThread.Name, buffers[ writeLocation ] );
            ++occupiedBufferCount;
            writeLocation = ( writeLocation + 1 ) % buffers.Length; // wrap around
            Console.Write( CreateStateOutput() );
            Monitor.PulseAll( gate );
         }
      }
   }

   /// <summary>
   /// Builds a multi-line text picture of the buffer: the occupied count,
   /// the slot values, a separator row, and a row marking the write (W)
   /// and read (R) positions.
   /// </summary>
   /// <returns>The formatted state text, ending with a newline.</returns>
   public string CreateStateOutput()
   {
      string output = "(buffers occupied: " + occupiedBufferCount + ")\nbuffers: ";
      for ( int i = 0; i < buffers.Length; i++ )
         output += " " + string.Format( "{0,2}", buffers[ i ] ) + "  ";
      output += "\n";
      output += "         ";
      for ( int i = 0; i < buffers.Length; i++ )
         output += "---- ";
      output += "\n";
      output += "         ";
      for ( int i = 0; i < buffers.Length; i++ ) 
      {
         if ( i == writeLocation && 
            writeLocation == readLocation ) 
            output += " WR  ";
         else if ( i == writeLocation )
            output += " W   ";
         else if  ( i == readLocation ) 
            output += "  R  ";
         else
            output += "     ";
      }
      output += "\n";
      return output;
   }

   /// <summary>
   /// Prints the initial buffer layout, then starts one producer thread and
   /// one consumer thread working on the same buffer.
   /// </summary>
   static void Main( string[] args )
   {
      CircularBuffer shared = new CircularBuffer();
      // System.Random is not thread-safe, so each worker gets its own
      // instance; seeding from a master generator keeps the two sleep
      // sequences different from each other.
      Random seedSource = new Random();
      Console.Write( shared.CreateStateOutput() );
      Producer producer = new Producer( shared, new Random( seedSource.Next() ) );
      Consumer consumer = new Consumer( shared, new Random( seedSource.Next() ) );
      Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
      producerThread.Name = "Producer";
      Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
      consumerThread.Name = "Consumer";
      producerThread.Start();
      consumerThread.Start();
   }    
}


Uses Wait and Pulse to enable producer and consumer threads to cooperate in using a buffer

/*
Revised from code in "Computing with C# and the .NET Framework"
# Paperback: 753 pages
# Publisher: Jones and Bartlett Publishers, Inc.; Bk&CD-Rom edition (February 2003)
# Language: English
# ISBN: 0763723398
# Product Dimensions: 8.9 x 7.7 x 1.1 inches
*/

using System;
using System.Runtime.CompilerServices;
using System.Threading;
  /// <summary>
  /// Fixed-capacity FIFO buffer of integers for cooperating threads.
  /// Put blocks while the buffer is full; Get blocks while it is empty.
  /// </summary>
  class Buffer {
    public const int size = 3;
    // Private lock object so that code outside this class cannot interfere
    // with the buffer's locking or its Wait/Pulse signalling.
    readonly object gate = new object();
    int[] buffer = new int [size];
    int putpos=0;   // index where the next value is stored
    int getpos=0;   // index of the next value to hand out
    int number=0;   // count of values currently stored

    /// <summary>Stores a value, waiting first if the buffer is full.</summary>
    /// <param name="value">Value to append.</param>
    public void Put(int value) {
      lock (gate) {
        // Re-test after every wake-up: another producer may have refilled
        // the buffer before this thread re-acquired the lock.
        while (number == size) {
          Console.WriteLine("Cannot put -- Buffer full");
          Monitor.Wait(gate);
        }
        number++;
        buffer[putpos] = value;
        Console.WriteLine("Put "+value);
        putpos = (putpos + 1) % size;
        // Consumers only wait while the buffer is empty, so signalling on
        // the empty -> non-empty transition is enough; PulseAll ensures no
        // waiting consumer is left behind when several are blocked.
        if (number == 1) Monitor.PulseAll(gate);
      }
    }

    /// <summary>Removes and returns the oldest value, waiting first if the buffer is empty.</summary>
    /// <returns>The value that was stored earliest and not yet removed.</returns>
    public int Get() {
      lock (gate) {
        // Re-test after every wake-up: another consumer may have emptied
        // the buffer before this thread re-acquired the lock.
        while (number == 0) {
          Console.WriteLine("Buffer empty");
          Monitor.Wait(gate);
        }
        number--;
        int n = buffer[getpos];
        Console.WriteLine("Get "+n);
        getpos = (getpos + 1) % size;
        // Producers only wait while the buffer is full, so signalling on
        // the full -> not-full transition is enough.
        if (number == size - 1) Monitor.PulseAll(gate);
        return n;
      }
    }
  }
  /// <summary>
  /// Starts a thread on construction that puts the value 0 into the given
  /// buffer in an endless loop.
  /// </summary>
  class Producer {
     Buffer buf;

     /// <summary>Remembers the buffer and immediately starts the worker thread.</summary>
     /// <param name="b">Buffer to fill.</param>
     public Producer(Buffer b) {
        this.buf = b;
        Thread worker = new Thread(Run);
        worker.Start();
     }

     /// <summary>Thread body: puts 0 into the buffer forever.</summary>
     public void Run() {
       while (true) {
         buf.Put(0);
       }
     }
  }
  /// <summary>
  /// Starts a thread on construction that takes values from the given
  /// buffer in an endless loop.
  /// </summary>
  class Consumer {
     Buffer buf;

     /// <summary>Remembers the buffer and immediately starts the worker thread.</summary>
     /// <param name="b">Buffer to drain.</param>
     public Consumer(Buffer b) {
       this.buf = b;
       Thread worker = new Thread(Run);
       worker.Start();
     }

     /// <summary>Thread body: takes values from the buffer forever, discarding them.</summary>
     public void Run() {
       while (true) {
         buf.Get();
       }
     }
  }
/// <summary>
/// Entry point of the Put/Get demo: wires one producer and one consumer to
/// a single shared buffer.
/// </summary>
public class PutGet {
  /// <summary>Creates the shared buffer, then the producer, then the consumer.</summary>
  public static void Main(String[] args) {
    Buffer sharedBuffer = new Buffer();
    // Each constructor starts its own worker thread, so the instances do
    // not need to be kept in variables.
    new Producer(sharedBuffer);
    new Consumer(sharedBuffer);
  }
}