//#define Trace
// ParallelDeflateOutputStream.cs
// ------------------------------------------------------------------
//
// A DeflateStream that does compression only, it uses a
// divide-and-conquer approach with multiple threads to exploit multiple
// CPUs for the DEFLATE computation.
//
// last saved: <2011-July-31 14:49:40>
//
// ------------------------------------------------------------------
//
// Copyright (c) 2009-2011 by Dino Chiesa
// All rights reserved!
//
// This code module is part of DotNetZip, a zipfile class library.
//
// ------------------------------------------------------------------
//
// This code is licensed under the Microsoft Public License.
// See the file License.txt for the license details.
// More info on: http://dotnetzip.codeplex.com
//
// ------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using Ionic.Zlib;
using System.IO;

namespace Ionic.Zlib
{
    /// <summary>
    ///   A single unit of work for the parallel deflater: one input buffer,
    ///   one output buffer, and a dedicated ZlibCodec so worker threads never
    ///   share compressor state.
    /// </summary>
    internal class WorkItem
    {
        public byte[] buffer;                  // uncompressed input for this block
        public byte[] compressed;              // deflated output for this block
        public int crc;                        // CRC32 of the input bytes in this block
        public int index;                      // fixed slot index of this item in the pool
        public int ordinal;                    // sequence number of the block within the stream
        public int inputBytesAvailable;        // count of valid bytes in `buffer`
        public int compressedBytesAvailable;   // count of valid bytes in `compressed`
        public ZlibCodec compressor;

