Csharp/C Sharp/Thread/Producer Consumer

Материал из .Net Framework эксперт
Перейти к: навигация, поиск

Producer and consumer in a synchronized buffer

<source lang="csharp"> /* Code revised from Book published by (C) Copyright 1992-2006 by Deitel & Associates, Inc. and Pearson Education, Inc. All Rights Reserved.

  */

using System; using System.Threading; public class SynchronizedBuffer {

  private int buffer = -1; 
  private int occupiedBufferCount = 0;  
  // Private monitor object. Locking on a private object instead of "this" prevents
  // outside code from interfering with the buffer's Wait/Pulse protocol.
  private readonly object bufferLock = new object();

  /// <summary>
  /// Gets or sets the single shared value, blocking the calling thread until the
  /// operation can proceed.
  /// </summary>
  /// <remarks>
  /// The getter blocks while the buffer is empty and the setter blocks while it is
  /// full. Both accessors log their progress through <see cref="DisplayState"/>.
  /// </remarks>
  /// <value>The value most recently written by a producer.</value>
  public int Buffer
  {
     get
     {
        // "lock" expands to Monitor.Enter/Exit inside try/finally, so the monitor
        // is released even if an exception escapes the accessor.
        lock ( bufferLock )
        {
           if ( occupiedBufferCount == 0 )
           {
              Console.WriteLine(Thread.CurrentThread.Name + " tries to read." );
              DisplayState( "Buffer empty. " +Thread.CurrentThread.Name + " waits." );

              // Re-check the condition after every wake-up: another consumer may
              // have emptied the buffer between the pulse and re-acquiring the lock.
              while ( occupiedBufferCount == 0 )
              {
                 Monitor.Wait( bufferLock );
              }
           }

           --occupiedBufferCount;
           DisplayState( Thread.CurrentThread.Name + " reads " + buffer );

           // Producers and consumers wait on the same monitor, so wake them all;
           // a single Pulse could wake another consumer and strand the producer.
           Monitor.PulseAll( bufferLock );
           return buffer;
        }
     }
     set
     {
        lock ( bufferLock )
        {
           if ( occupiedBufferCount == 1 )
           {
              Console.WriteLine(Thread.CurrentThread.Name + " tries to write." );
              DisplayState( "Buffer full. " + Thread.CurrentThread.Name + " waits." );

              // Re-check the condition after every wake-up (see the getter).
              while ( occupiedBufferCount == 1 )
              {
                 Monitor.Wait( bufferLock );
              }
           }

           buffer = value;
           ++occupiedBufferCount;
           DisplayState( Thread.CurrentThread.Name + " writes " + buffer );
           Monitor.PulseAll( bufferLock );
        }
     }
  }
  /// <summary>
  /// Writes one row of the trace table: the operation text, the current buffer
  /// value and the occupied-buffer count, followed by a blank line.
  /// </summary>
  /// <param name="operation">Description of the operation being reported.</param>
  public void DisplayState( string operation )
  {
     // Left-aligned columns: 35 characters for the operation, 9 for the buffer value.
     const string rowFormat = "{0,-35}{1,-9}{2}\n";
     Console.WriteLine( rowFormat, operation, buffer, occupiedBufferCount );
  }
  /// <summary>
  /// Entry point: prints the trace header and initial state, then starts one
  /// producer thread and one consumer thread that share a SynchronizedBuffer.
  /// </summary>
  /// <param name="args">Command-line arguments (unused).</param>
  static void Main( string[] args )
  {
     SynchronizedBuffer shared = new SynchronizedBuffer();

     // System.Random is not thread-safe, so each worker gets its own instance.
     // The seeds come from a third generator because two parameterless Random
     // objects created back-to-back may end up with the same time-based seed.
     Random seedSource = new Random();
     Random producerRandom = new Random( seedSource.Next() );
     Random consumerRandom = new Random( seedSource.Next() );

     Console.WriteLine( "{0,-35}{1,-9}{2}\n","Operation", "Buffer", "Occupied Count" );
     shared.DisplayState( "Initial state" );

     Producer producer = new Producer( shared, producerRandom );
     Consumer consumer = new Consumer( shared, consumerRandom );

     Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
     producerThread.Name = "Producer";
     Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
     consumerThread.Name = "Consumer";

     producerThread.Start();
     consumerThread.Start();
  }

} public class Consumer {

  private SynchronizedBuffer sharedLocation;
  private Random randomSleepTime;
  /// <summary>Creates a consumer that reads from the given shared buffer.</summary>
  /// <param name="shared">Buffer the consumer reads values from.</param>
  /// <param name="random">Generator used to pick the sleep time before each read.</param>
  public Consumer( SynchronizedBuffer shared, Random random )
  {
     this.randomSleepTime = random;
     this.sharedLocation = shared;
  }
  /// <summary>
  /// Reads ten values from the shared buffer, sleeping for a random interval
  /// before each read, then prints the total of the values read.
  /// </summary>
  public void Consume()
  {
     int total = 0;
     int readsLeft = 10;

     while ( readsLeft > 0 )
     {
        int pause = randomSleepTime.Next( 1, 1001 );
        Thread.Sleep( pause );
        total = total + sharedLocation.Buffer;
        readsLeft--;
     }

     string threadName = Thread.CurrentThread.Name;
     Console.WriteLine("{0} read values totaling: {1}.\nTerminating {0}.", threadName, total );
  }

} public class Producer {

  private SynchronizedBuffer sharedLocation;
  private Random randomSleepTime;
  /// <summary>Creates a producer that writes to the given shared buffer.</summary>
  /// <param name="shared">Buffer the producer writes values to.</param>
  /// <param name="random">Generator used to pick the sleep time before each write.</param>
  public Producer( SynchronizedBuffer shared, Random random )
  {
     this.randomSleepTime = random;
     this.sharedLocation = shared;
  }
  /// <summary>
  /// Writes the values 1 through 10 to the shared buffer, sleeping for a random
  /// interval before each write, then announces that production has finished.
  /// </summary>
  public void Produce()
  {
     int nextValue = 1;

     while ( nextValue <= 10 )
     {
        int pause = randomSleepTime.Next( 1, 1001 );
        Thread.Sleep( pause );
        sharedLocation.Buffer = nextValue;
        nextValue++;
     }

     string threadName = Thread.CurrentThread.Name;
     Console.WriteLine( "{0} done producing.\nTerminating {0}.", threadName );
  }

}


      </source>


