// Sourcecode - PartnerStream
// Download PartnerStream.cs
////////////////////////////////////////////////
//// PartnerStream ////
//// Version 1.0.0 09.07.04 ////
////////////////////////////////////////////////
// P2PLib - Copyright (C) 2004 Daniel Grunwald
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
using System;
using System.IO;
namespace Grunwald.P2PLib.Server
{
/// <summary>
/// This special type of stream only exists in pairs.
/// Use the static method Create to create a pair of PartnerStreams.
/// Whenever you write something to one stream, it can be read from the other stream.
/// </summary>
/// <remarks>
/// Closing one stream of a pair closes the other one as well. Data that was
/// delivered to a stream before the pair was closed can still be read from it;
/// once that data is consumed, <see cref="Read"/> returns 0 (end of stream).
/// </remarks>
/// <example>
/// You can use this stream to emulate a socket connection.
/// <code>
/// void TestPartnerStream()
/// {
///     PartnerStream s1, s2;
///     PartnerStream.Create(out s1, out s2);
///     StreamWriter w = new StreamWriter(s1);
///     StreamReader r = new StreamReader(s2);
///     w.WriteLine("Hello World!");
///     w.Flush();
///     Console.Write(r.ReadLine());
/// }
/// </code>
/// </example>
public class PartnerStream : System.IO.Stream
{
    /// <summary>
    /// Creates a pair of PartnerStreams that are connected with each other.
    /// </summary>
    /// <param name='stream1'>Receives the first stream of the pair.</param>
    /// <param name='stream2'>Receives the second stream of the pair.</param>
    public static void Create(out PartnerStream stream1, out PartnerStream stream2)
    {
        stream1 = new PartnerStream();
        stream2 = new PartnerStream();
        stream1.partner = stream2;
        stream2.partner = stream1;
    }

    // Incoming data: the partner appends at the end, Read consumes from Position.
    // Every access to this buffer happens while holding lock(m).
    MemoryStream m = new MemoryStream();

    // The other end of the pair; null once this stream has been closed.
    PartnerStream partner;

    // Serializes the "take partner and set it to null" step of Close so that
    // concurrent Close calls on the same stream cannot both obtain the partner.
    readonly object closeLock = new object();

    // Outgoing data that has been written but not yet delivered to the partner.
    // Every access to this buffer happens while holding lock(writeBuffer).
    MemoryStream writeBuffer = new MemoryStream();

    bool autoFlush = false;

    private PartnerStream()
    {
    }

    /// <summary>
    /// Gets a value indicating whether the current stream supports reading.
    /// The PartnerStream always supports reading as long as it is not closed.
    /// </summary>
    /// <remarks>
    /// Inherited property from base class Stream
    /// - read only
    /// </remarks>
    public override bool CanRead {
        get { return partner != null; }
    }

    /// <summary>
    /// Gets a value indicating whether the current stream supports seeking.
    /// The PartnerStream does not support seeking.
    /// </summary>
    /// <remarks>
    /// Inherited property from base class Stream
    /// - read only
    /// </remarks>
    public override bool CanSeek {
        get { return false; }
    }

    /// <summary>
    /// Gets a value indicating whether the current stream supports writing.
    /// The PartnerStream always supports writing as long as it is not closed.
    /// </summary>
    /// <remarks>
    /// Inherited property from base class Stream
    /// - read only
    /// </remarks>
    public override bool CanWrite {
        get { return partner != null; }
    }

    /// <summary>
    /// Gets the length in bytes of the stream.
    /// The PartnerStream doesn't support this property.
    /// </summary>
    /// <remarks>
    /// Inherited property from base class Stream
    /// - read only
    /// </remarks>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Length {
        get { throw new NotSupportedException(); }
    }

    /// <summary>
    /// Gets or sets the position within the current stream.
    /// This property is not supported by PartnerStream.
    /// </summary>
    /// <remarks>
    /// Inherited property from base class Stream
    /// - read/write
    /// </remarks>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Position {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    /// <summary>
    /// Gets or sets a value indicating whether the PartnerStream will flush its
    /// buffer to the partner after every call to <see cref="Write"/>.
    /// </summary>
    /// <remarks>
    /// Changing this property (turning it on or off) will call <see cref="Flush"/>.
    /// Flushing when the property is turned on is required to keep the byte
    /// order intact: data that is still buffered must reach the partner before
    /// any data that is subsequently written directly.
    /// </remarks>
    public bool AutoFlush {
        get { return autoFlush; }
        set {
            if (autoFlush != value) Flush();
            autoFlush = value;
        }
    }