        /// <param name="size">size in bytes of the input buffer</param>
        /// <param name="compressLevel">deflate level used by this item's codec</param>
        /// <param name="strategy">zlib strategy (currently unused here; kept for signature parity)</param>
        /// <param name="ix">fixed pool slot index</param>
        public WorkItem(int size,
                        Ionic.Zlib.CompressionLevel compressLevel,
                        CompressionStrategy strategy,
                        int ix)
        {
            this.buffer = new byte[size];
            // alloc 5 bytes overhead for every 32k block (margin of safety = 2),
            // so the output buffer can never be too small for incompressible input.
            int n = size + ((size / 32768) + 1) * 5 * 2;
            this.compressed = new byte[n];
            this.compressor = new ZlibCodec();
            this.compressor.InitializeDeflate(compressLevel, false);
            this.compressor.OutputBuffer = this.compressed;
            this.compressor.InputBuffer = this.buffer;
            this.index = ix;
        }
    }

/// /// /// /// This class is similar to , except /// that this class is for compression only, and this implementation uses an /// approach that employs multiple worker threads to perform the DEFLATE. On /// a multi-cpu or multi-core computer, the performance of this class can be /// significantly higher than the single-threaded DeflateStream, particularly /// for larger streams. How large? Anything over 10mb is a good candidate /// for parallel compression. /// /// /// /// The tradeoff is that this class uses more memory and more CPU than the /// vanilla DeflateStream, and also is less efficient as a compressor. For /// large files the size of the compressed data stream can be less than 1% /// larger than the size of a compressed data stream from the vanialla /// DeflateStream. For smaller files the difference can be larger. The /// difference will also be larger if you set the BufferSize to be lower than /// the default value. Your mileage may vary. Finally, for small files, the /// ParallelDeflateOutputStream can be much slower than the vanilla /// DeflateStream, because of the overhead associated to using the thread /// pool. 
/// /// /// /// public class ParallelDeflateOutputStream : System.IO.Stream { private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024; // 128k private static readonly int BufferPairsPerCore = 4; private System.Collections.Generic.List _pool; private bool _leaveOpen; private bool emitting; private System.IO.Stream _outStream; private int _maxBufferPairs; private int _bufferSize = IO_BUFFER_SIZE_DEFAULT; private AutoResetEvent _newlyCompressedBlob; //private ManualResetEvent _writingDone; //private ManualResetEvent _sessionReset; private object _outputLock = new object(); private bool _isClosed; private bool _firstWriteDone; private int _currentlyFilling; private int _lastFilled; private int _lastWritten; private int _latestCompressed; private int _Crc32; private Ionic.Crc.CRC32 _runningCrc; private object _latestLock = new object(); private System.Collections.Generic.Queue _toWrite; private System.Collections.Generic.Queue _toFill; private Int64 _totalBytesProcessed; private Ionic.Zlib.CompressionLevel _compressLevel; private volatile Exception _pendingException; private bool _handlingException; private object _eLock = new Object(); // protects _pendingException // This bitfield is used only when Trace is defined. //private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin | //TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush | //TraceBits.Session; //private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle | TraceBits.Session ; private TraceBits _DesiredTrace = TraceBits.Session | TraceBits.Compress | TraceBits.WriteTake | TraceBits.WriteEnter | TraceBits.EmitEnter | TraceBits.EmitDone | TraceBits.EmitLock | TraceBits.EmitSkip | TraceBits.EmitBegin; /// /// Create a ParallelDeflateOutputStream. /// /// /// /// /// This stream compresses data written into it via the DEFLATE /// algorithm (see RFC 1951), and writes out the compressed byte stream. 
/// /// /// /// The instance will use the default compression level, the default /// buffer sizes and the default number of threads and buffers per /// thread. /// /// /// /// This class is similar to , /// except that this implementation uses an approach that employs /// multiple worker threads to perform the DEFLATE. On a multi-cpu or /// multi-core computer, the performance of this class can be /// significantly higher than the single-threaded DeflateStream, /// particularly for larger streams. How large? Anything over 10mb is /// a good candidate for parallel compression. /// /// /// /// /// /// /// This example shows how to use a ParallelDeflateOutputStream to compress /// data. It reads a file, compresses it, and writes the compressed data to /// a second, output file. /// /// /// byte[] buffer = new byte[WORKING_BUFFER_SIZE]; /// int n= -1; /// String outputFile = fileToCompress + ".compressed"; /// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress)) /// { /// using (var raw = System.IO.File.Create(outputFile)) /// { /// using (Stream compressor = new ParallelDeflateOutputStream(raw)) /// { /// while ((n= input.Read(buffer, 0, buffer.Length)) != 0) /// { /// compressor.Write(buffer, 0, n); /// } /// } /// } /// } /// /// /// Dim buffer As Byte() = New Byte(4096) {} /// Dim n As Integer = -1 /// Dim outputFile As String = (fileToCompress & ".compressed") /// Using input As Stream = File.OpenRead(fileToCompress) /// Using raw As FileStream = File.Create(outputFile) /// Using compressor As Stream = New ParallelDeflateOutputStream(raw) /// Do While (n <> 0) /// If (n > 0) Then /// compressor.Write(buffer, 0, n) /// End If /// n = input.Read(buffer, 0, buffer.Length) /// Loop /// End Using /// End Using /// End Using /// /// /// The stream to which compressed data will be written. 
public ParallelDeflateOutputStream(System.IO.Stream stream) : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false) { } /// /// Create a ParallelDeflateOutputStream using the specified CompressionLevel. /// /// /// See the /// constructor for example code. /// /// The stream to which compressed data will be written. /// A tuning knob to trade speed for effectiveness. public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level) : this(stream, level, CompressionStrategy.Default, false) { } /// /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open /// when the ParallelDeflateOutputStream is closed. /// /// /// See the /// constructor for example code. /// /// The stream to which compressed data will be written. /// /// true if the application would like the stream to remain open after inflation/deflation. /// public ParallelDeflateOutputStream(System.IO.Stream stream, bool leaveOpen) : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen) { } /// /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open /// when the ParallelDeflateOutputStream is closed. /// /// /// See the /// constructor for example code. /// /// The stream to which compressed data will be written. /// A tuning knob to trade speed for effectiveness. /// /// true if the application would like the stream to remain open after inflation/deflation. /// public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen) : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen) { } /// /// Create a ParallelDeflateOutputStream using the specified /// CompressionLevel and CompressionStrategy, and specifying whether to /// leave the captive stream open when the ParallelDeflateOutputStream is /// closed. /// /// /// See the /// constructor for example code. 
/// /// The stream to which compressed data will be written. /// A tuning knob to trade speed for effectiveness. /// /// By tweaking this parameter, you may be able to optimize the compression for /// data with particular characteristics. /// /// /// true if the application would like the stream to remain open after inflation/deflation. /// public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, CompressionStrategy strategy, bool leaveOpen) { TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "-------------------------------------------------------"); TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "Create {0:X8}", this.GetHashCode()); _outStream = stream; _compressLevel= level; Strategy = strategy; _leaveOpen = leaveOpen; this.MaxBufferPairs = 16; // default } /// /// The ZLIB strategy to be used during compression. /// /// public CompressionStrategy Strategy { get; private set; } /// /// The maximum number of buffer pairs to use. /// /// /// /// /// This property sets an upper limit on the number of memory buffer /// pairs to create. The implementation of this stream allocates /// multiple buffers to facilitate parallel compression. As each buffer /// fills up, this stream uses /// ThreadPool.QueueUserWorkItem() /// to compress those buffers in a background threadpool thread. After a /// buffer is compressed, it is re-ordered and written to the output /// stream. /// /// /// /// A higher number of buffer pairs enables a higher degree of /// parallelism, which tends to increase the speed of compression on /// multi-cpu computers. On the other hand, a higher number of buffer /// pairs also implies a larger memory consumption, more active worker /// threads, and a higher cpu utilization for any compression. This /// property enables the application to limit its memory consumption and /// CPU utilization behavior depending on requirements. 
/// /// /// /// For each compression "task" that occurs in parallel, there are 2 /// buffers allocated: one for input and one for output. This property /// sets a limit for the number of pairs. The total amount of storage /// space allocated for buffering will then be (N*S*2), where N is the /// number of buffer pairs, S is the size of each buffer (). By default, DotNetZip allocates 4 buffer /// pairs per CPU core, so if your machine has 4 cores, and you retain /// the default buffer size of 128k, then the /// ParallelDeflateOutputStream will use 4 * 4 * 2 * 128kb of buffer /// memory in total, or 4mb, in blocks of 128kb. If you then set this /// property to 8, then the number will be 8 * 2 * 128kb of buffer /// memory, or 2mb. /// /// /// /// CPU utilization will also go up with additional buffers, because a /// larger number of buffer pairs allows a larger number of background /// threads to compress in parallel. If you find that parallel /// compression is consuming too much memory or CPU, you can adjust this /// value downward. /// /// /// /// The default value is 16. Different values may deliver better or /// worse results, depending on your priorities and the dynamic /// performance characteristics of your storage and compute resources. /// /// /// /// This property is not the number of buffer pairs to use; it is an /// upper limit. An illustration: Suppose you have an application that /// uses the default value of this property (which is 16), and it runs /// on a machine with 2 CPU cores. In that case, DotNetZip will allocate /// 4 buffer pairs per CPU core, for a total of 8 pairs. The upper /// limit specified by this property has no effect. /// /// /// /// The application can set this value at any time, but it is effective /// only before the first call to Write(), which is when the buffers are /// allocated. 
/// /// public int MaxBufferPairs { get { return _maxBufferPairs; } set { if (value < 4) throw new ArgumentException("MaxBufferPairs", "Value must be 4 or greater."); _maxBufferPairs = value; } } /// /// The size of the buffers used by the compressor threads. /// /// /// /// /// The default buffer size is 128k. The application can set this value /// at any time, but it is effective only before the first Write(). /// /// /// /// Larger buffer sizes implies larger memory consumption but allows /// more efficient compression. Using smaller buffer sizes consumes less /// memory but may result in less effective compression. For example, /// using the default buffer size of 128k, the compression delivered is /// within 1% of the compression delivered by the single-threaded . On the other hand, using a /// BufferSize of 8k can result in a compressed data stream that is 5% /// larger than that delivered by the single-threaded /// DeflateStream. Excessively small buffer sizes can also cause /// the speed of the ParallelDeflateOutputStream to drop, because of /// larger thread scheduling overhead dealing with many many small /// buffers. /// /// /// /// The total amount of storage space allocated for buffering will be /// (N*S*2), where N is the number of buffer pairs, and S is the size of /// each buffer (this property). There are 2 buffers used by the /// compressor, one for input and one for output. By default, DotNetZip /// allocates 4 buffer pairs per CPU core, so if your machine has 4 /// cores, then the number of buffer pairs used will be 16. If you /// accept the default value of this property, 128k, then the /// ParallelDeflateOutputStream will use 16 * 2 * 128kb of buffer memory /// in total, or 4mb, in blocks of 128kb. If you set this property to /// 64kb, then the number will be 16 * 2 * 64kb of buffer memory, or /// 2mb. 
/// /// /// public int BufferSize { get { return _bufferSize;} set { if (value < 1024) throw new ArgumentOutOfRangeException("BufferSize", "BufferSize must be greater than 1024 bytes"); _bufferSize = value; } } /// /// The CRC32 for the data that was written out, prior to compression. /// /// /// This value is meaningful only after a call to Close(). /// public int Crc32 { get { return _Crc32; } } /// /// The total number of uncompressed bytes processed by the ParallelDeflateOutputStream. /// /// /// This value is meaningful only after a call to Close(). /// public Int64 BytesProcessed { get { return _totalBytesProcessed; } } private void _InitializePoolOfWorkItems() { _toWrite = new Queue(); _toFill = new Queue(); _pool = new System.Collections.Generic.List(); int nTasks = BufferPairsPerCore * Environment.ProcessorCount; nTasks = Math.Min(nTasks, _maxBufferPairs); for(int i=0; i < nTasks; i++) { _pool.Add(new WorkItem(_bufferSize, _compressLevel, Strategy, i)); _toFill.Enqueue(i); } _newlyCompressedBlob = new AutoResetEvent(false); _runningCrc = new Ionic.Crc.CRC32(); _currentlyFilling = -1; _lastFilled = -1; _lastWritten = -1; _latestCompressed = -1; } /// /// Write data to the stream. /// /// /// /// /// /// To use the ParallelDeflateOutputStream to compress data, create a /// ParallelDeflateOutputStream with CompressionMode.Compress, passing a /// writable output stream. Then call Write() on that /// ParallelDeflateOutputStream, providing uncompressed data as input. The /// data sent to the output stream will be the compressed form of the data /// written. /// /// /// /// To decompress data, use the class. /// /// /// /// The buffer holding data to write to the stream. /// the offset within that data array to find the first byte to write. /// the number of bytes to write. public override void Write(byte[] buffer, int offset, int count) { bool mustWait = false; // This method does this: // 0. handles any pending exceptions // 1. 
write any buffers that are ready to be written, // 2. fills a work buffer; when full, flip state to 'Filled', // 3. if more data to be written, goto step 1 if (_isClosed) throw new InvalidOperationException(); // dispense any exceptions that occurred on the BG threads if (_pendingException != null) { _handlingException = true; var pe = _pendingException; _pendingException = null; throw pe; } if (count == 0) return; if (!_firstWriteDone) { // Want to do this on first Write, first session, and not in the // constructor. We want to allow MaxBufferPairs to // change after construction, but before first Write. _InitializePoolOfWorkItems(); _firstWriteDone = true; } do { // may need to make buffers available EmitPendingBuffers(false, mustWait); mustWait = false; // use current buffer, or get a new buffer to fill int ix = -1; if (_currentlyFilling >= 0) { ix = _currentlyFilling; TraceOutput(TraceBits.WriteTake, "Write notake wi({0}) lf({1})", ix, _lastFilled); } else { TraceOutput(TraceBits.WriteTake, "Write take?"); if (_toFill.Count == 0) { // no available buffers, so... need to emit // compressed buffers. mustWait = true; continue; } ix = _toFill.Dequeue(); TraceOutput(TraceBits.WriteTake, "Write take wi({0}) lf({1})", ix, _lastFilled); ++_lastFilled; // TODO: consider rollover? } WorkItem workitem = _pool[ix]; int limit = ((workitem.buffer.Length - workitem.inputBytesAvailable) > count) ? count : (workitem.buffer.Length - workitem.inputBytesAvailable); workitem.ordinal = _lastFilled; TraceOutput(TraceBits.Write, "Write lock wi({0}) ord({1}) iba({2})", workitem.index, workitem.ordinal, workitem.inputBytesAvailable ); // copy from the provided buffer to our workitem, starting at // the tail end of whatever data we might have in there currently. 
Buffer.BlockCopy(buffer, offset, workitem.buffer, workitem.inputBytesAvailable, limit); count -= limit; offset += limit; workitem.inputBytesAvailable += limit; if (workitem.inputBytesAvailable == workitem.buffer.Length) { // No need for interlocked.increment: the Write() // method is documented as not multi-thread safe, so // we can assume Write() calls come in from only one // thread. TraceOutput(TraceBits.Write, "Write QUWI wi({0}) ord({1}) iba({2}) nf({3})", workitem.index, workitem.ordinal, workitem.inputBytesAvailable ); if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem )) throw new Exception("Cannot enqueue workitem"); _currentlyFilling = -1; // will get a new buffer next time } else _currentlyFilling = ix; if (count > 0) TraceOutput(TraceBits.WriteEnter, "Write more"); } while (count > 0); // until no more to write TraceOutput(TraceBits.WriteEnter, "Write exit"); return; } private void _FlushFinish() { // After writing a series of compressed buffers, each one closed // with Flush.Sync, we now write the final one as Flush.Finish, // and then stop. 
byte[] buffer = new byte[128]; var compressor = new ZlibCodec(); int rc = compressor.InitializeDeflate(_compressLevel, false); compressor.InputBuffer = null; compressor.NextIn = 0; compressor.AvailableBytesIn = 0; compressor.OutputBuffer = buffer; compressor.NextOut = 0; compressor.AvailableBytesOut = buffer.Length; rc = compressor.Deflate(FlushType.Finish); if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK) throw new Exception("deflating: " + compressor.Message); if (buffer.Length - compressor.AvailableBytesOut > 0) { TraceOutput(TraceBits.EmitBegin, "Emit begin flush bytes({0})", buffer.Length - compressor.AvailableBytesOut); _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut); TraceOutput(TraceBits.EmitDone, "Emit done flush"); } compressor.EndDeflate(); _Crc32 = _runningCrc.Crc32Result; } private void _Flush(bool lastInput) { if (_isClosed) throw new InvalidOperationException(); if (emitting) return; // compress any partial buffer if (_currentlyFilling >= 0) { WorkItem workitem = _pool[_currentlyFilling]; _DeflateOne(workitem); _currentlyFilling = -1; // get a new buffer next Write() } if (lastInput) { EmitPendingBuffers(true, false); _FlushFinish(); } else { EmitPendingBuffers(false, false); } } /// /// Flush the stream. /// public override void Flush() { if (_pendingException != null) { _handlingException = true; var pe = _pendingException; _pendingException = null; throw pe; } if (_handlingException) return; _Flush(false); } /// /// Close the stream. /// /// /// You must call Close on the stream to guarantee that all of the data written in has /// been compressed, and the compressed data has been written out. 
/// public override void Close() { TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode()); if (_pendingException != null) { _handlingException = true; var pe = _pendingException; _pendingException = null; throw pe; } if (_handlingException) return; if (_isClosed) return; _Flush(true); if (!_leaveOpen) _outStream.Close(); _isClosed= true; } // workitem 10030 - implement a new Dispose method /// Dispose the object /// /// /// Because ParallelDeflateOutputStream is IDisposable, the /// application must call this method when finished using the instance. /// /// /// This method is generally called implicitly upon exit from /// a using scope in C# (Using in VB). /// /// new public void Dispose() { TraceOutput(TraceBits.Lifecycle, "Dispose {0:X8}", this.GetHashCode()); Close(); _pool = null; Dispose(true); } /// The Dispose method /// /// indicates whether the Dispose method was invoked by user code. /// protected override void Dispose(bool disposing) { base.Dispose(disposing); } /// /// Resets the stream for use with another stream. /// /// /// Because the ParallelDeflateOutputStream is expensive to create, it /// has been designed so that it can be recycled and re-used. You have /// to call Close() on the stream first, then you can call Reset() on /// it, to use it again on another stream. /// /// /// /// The new output stream for this era. 
/// /// /// /// /// ParallelDeflateOutputStream deflater = null; /// foreach (var inputFile in listOfFiles) /// { /// string outputFile = inputFile + ".compressed"; /// using (System.IO.Stream input = System.IO.File.OpenRead(inputFile)) /// { /// using (var outStream = System.IO.File.Create(outputFile)) /// { /// if (deflater == null) /// deflater = new ParallelDeflateOutputStream(outStream, /// CompressionLevel.Best, /// CompressionStrategy.Default, /// true); /// deflater.Reset(outStream); /// /// while ((n= input.Read(buffer, 0, buffer.Length)) != 0) /// { /// deflater.Write(buffer, 0, n); /// } /// } /// } /// } /// /// public void Reset(Stream stream) { TraceOutput(TraceBits.Session, "-------------------------------------------------------"); TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone); if (!_firstWriteDone) return; // reset all status _toWrite.Clear(); _toFill.Clear(); foreach (var workitem in _pool) { _toFill.Enqueue(workitem.index); workitem.ordinal = -1; } _firstWriteDone = false; _totalBytesProcessed = 0L; _runningCrc = new Ionic.Crc.CRC32(); _isClosed= false; _currentlyFilling = -1; _lastFilled = -1; _lastWritten = -1; _latestCompressed = -1; _outStream = stream; } private void EmitPendingBuffers(bool doAll, bool mustWait) { // When combining parallel deflation with a ZipSegmentedStream, it's // possible for the ZSS to throw from within this method. In that // case, Close/Dispose will be called on this stream, if this stream // is employed within a using or try/finally pair as required. But // this stream is unaware of the pending exception, so the Close() // method invokes this method AGAIN. This can lead to a deadlock. // Therefore, failfast if re-entering. if (emitting) return; emitting = true; if (doAll || mustWait) _newlyCompressedBlob.WaitOne(); do { int firstSkip = -1; int millisecondsToWait = doAll ? 200 : (mustWait ? 
-1 : 0); int nextToWrite = -1; do { if (Monitor.TryEnter(_toWrite, millisecondsToWait)) { nextToWrite = -1; try { if (_toWrite.Count > 0) nextToWrite = _toWrite.Dequeue(); } finally { Monitor.Exit(_toWrite); } if (nextToWrite >= 0) { WorkItem workitem = _pool[nextToWrite]; if (workitem.ordinal != _lastWritten + 1) { // out of order. requeue and try again. TraceOutput(TraceBits.EmitSkip, "Emit skip wi({0}) ord({1}) lw({2}) fs({3})", workitem.index, workitem.ordinal, _lastWritten, firstSkip); lock(_toWrite) { _toWrite.Enqueue(nextToWrite); } if (firstSkip == nextToWrite) { // We went around the list once. // None of the items in the list is the one we want. // Now wait for a compressor to signal again. _newlyCompressedBlob.WaitOne(); firstSkip = -1; } else if (firstSkip == -1) firstSkip = nextToWrite; continue; } firstSkip = -1; TraceOutput(TraceBits.EmitBegin, "Emit begin wi({0}) ord({1}) cba({2})", workitem.index, workitem.ordinal, workitem.compressedBytesAvailable); _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable); _runningCrc.Combine(workitem.crc, workitem.inputBytesAvailable); _totalBytesProcessed += workitem.inputBytesAvailable; workitem.inputBytesAvailable = 0; TraceOutput(TraceBits.EmitDone, "Emit done wi({0}) ord({1}) cba({2}) mtw({3})", workitem.index, workitem.ordinal, workitem.compressedBytesAvailable, millisecondsToWait); _lastWritten = workitem.ordinal; _toFill.Enqueue(workitem.index); // don't wait next time through if (millisecondsToWait == -1) millisecondsToWait = 0; } } else nextToWrite = -1; } while (nextToWrite >= 0); } while (doAll && (_lastWritten != _latestCompressed)); emitting = false; } #if OLD private void _PerpetualWriterMethod(object state) { TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START"); try { do { // wait for the next session TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(begin) PWM"); _sessionReset.WaitOne(); TraceOutput(TraceBits.Synch | 
TraceBits.WriterThread, "Synch _sessionReset.WaitOne(done) PWM"); if (_isDisposed) break; TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.Reset() PWM"); _sessionReset.Reset(); // repeatedly write buffers as they become ready WorkItem workitem = null; Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32(); do { workitem = _pool[_nextToWrite % _pc]; lock(workitem) { if (_noMoreInputForThisSegment) TraceOutput(TraceBits.Write, "Write drain wi({0}) stat({1}) canuse({2}) cba({3})", workitem.index, workitem.status, (workitem.status == (int)WorkItem.Status.Compressed), workitem.compressedBytesAvailable); do { if (workitem.status == (int)WorkItem.Status.Compressed) { TraceOutput(TraceBits.WriteBegin, "Write begin wi({0}) stat({1}) cba({2})", workitem.index, workitem.status, workitem.compressedBytesAvailable); workitem.status = (int)WorkItem.Status.Writing; _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable); c.Combine(workitem.crc, workitem.inputBytesAvailable); _totalBytesProcessed += workitem.inputBytesAvailable; _nextToWrite++; workitem.inputBytesAvailable= 0; workitem.status = (int)WorkItem.Status.Done; TraceOutput(TraceBits.WriteDone, "Write done wi({0}) stat({1}) cba({2})", workitem.index, workitem.status, workitem.compressedBytesAvailable); Monitor.Pulse(workitem); break; } else { int wcycles = 0; // I've locked a workitem I cannot use. // Therefore, wake someone else up, and then release the lock. 
while (workitem.status != (int)WorkItem.Status.Compressed) { TraceOutput(TraceBits.WriteWait, "Write waiting wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})", workitem.index, workitem.status, _nextToWrite, _nextToFill, _noMoreInputForThisSegment ); if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill) break; wcycles++; // wake up someone else Monitor.Pulse(workitem); // release and wait Monitor.Wait(workitem); if (workitem.status == (int)WorkItem.Status.Compressed) TraceOutput(TraceBits.WriteWait, "Write A-OK wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})", workitem.index, workitem.status, workitem.inputBytesAvailable, workitem.compressedBytesAvailable, wcycles); } if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill) break; } } while (true); } if (_noMoreInputForThisSegment) TraceOutput(TraceBits.Write, "Write nomore nw({0}) nf({1}) break({2})", _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill)); if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill) break; } while (true); // Finish: // After writing a series of buffers, closing each one with // Flush.Sync, we now write the final one as Flush.Finish, and // then stop. 
byte[] buffer = new byte[128]; ZlibCodec compressor = new ZlibCodec(); int rc = compressor.InitializeDeflate(_compressLevel, false); compressor.InputBuffer = null; compressor.NextIn = 0; compressor.AvailableBytesIn = 0; compressor.OutputBuffer = buffer; compressor.NextOut = 0; compressor.AvailableBytesOut = buffer.Length; rc = compressor.Deflate(FlushType.Finish); if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK) throw new Exception("deflating: " + compressor.Message); if (buffer.Length - compressor.AvailableBytesOut > 0) { TraceOutput(TraceBits.WriteBegin, "Write begin flush bytes({0})", buffer.Length - compressor.AvailableBytesOut); _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut); TraceOutput(TraceBits.WriteBegin, "Write done flush"); } compressor.EndDeflate(); _Crc32 = c.Crc32Result; // signal that writing is complete: TraceOutput(TraceBits.Synch, "Synch _writingDone.Set() PWM"); _writingDone.Set(); } while (true); } catch (System.Exception exc1) { lock(_eLock) { // expose the exception to the main thread if (_pendingException!=null) _pendingException = exc1; } } TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS"); } #endif private void _DeflateOne(Object wi) { // compress one buffer WorkItem workitem = (WorkItem) wi; try { int myItem = workitem.index; Ionic.Crc.CRC32 crc = new Ionic.Crc.CRC32(); // calc CRC on the buffer crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable); // deflate it DeflateOneSegment(workitem); // update status workitem.crc = crc.Crc32Result; TraceOutput(TraceBits.Compress, "Compress wi({0}) ord({1}) len({2})", workitem.index, workitem.ordinal, workitem.compressedBytesAvailable ); lock(_latestLock) { if (workitem.ordinal > _latestCompressed) _latestCompressed = workitem.ordinal; } lock (_toWrite) { _toWrite.Enqueue(workitem.index); } _newlyCompressedBlob.Set(); } catch (System.Exception exc1) { lock(_eLock) { // expose the exception to the main thread if 
(_pendingException!=null) _pendingException = exc1; } } } private bool DeflateOneSegment(WorkItem workitem) { ZlibCodec compressor = workitem.compressor; int rc= 0; compressor.ResetDeflate(); compressor.NextIn = 0; compressor.AvailableBytesIn = workitem.inputBytesAvailable; // step 1: deflate the buffer compressor.NextOut = 0; compressor.AvailableBytesOut = workitem.compressed.Length; do { compressor.Deflate(FlushType.None); } while (compressor.AvailableBytesIn > 0 || compressor.AvailableBytesOut == 0); // step 2: flush (sync) rc = compressor.Deflate(FlushType.Sync); workitem.compressedBytesAvailable= (int) compressor.TotalBytesOut; return true; } [System.Diagnostics.ConditionalAttribute("Trace")] private void TraceOutput(TraceBits bits, string format, params object[] varParams) { if ((bits & _DesiredTrace) != 0) { lock(_outputLock) { int tid = Thread.CurrentThread.GetHashCode(); #if !SILVERLIGHT Console.ForegroundColor = (ConsoleColor) (tid % 8 + 8); #endif Console.Write("{0:000} PDOS ", tid); Console.WriteLine(format, varParams); #if !SILVERLIGHT Console.ResetColor(); #endif } } } // used only when Trace is defined [Flags] enum TraceBits : uint { None = 0, NotUsed1 = 1, EmitLock = 2, EmitEnter = 4, // enter _EmitPending EmitBegin = 8, // begin to write out EmitDone = 16, // done writing out EmitSkip = 32, // writer skipping a workitem EmitAll = 58, // All Emit flags Flush = 64, Lifecycle = 128, // constructor/disposer Session = 256, // Close/Reset Synch = 512, // thread synchronization Instance = 1024, // instance settings Compress = 2048, // compress task Write = 4096, // filling buffers, when caller invokes Write() WriteEnter = 8192, // upon entry to Write() WriteTake = 16384, // on _toFill.Take() All = 0xffffffff, } /// /// Indicates whether the stream supports Seek operations. /// /// /// Always returns false. /// public override bool CanSeek { get { return false; } } /// /// Indicates whether the stream supports Read operations. 
/// /// /// Always returns false. /// public override bool CanRead { get {return false;} } /// /// Indicates whether the stream supports Write operations. /// /// /// Returns true if the provided stream is writable. /// public override bool CanWrite { get { return _outStream.CanWrite; } } /// /// Reading this property always throws a NotSupportedException. /// public override long Length { get { throw new NotSupportedException(); } } /// /// Returns the current position of the output stream. /// /// /// /// Because the output gets written by a background thread, /// the value may change asynchronously. Setting this /// property always throws a NotSupportedException. /// /// public override long Position { get { return _outStream.Position; } set { throw new NotSupportedException(); } } /// /// This method always throws a NotSupportedException. /// /// /// The buffer into which data would be read, IF THIS METHOD /// ACTUALLY DID ANYTHING. /// /// /// The offset within that data array at which to insert the /// data that is read, IF THIS METHOD ACTUALLY DID /// ANYTHING. /// /// /// The number of bytes to write, IF THIS METHOD ACTUALLY DID /// ANYTHING. /// /// nothing. public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } /// /// This method always throws a NotSupportedException. /// /// /// The offset to seek to.... /// IF THIS METHOD ACTUALLY DID ANYTHING. /// /// /// The reference specifying how to apply the offset.... IF /// THIS METHOD ACTUALLY DID ANYTHING. /// /// nothing. It always throws. public override long Seek(long offset, System.IO.SeekOrigin origin) { throw new NotSupportedException(); } /// /// This method always throws a NotSupportedException. /// /// /// The new value for the stream length.... IF /// THIS METHOD ACTUALLY DID ANYTHING. /// public override void SetLength(long value) { throw new NotSupportedException(); } } }