Producer and consumer with a Circular Buffer

<source lang="csharp"> /* Code revised from Book published by (C) Copyright 1992-2006 by Deitel & Associates, Inc. and Pearson Education, Inc. All Rights Reserved.

  */

using System; using System.Threading; public class Producer {

  private CircularBuffer sharedLocation;
  private Random randomSleepTime;
  /// <summary>Creates a producer that writes to the given circular buffer.</summary>
  /// <param name="shared">Buffer the producer writes values to.</param>
  /// <param name="random">Generator used to pick the sleep time before each write.</param>
  public Producer( CircularBuffer shared, Random random )
  {
     this.randomSleepTime = random;
     this.sharedLocation = shared;
  }
  /// <summary>
  /// Writes the values 1 through 10 to the shared buffer, sleeping for a random
  /// interval before each write, then announces that production has finished.
  /// </summary>
  public void Produce()
  {
     int nextValue = 1;

     while ( nextValue <= 10 )
     {
        int pause = randomSleepTime.Next( 1, 3001 );
        Thread.Sleep( pause );
        sharedLocation.Buffer = nextValue;
        nextValue++;
     }

     string threadName = Thread.CurrentThread.Name;
     Console.WriteLine( "{0} done producing.\nTerminating {0}.", threadName );
  }

} public class Consumer {

  private CircularBuffer sharedLocation;
  private Random randomSleepTime;
  /// <summary>Creates a consumer that reads from the given circular buffer.</summary>
  /// <param name="shared">Buffer the consumer reads values from.</param>
  /// <param name="random">Generator used to pick the sleep time before each read.</param>
  public Consumer( CircularBuffer shared, Random random )
  {
     this.randomSleepTime = random;
     this.sharedLocation = shared;
  }
  /// <summary>
  /// Reads ten values from the shared buffer, sleeping for a random interval
  /// before each read, then prints the total of the values read.
  /// </summary>
  public void Consume()
  {
     int total = 0;
     int readsLeft = 10;

     while ( readsLeft > 0 )
     {
        int pause = randomSleepTime.Next( 1, 3001 );
        Thread.Sleep( pause );
        total = total + sharedLocation.Buffer;
        readsLeft--;
     }

     string threadName = Thread.CurrentThread.Name;
     Console.WriteLine("{0} read values totaling: {1}.\nTerminating {0}.", threadName, total );
  }

}

/// <summary>
/// A fixed-size (three element) circular buffer shared between producer and
/// consumer threads. Reads block while the buffer is empty and writes block
/// while it is full; every operation prints a trace of the buffer state.
/// </summary>
public class CircularBuffer {

