Source code - PartnerStream

Download PartnerStream.cs

////////////////////////////////////////////////
////             PartnerStream              ////
////  Version 1.0.0               09.07.04  ////
////////////////////////////////////////////////
// P2PLib - Copyright (C) 2004  Daniel Grunwald
// 
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
// 
using System;
using System.IO;

namespace Grunwald.P2PLib.Server
{
	/// <summary>
	/// An in-memory stream that only exists in connected pairs.
	/// Use the static method <see cref="Create"/> to create a pair of PartnerStreams.
	/// Whatever is written to one stream of the pair can be read from the other stream.
	/// </summary>
	/// <remarks>
	/// Written data is buffered locally until <see cref="Flush"/> is called, unless
	/// <see cref="AutoFlush"/> is enabled. Closing either stream closes both of them;
	/// data that has not been flushed or has not been read at that point is discarded.
	/// </remarks>
	/// <example>
	/// You can use this stream to emulate a socket connection.
	/// <code>
	/// void TestPartnerStream()
	/// {
	/// 	PartnerStream s1, s2;
	/// 	PartnerStream.Create(out s1, out s2);
	/// 	StreamWriter w = new StreamWriter(s1);
	/// 	StreamReader r = new StreamReader(s2);
	/// 	w.WriteLine("Hello World!");
	/// 	w.Flush();
	/// 	Console.Write(r.ReadLine());
	/// }
	/// </code>
	/// </example>
	public class PartnerStream : System.IO.Stream
	{
		/// <summary>
		/// Creates a pair of PartnerStreams that are connected with each other.
		/// </summary>
		/// <param name="stream1">Receives the first stream of the pair.</param>
		/// <param name="stream2">Receives the second stream of the pair.</param>
		public static void Create(out PartnerStream stream1, out PartnerStream stream2)
		{
			stream1 = new PartnerStream();
			stream2 = new PartnerStream();
			stream1.partner = stream2;
			stream2.partner = stream1;
		}
		
		// Longest time (in milliseconds) a blocked Read waits before it
		// re-checks whether the pair has been closed in the meantime.
		const int ReadPollMilliseconds = 50;
		
		// Bytes delivered by the partner that Read has not returned yet.
		// The object also serves as lock and wait/pulse handle for that data.
		readonly MemoryStream readBuffer = new MemoryStream();
		
		// Bytes written to this stream that have not been flushed to the partner yet.
		readonly MemoryStream writeBuffer = new MemoryStream();
		
		// The other end of the pair; null once the pair has been closed.
		PartnerStream partner;
		
		bool autoFlush = false;
		
		// Instances are only handed out in pairs by Create.
		private PartnerStream()
		{
		}
		
		/// <summary>
		/// Gets a value indicating whether the current stream supports reading.
		/// The PartnerStream always supports reading as long as it is not closed.
		/// </summary>
		/// <remarks>
		/// Inherited property from base class Stream
		/// 	- read only
		/// </remarks>
		public override bool CanRead {
			get {
				return partner != null;
			}
		}
		
		/// <summary>
		/// Gets a value indicating whether the current stream supports seeking.
		/// The PartnerStream does not support seeking.
		/// </summary>
		/// <remarks>
		/// Inherited property from base class Stream
		/// 	- read only
		/// </remarks>
		public override bool CanSeek {
			get { return false; }
		}
		
		/// <summary>
		/// Gets a value indicating whether the current stream supports writing.
		/// The PartnerStream always supports writing as long as it is not closed.
		/// </summary>
		/// <remarks>
		/// Inherited property from base class Stream
		/// 	- read only
		/// </remarks>
		public override bool CanWrite {
			get {
				return partner != null;
			}
		}
		
		/// <summary>
		/// Gets the length in bytes of the stream.
		/// The PartnerStream doesn't support this property.
		/// </summary>
		/// <remarks>
		/// Inherited property from base class Stream
		/// 	- read only
		/// </remarks>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override long Length {
			get {
				throw new NotSupportedException();
			}
		}
		