    /// <summary>
    /// Appends bytes to the end of <paramref name="target"/> without moving its
    /// read position, then wakes up readers waiting on that buffer.
    /// </summary>
    private static void Append(MemoryStream target, byte[] buffer, int offset, int count)
    {
        lock (target) {
            long readPosition = target.Position;
            try {
                target.Seek(0, SeekOrigin.End);
                target.Write(buffer, offset, count);
            } finally {
                // Restore the reader's position even when Write rejects its
                // arguments; otherwise the reader would skip its unread data.
                target.Position = readPosition;
            }
            System.Threading.Monitor.PulseAll(target);
        }
    }

    /// <summary>
    /// Writes a sequence of bytes to the partner stream.
    /// </summary>
    /// <remarks>
    /// Inherited method from base class Stream.
    /// When <see cref="AutoFlush"/> is off, the data is only buffered until
    /// <see cref="Flush"/> is called.
    /// </remarks>
    /// <param name='buffer'>An array of bytes. This method copies count bytes from buffer to the current stream.</param>
    /// <param name='offset'>The zero-based byte offset in buffer at which to begin copying bytes to the current stream.</param>
    /// <param name='count'>The number of bytes to be written to the current stream.</param>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        // Read the field once: a concurrent Close may set it to null at any time.
        PartnerStream p = partner;
        if (p == null) throw new ObjectDisposedException("PartnerStream");
        if (autoFlush) {
            Append(p.m, buffer, offset, count);
        } else {
            lock (writeBuffer) {
                writeBuffer.Write(buffer, offset, count);
            }
        }
    }

    /// <summary>
    /// Reads a sequence of bytes from the current stream and advances the
    /// position within the stream by the number of bytes read.
    /// Blocks until at least one byte is available or the stream is closed.
    /// </summary>
    /// <remarks>
    /// Inherited method from base class Stream
    /// </remarks>
    /// <param name='buffer'>An array of bytes. When this method returns, the buffer
    /// contains the specified byte array with the values between offset and
    /// (offset + count - 1) replaced by the bytes read from the current source.</param>
    /// <param name='offset'>The zero-based byte offset in buffer at which to
    /// begin storing the data read from the current stream.</param>
    /// <param name='count'>The maximum number of bytes to be read from the current stream.</param>
    /// <returns>The number of bytes read; 0 if <paramref name="count"/> is 0, or
    /// if the stream is closed and all delivered data has been consumed.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (m) {
            while (true) {
                int c = m.Read(buffer, offset, count);
                // A zero-byte request is complete immediately. It must not fall
                // through to the reset below, which would discard unread data.
                if (c > 0 || count == 0) return c;
                // Everything delivered so far has been consumed: recycle the buffer.
                m.Position = 0;
                m.SetLength(0);
                // Only report end of stream after the pending data was drained.
                if (partner == null) return 0;
                // Releases the lock while waiting. Writers and Close pulse this
                // monitor; the timeout is a fallback in case a pulse is missed.
                System.Threading.Monitor.Wait(m, 50);
            }
        }
    }

    /// <summary>
    /// Sets the length of the current stream.
    /// PartnerStream doesn't support this method.
    /// </summary>
    /// <remarks>
    /// Inherited method from base class Stream
    /// </remarks>
    /// <param name='value'>The desired length of the current stream in bytes.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Sets the position within the current stream.
    /// PartnerStream doesn't support this method.
    /// </summary>
    /// <remarks>
    /// Inherited method from base class Stream
    /// </remarks>
    /// <param name='offset'>A byte offset relative to the origin parameter.</param>
    /// <param name='origin'>A value of type <see cref="SeekOrigin"/> indicating
    /// the reference point used to obtain the new position.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, System.IO.SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Clears all buffers for this stream and causes any buffered data to be
    /// written to the underlying device (in this case the partner stream).
    /// Does nothing if the stream is closed.
    /// </summary>
    /// <remarks>
    /// Inherited method from base class Stream
    /// </remarks>
    public override void Flush()
    {
        // Read the field once: a concurrent Close may set it to null at any time.
        PartnerStream p = partner;
        if (p == null) return;
        lock (writeBuffer) {
            if (writeBuffer.Length > 0) {
                Append(p.m, writeBuffer.GetBuffer(), 0, (int)writeBuffer.Length);
                writeBuffer.Position = 0;
                writeBuffer.SetLength(0);
            }
        }
    }

    /// <summary>
    /// Closes the current stream and its partner stream.
    /// Data that is still buffered is flushed to the partner first.
    /// Calling Close on an already closed stream does nothing.
    /// </summary>
    /// <remarks>
    /// Inherited method from base class Stream
    /// </remarks>
    public override void Close()
    {
        if (partner == null) return;
        // Deliver buffered data while the partner is still reachable.
        Flush();
        PartnerStream p;
        lock (closeLock) {
            p = partner;
            partner = null;
        }
        // Another thread closed this stream between the check and the lock.
        if (p == null) return;
        base.Close();
        p.Close();
        // Wake up a reader blocked in Read so that it notices the close promptly.
        lock (m) {
            System.Threading.Monitor.PulseAll(m);
        }
    }
}
}