  // Private monitor object. Locking on a private object instead of "this" prevents
  // outside code from interfering with the buffer's Wait/Pulse protocol.
  private readonly object gate = new object();

  private int[] buffers = { -1, -1, -1 };
  private int occupiedBufferCount = 0;
  private int readLocation = 0;   // index of the next element to read
  private int writeLocation = 0;  // index of the next element to write

  /// <summary>
  /// Gets the oldest unread value, or sets the next free element, blocking the
  /// calling thread until the operation can proceed.
  /// </summary>
  public int Buffer
  {
     get
     {
        lock ( gate )
        {
           if ( occupiedBufferCount == 0 )
           {
              Console.Write( "\nAll buffers empty. {0} waits.",Thread.CurrentThread.Name );

              // Re-check the condition after every wake-up: another consumer may
              // have drained the buffer between the pulse and re-acquiring the lock.
              while ( occupiedBufferCount == 0 )
              {
                 Monitor.Wait( gate );
              }
           }

           int readValue = buffers[ readLocation ];
           Console.Write( "\n{0} reads {1} ",Thread.CurrentThread.Name, readValue );
           --occupiedBufferCount;
           readLocation = ( readLocation + 1 ) % buffers.Length;
           Console.Write( CreateStateOutput() );

           // Producers and consumers wait on the same monitor, so wake them all;
           // a single Pulse could wake another consumer and strand a producer.
           Monitor.PulseAll( gate );
           return readValue;
        }
     }
     set
     {
        lock ( gate )
        {
           if ( occupiedBufferCount == buffers.Length )
           {
              Console.Write( "\nAll buffers full. {0} waits.",Thread.CurrentThread.Name );

              // Re-check the condition after every wake-up (see the getter).
              while ( occupiedBufferCount == buffers.Length )
              {
                 Monitor.Wait( gate );
              }
           }

           buffers[ writeLocation ] = value;
           Console.Write( "\n{0} writes {1} ",Thread.CurrentThread.Name, value );
           ++occupiedBufferCount;
           writeLocation = ( writeLocation + 1 ) % buffers.Length;
           Console.Write( CreateStateOutput() );
           Monitor.PulseAll( gate );
        }
     }
  }

  /// <summary>
  /// Builds a multi-line picture of the buffer: the occupied count, the element
  /// values, a separator row, and W/R markers under the write and read positions.
  /// </summary>
  /// <returns>The formatted state text, ending with a newline.</returns>
  public string CreateStateOutput()
  {
     // StringBuilder avoids allocating a new string for every appended fragment.
     System.Text.StringBuilder output = new System.Text.StringBuilder();

     output.Append( "(buffers occupied: " );
     output.Append( occupiedBufferCount );
     output.Append( ")\nbuffers: " );

     for ( int i = 0; i < buffers.Length; i++ )
     {
        output.Append( " " );
        output.AppendFormat( "{0,2}", buffers[ i ] );
        output.Append( "  " );
     }

     output.Append( "\n" );
     output.Append( "         " );

     for ( int i = 0; i < buffers.Length; i++ )
     {
        output.Append( "---- " );
     }

     output.Append( "\n" );
     output.Append( "         " );

     for ( int i = 0; i < buffers.Length; i++ )
     {
        if ( i == writeLocation && writeLocation == readLocation )
        {
           output.Append( " WR  " );
        }
        else if ( i == writeLocation )
        {
           output.Append( " W   " );
        }
        else if ( i == readLocation )
        {
           output.Append( "  R  " );
        }
        else
        {
           output.Append( "     " );
        }
     }

     output.Append( "\n" );
     return output.ToString();
  }

  /// <summary>
  /// Entry point: prints the initial buffer state, then starts one producer
  /// thread and one consumer thread that share a CircularBuffer.
  /// </summary>
  /// <param name="args">Command-line arguments (unused).</param>
  static void Main( string[] args )
  {
     CircularBuffer shared = new CircularBuffer();

     // System.Random is not thread-safe, so each worker gets its own instance.
     // The seeds come from a third generator because two parameterless Random
     // objects created back-to-back may end up with the same time-based seed.
     Random seedSource = new Random();
     Random producerRandom = new Random( seedSource.Next() );
     Random consumerRandom = new Random( seedSource.Next() );

     Console.Write( shared.CreateStateOutput() );

     Producer producer = new Producer( shared, producerRandom );
     Consumer consumer = new Consumer( shared, consumerRandom );

     Thread producerThread = new Thread( new ThreadStart( producer.Produce ) );
     producerThread.Name = "Producer";
     Thread consumerThread = new Thread( new ThreadStart( consumer.Consume ) );
     consumerThread.Name = "Consumer";

     producerThread.Start();
     consumerThread.Start();
  }

}

      </source>