		/// <summary>
		/// Gets or sets the position within the current stream.
		/// This property is not supported by PartnerStream.
		/// </summary>
		/// <remarks>
		/// Inherited property from base class Stream
		/// 	- read/write
		/// </remarks>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override long Position {
			get {
				throw new NotSupportedException();
			}
			set {
				throw new NotSupportedException();
			}
		}
		
		/// <summary>
		/// Gets or sets a value indicating whether the PartnerStream will pass written
		/// data to the partner immediately on every call to <see cref="Write"/>
		/// instead of buffering it until <see cref="Flush"/> is called.
		/// </summary>
		/// <remarks>
		/// Changing the value calls <see cref="Flush"/> first, so bytes that were buffered
		/// before the switch reach the partner ahead of bytes written afterwards.
		/// </remarks>
		public bool AutoFlush {
			get { return autoFlush; }
			set {
				if (autoFlush != value) {
					// Without this flush, enabling auto-flush would let direct
					// writes overtake data that is still sitting in writeBuffer.
					Flush();
					autoFlush = value;
				}
			}
		}
		
		/// <summary>
		/// Writes a sequence of bytes to the partner stream.
		/// </summary>
		/// <remarks>
		/// Inherited method from base class Stream.
		/// With <see cref="AutoFlush"/> enabled the bytes become readable on the partner
		/// immediately; otherwise they are buffered until <see cref="Flush"/> is called.
		/// </remarks>
		/// <param name="buffer">An array of bytes. This method copies count bytes from buffer to the current stream.</param>
		/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes to the current stream.</param>
		/// <param name="count">The number of bytes to be written to the current stream.</param>
		/// <exception cref="ObjectDisposedException">The stream pair has been closed.</exception>
		public override void Write(byte[] buffer, int offset, int count)
		{
			// Snapshot the field: a concurrent Close may null it at any time.
			PartnerStream receiver = partner;
			if (receiver == null) throw new ObjectDisposedException("PartnerStream");
			if (AutoFlush) {
				MemoryStream target = receiver.readBuffer;
				lock (target) {
					// Append behind the unread data, then put the read position back.
					long readPosition = target.Position;
					try {
						target.Seek(0, SeekOrigin.End);
						target.Write(buffer, offset, count);
					} finally {
						// Restore even on failure so the reader does not skip unread bytes.
						target.Position = readPosition;
					}
					// Wake a reader that is waiting for data.
					System.Threading.Monitor.PulseAll(target);
				}
			} else {
				lock (writeBuffer) {
					writeBuffer.Write(buffer, offset, count);
				}
			}
		}
		
		/// <summary>
		/// Reads a sequence of bytes from the current stream and advances the
		/// position within the stream by the number of bytes read.
		/// Blocks until at least one byte is available or the stream pair is closed.
		/// </summary>
		/// <remarks>
		/// Inherited method from base class Stream
		/// </remarks>
		/// <param name="buffer">An array of bytes. When this method returns, the buffer
		/// contains the specified byte array with the values between offset and
		/// (offset + count - 1) replaced by the bytes read from the current source.</param>
		/// <param name="offset">The zero-based byte offset in buffer at which to
		/// begin storing the data read from the current stream.</param>
		/// <param name="count">The maximum number of bytes to be read from the current stream.</param>
		/// <returns>The number of bytes read; 0 if the stream pair has been closed
		/// or if count is 0.</returns>
		public override int Read(byte[] buffer, int offset, int count)
		{
			if (partner == null) return 0;
			lock (readBuffer) {
				while (true) {
					int bytesRead = readBuffer.Read(buffer, offset, count);
					if (bytesRead > 0) return bytesRead;
					// A zero-length request says nothing about whether the buffer is
					// drained; it must neither discard pending data nor block.
					if (count == 0) return 0;
					// Everything delivered so far has been consumed: release the space.
					readBuffer.Position = 0;
					readBuffer.SetLength(0);
					if (partner == null) return 0;
					// Releases the lock while waiting; writers and Close pulse it.
					// The timeout keeps the periodic closed-check as a fallback.
					System.Threading.Monitor.Wait(readBuffer, ReadPollMilliseconds);
				}
			}
		}
		
		/// <summary>
		/// Sets the length of the current stream.
		/// PartnerStream doesn't support this method.
		/// </summary>
		/// <remarks>
		/// Inherited method from base class Stream
		/// </remarks>
		/// <param name="value">The desired length of the current stream in bytes.</param>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override void SetLength(long value)
		{
			throw new NotSupportedException();
		}
		
		/// <summary>
		/// Sets the position within the current stream.
		/// PartnerStream doesn't support this method.
		/// </summary>
		/// <remarks>
		/// Inherited method from base class Stream
		/// </remarks>
		/// <param name="offset">A byte offset relative to the origin parameter.</param>
		/// <param name="origin">A value of type <see cref="SeekOrigin"/> indicating
		/// the reference point used to obtain the new position.</param>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override long Seek(long offset, System.IO.SeekOrigin origin)
		{
			throw new NotSupportedException();
		}
		
		/// <summary>
		/// Clears all buffers for this stream and causes any buffered data to be
		/// written to the underlying device (in this case the partner stream).
		/// Does nothing if the stream pair has been closed.
		/// </summary>
		/// <remarks>
		/// Inherited method from base class Stream
		/// </remarks>
		public override void Flush()
		{
			// Snapshot the field: a concurrent Close may null it at any time.
			PartnerStream receiver = partner;
			if (receiver == null) return;
			MemoryStream target = receiver.readBuffer;
			lock (writeBuffer) {
				lock (target) {
					// Append behind the unread data, then put the read position back.
					long readPosition = target.Position;
					try {
						target.Seek(0, SeekOrigin.End);
						writeBuffer.WriteTo(target);
					} finally {
						// Restore even on failure so the reader does not skip unread bytes.
						target.Position = readPosition;
					}
					// Wake a reader that is waiting for data.
					System.Threading.Monitor.PulseAll(target);
				}
				// The data has been handed over; empty the local buffer.
				writeBuffer.Position = 0;
				writeBuffer.SetLength(0);
			}
		}
		
		/// <summary>
		/// Closes the current stream and its partner stream.
		/// Data that has not been flushed or read is discarded.
		/// Calling Close on an already closed stream does nothing.
		/// </summary>
		/// <remarks>
		/// Inherited method from base class Stream
		/// </remarks>
		public override void Close()
		{
			PartnerStream other = partner;
			if (other == null) return;
			base.Close();
			// Clear the link before closing the partner; its Close calls back
			// into this method, which then returns immediately.
			partner = null;
			lock (readBuffer) {
				// Let a blocked Read on this stream notice the close right away.
				System.Threading.Monitor.PulseAll(readBuffer);
			}
			other.Close();
		}
	}
}