Uses Wait and Pulse to enable producer and consumer threads to cooperate in using a buffer

<source lang="csharp"> /* Revised from code in "Computing with C# and the .NET Framework"

  1. Paperback: 753 pages
  2. Publisher: Jones and Bartlett Publishers, Inc.; Bk&CD-Rom edition (February 2003)
  3. Language: English
  4. ISBN: 0763723398
  5. Product Dimensions: 8.9 x 7.7 x 1.1 inches
  */

using System; using System.Runtime.CompilerServices; using System.Threading;

 /// <summary>
 /// A fixed-capacity circular queue of ints shared between threads.
 /// Put blocks while the buffer is full and Get blocks while it is empty.
 /// Both methods are synchronized on this instance via MethodImplOptions.Synchronized.
 /// </summary>
 class Buffer {
   public const int size = 3;
   int[] buffer = new int [size];
   int putpos=0;   // index where the next value is stored
   int getpos=0;   // index of the next value to hand out
   int number=0;   // count of values currently stored

   /// <summary>Stores a value, waiting while the buffer is full.</summary>
   /// <param name="value">The value to store.</param>
   [MethodImpl(MethodImplOptions.Synchronized)]
   public void Put(int value) {
     if (number == size) {
       Console.WriteLine("Cannot put -- Buffer full");
       // Re-check the condition after every wake-up: another producer may have
       // refilled the buffer between the pulse and re-acquiring the lock.
       while (number == size) {
         Monitor.Wait(this);
       }
     }
     number++;
     buffer[putpos] = value;
     Console.WriteLine("Put "+value);
     putpos = (putpos + 1) % size;
     // The buffer just went from empty to non-empty: wake every waiting consumer.
     // A single Pulse could leave other waiters asleep although data is available.
     if (number == 1) Monitor.PulseAll(this);
   }

   /// <summary>Removes and returns the oldest value, waiting while the buffer is empty.</summary>
   /// <returns>The oldest stored value.</returns>
   [MethodImpl(MethodImplOptions.Synchronized)]
   public int Get() {
     if (number == 0) {
        Console.WriteLine("Buffer empty");
        // Re-check the condition after every wake-up (see Put).
        while (number == 0) {
          Monitor.Wait(this);
        }
     }
     number--;
     int n = buffer[getpos];
     Console.WriteLine("Get "+n);
     getpos = (getpos + 1) % size;
     // The buffer just went from full to not-full: wake every waiting producer.
     if (number == size - 1) Monitor.PulseAll(this);
     return n;
   }
 }
 /// <summary>
 /// Starts a thread on construction that puts the value 0 into the shared
 /// buffer in an endless loop.
 /// </summary>
 class Producer {
    Buffer buf;

    /// <summary>Stores the buffer and immediately starts the producing thread.</summary>
    /// <param name="b">Buffer to put values into.</param>
    public Producer(Buffer b) {
       buf = b;
       ThreadStart work = new ThreadStart(Run);
       new Thread(work).Start();
    }

    /// <summary>Thread body: puts 0 into the buffer forever.</summary>
    public void Run() {
      while (true) {
        buf.Put(0);
      }
    }
 }
 /// <summary>
 /// Starts a thread on construction that takes values from the shared buffer
 /// in an endless loop, discarding them.
 /// </summary>
 class Consumer {
    Buffer buf;

    /// <summary>Stores the buffer and immediately starts the consuming thread.</summary>
    /// <param name="b">Buffer to take values from.</param>
    public Consumer(Buffer b) {
      buf = b;
      ThreadStart work = new ThreadStart(Run);
      new Thread(work).Start();
    }

    /// <summary>Thread body: gets values from the buffer forever.</summary>
    public void Run() {
      while (true) {
          buf.Get();
      }
    }
 }

/// <summary>
/// Demo driver: wires one Producer and one Consumer to a shared Buffer.
/// </summary>
public class PutGet {

 /// <summary>Creates the shared buffer and the two workers, whose constructors start their threads.</summary>
 /// <param name="args">Command-line arguments (unused).</param>
 public static void Main(String[] args) {
   Buffer sharedBuffer = new Buffer();
   // The constructors start the worker threads, so the instances need not be kept.
   new Producer(sharedBuffer);
   new Consumer(sharedBuffer);
 }

}

      </source>