Monday, May 7, 2012

A .NET Rx Driven Web Server

Edit: please see my other post on creating a .NET Web Server from Rx (Reactive Extensions) since it contains better code.

Although I've seen .Net Rx (Reactive Extensions) around, I never messed with them until today. To me, the concepts behind Rx always seemed self explanatory--perhaps because i have accomplished concurrent apps in .NET 1.0/2.0 without them. However, having spent a little time with them today, I think Rx is good stuff. Honestly, I'm impressed with the interfaces and the services they provide. Let's check it out:

What are the .Net Reactive Extensions(Rx)?

Short answer: pubsub.

Long answer: a ton of sugar over top of .Net streams, async, TPL, and pubsub. I'm not going to get into the generic intros you can find elsewhere that involve streaming enumerables to the console. Instead I'd prefer to create the argument for Rx as such-- when given the need for "X", it is better to provide "the ability to provide X" than "X" itself. The Reactive Extensions give you a ton of really helpful methods to aide you in implementing "the ability to create X" over "X" itself. Allow me to explain--

If i asked you to write me a function that gave me the first 1 million numbers, how would you implement it? I know a younger me would've started on cranking out a for loop, not taking into consideration that decision's implications upon the system's memory. A smarter implementation would be to give me a function/object that gives me the ability to create the first million numbers, perhaps through iterating through the set. Such an object could then forgo the previously mentioned memory issues. The idea of giving "the ability to create/observe X" over "X" itself is arguably the conceptual basis of functional programming's lazy evaluation, which is also what Rx aims to help the user create (to me, at least). So, out of the box you get a ton of ways to create and enable the push/pull of streaming data and/or events.

An Rx TCP Server

The first thing i could think of to make with Rx is a single-threaded TCP server. Maybe that's because when i think of streaming data these days, i tend to think of a node.js style web server. How hard could it be? (And what would the performance be like...?

Version One: A Single-Threaded Non-Rx Readonly TCP Server

If you run the following code, and make a request on your web browser to http://localhost:8081 you'll see the GET request come through to the app.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Data;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace RxWeb
{
class Program
{
static void Main(string[] args)
{
var server = Task.Factory.StartNew(() =>
{
try
{
NetActor a = new NetActor(8081,(s)=>Console.Write(s));
}
catch (Exception exc)
{
throw;
}
});
while (true)
{
;
}
}
}
public class NetActor
{
Socket _Listener;
public NetActor(int port,Action<string> onRead)
{
_OnRead = onRead;
_Listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_Listener.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port));
_Listener.Listen(20);
_Listener.BeginAccept((o)=>BeginAccept(o),_Listener);
}
private Action<string> _OnRead { get; set; }
private void BeginAccept(IAsyncResult ar)
{
var listener = (Socket)ar.AsyncState;
var state=new StateObject()
{
workSocket=_Listener.EndAccept(ar)
};
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
}
private void BeginRead(IAsyncResult ar)
{
var state = (StateObject)ar.AsyncState;
int bytes = state.workSocket.EndReceive(ar);
if (bytes > 0)
{
_OnRead(Encoding.ASCII.GetString(state.buffer, 0, bytes));
//var content = state.sb.ToString();
//if (content.IndexOf("<EOF>") != -1)
//{
//}
}
else
{
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
}
}
private void GetRecieved() { }
// State object for reading client data asynchronously
private class StateObject
{
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 1024;
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// Received data string.
public StringBuilder sb = new StringBuilder();
}
}
}

Version Two: A Single-Threaded Rx Enabled TCP Server

In this version I added two properties to the NetActor-- Incoming and Outgoing. Both are based on new Rx interfaces that allow the client to tap into the push/pull of data to the client. So if you open your web browser, open up the localhost site, and then type into the console app and press enter, it will get delivered to the web page:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Data;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxWeb
{
class Program
{
static void Main(string[] args)
{
var server = Task.Factory.StartNew(() =>
{
try
{
//create a netactor on port 8081
NetActor a = new NetActor(8081);
//write out to the console when you get sent data
a.Incoming.Subscribe(i => Console.Write(i));
//create an observable from the console and bind it to the netactor
var o=Observable.Start<string>(()=>Console.ReadLine());
//publish from the console to the actor
o.Subscribe<string>(a.Outgoing.OnNext);
}
catch (Exception exc)
{
throw;
}
});
while (true)
{
;
}
}
}
/// <summary>
/// Binds to localhost. Pretty much 100% based on msdn code
/// </summary>
/// <seealso cref="http://msdn.microsoft.com/en-us/library/fx6588te.aspx"/>
public class NetActor
{
//this socket is us
Socket _Listener;
//this socket is them
Socket _Client;
//Incoming messages from the client
private ISubject<string> _Incoming {get; set;}
//allow the ability to subscribe to incoming messages
public IObservable<string> Incoming
{
get
{
return _Incoming.AsObservable();
}
}
//outgoing messages
public ISubject<string> Outgoing { get; private set; }
public NetActor(int port)
{
_Incoming = new Subject<string>();
Outgoing = new Subject<string>();
_Listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_Listener.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port));
_Listener.Listen(20);
_Listener.BeginAccept((o)=>BeginAccept(o),_Listener);
}
private void BeginAccept(IAsyncResult ar)
{
var listener = (Socket)ar.AsyncState;
//create a state that contains the ability to listen
var state=new StateObject()
{
workSocket=_Listener.EndAccept(ar)
};
//create the ability to push data out
Outgoing.Subscribe((s) => Send(state.workSocket, s));
//classic microsoft code, gotta love [object] state
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
//lets try to override this with Observable.FromAsyncPattern....
// worthless. this func returns just iasyncresult and not a tangible something... anything...
// like in the microsoft example docs. anger face
//var o = Observable.FromAsyncPattern<byte[], int, int, SocketFlags>(
// (byt, offset, size, flags, cb, st) => state.workSocket.BeginReceive(byt, offset, size, flags, cb, st),
// (a) => state.workSocket.EndReceive(a));
}
private void BeginRead(IAsyncResult ar)
{
try
{
var state = (StateObject)ar.AsyncState;
int bytes = state.workSocket.EndReceive(ar);
if (bytes > 0)
{
state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytes));
var content = state.sb.ToString();
_Incoming.OnNext(content);
if (content.IndexOf("<EOF>") != -1)
{
_Incoming.OnCompleted();
}
}
else
{
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
}
}
catch (Exception exc)
{
_Incoming.OnError(exc);
}
}
private void Send(Socket handler, String data)
{
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = Encoding.ASCII.GetBytes(data);
// Begin sending the data to the remote device.
handler.BeginSend(byteData, 0, byteData.Length, 0,
new AsyncCallback(SendCallback), handler);
}
private void SendCallback(IAsyncResult ar)
{
try
{
// Retrieve the socket from the state object.
Socket handler = (Socket)ar.AsyncState;
// Complete sending the data to the remote device.
int bytesSent = handler.EndSend(ar);
handler.Shutdown(SocketShutdown.Both);
handler.Close();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
// State object for reading client data asynchronously
private class StateObject
{
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 1024;
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// Received data string.
public StringBuilder sb = new StringBuilder();
}
}
}
view raw RxTcpServer.cs hosted with ❤ by GitHub

Version Three: The Node.js Killer

Ok, so in order to get apache bench to recognize my console app as a web server i had to bind the NetActor's Ip to something other than localhost... not sure why. Once i got that working, i had intermittent failure until I implemented part of the HTTP spec-- at least the response code and connection closed. After that, and also after creating the ability for the NetActor to shut itself down and start itself up, here is what i was left with:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Data;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxWeb
{
class Program
{
static void Main(string[] args)
{
var server = Task.Factory.StartNew(() =>
{
try
{
Factory();
}
catch (Exception exc)
{
throw;
}
});
while (true)
{
;
}
}
public static NetActor Factory()
{
var response = @"HTTP/1.1 200
Connection:close
thanks!";
//create a netactor on port 8081
NetActor a = new NetActor(8081);
//write out to the console when you get sent data
a.Incoming.Subscribe(i =>
{
Console.WriteLine("handling request...");
a.Outgoing.OnNext(response);
a.Dispose();
a = null;
Factory();
});
//create an observable from the console and bind it to the netactor
var o = Observable.Start<string>(() => Console.ReadLine());
//publish from the console to the actor
o.Subscribe<string>(a.Outgoing.OnNext);
return a;
}
}
/// <summary>
/// Binds to localhost. Pretty much 100% based on msdn code
/// </summary>
/// <seealso cref="http://msdn.microsoft.com/en-us/library/fx6588te.aspx"/>
public class NetActor : IDisposable
{
//this socket is us
Socket _Server;
//this socket is them
Socket _Client;
//Incoming messages from the client
private ISubject<string> _Incoming { get; set; }
//allow the ability to subscribe to incoming messages
public IObservable<string> Incoming
{
get
{
return _Incoming.AsObservable();
}
}
//outgoing messages
public ISubject<string> Outgoing { get; private set; }
public NetActor(int port)
{
_Incoming = new Subject<string>();
Outgoing = new Subject<string>();
_Server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPHostEntry ipHostInfo = Dns.Resolve(Dns.GetHostName());
IPAddress ipAddress = ipHostInfo.AddressList[0];
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port);
// _Server.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port));
_Server.Bind(localEndPoint);
_Server.Listen(100);
_Server.BeginAccept((o) => BeginAccept(o), _Server);
}
private void BeginAccept(IAsyncResult ar)
{
var listener = (Socket)ar.AsyncState;
//create a state that contains the ability to listen
var state = new StateObject()
{
workSocket = _Server.EndAccept(ar)
};
//create the ability to push data out
Outgoing.Subscribe((s) => Send(state.workSocket, s));
//classic microsoft code, gotta love [object] state
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
//lets try to override this with Observable.FromAsyncPattern....
// worthless. this func returns just iasyncresult and not a tangible something... anything...
// like in the microsoft example docs. anger face
//var o = Observable.FromAsyncPattern<byte[], int, int, SocketFlags>(
// (byt, offset, size, flags, cb, st) => state.workSocket.BeginReceive(byt, offset, size, flags, cb, st),
// (a) => state.workSocket.EndReceive(a));
}
private void BeginRead(IAsyncResult ar)
{
try
{
var state = (StateObject)ar.AsyncState;
int bytes = state.workSocket.EndReceive(ar);
if (bytes > 0)
{
state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytes));
var content = state.sb.ToString();
_Incoming.OnNext(content);
if (content.IndexOf("<EOF>") != -1)
{
_Incoming.OnCompleted();
}
}
else
{
state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
}
}
catch (Exception exc)
{
_Incoming.OnError(exc);
}
}
private void Send(Socket handler, String data)
{
if (handler == null) return;
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = Encoding.ASCII.GetBytes(data);
// Begin sending the data to the remote device.
handler.BeginSend(byteData, 0, byteData.Length, 0,
new AsyncCallback(SendCallback), handler);
}
private void SendCallback(IAsyncResult ar)
{
try
{
// Retrieve the socket from the state object.
Socket handler = (Socket)ar.AsyncState;
// Complete sending the data to the remote device.
int bytesSent = handler.EndSend(ar);
handler.Shutdown(SocketShutdown.Both);
handler.Close();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
// State object for reading client data asynchronously
private class StateObject
{
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 1024;
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// Received data string.
public StringBuilder sb = new StringBuilder();
}
public void End()
{
if (_Client != null)
{
_Client.Shutdown(SocketShutdown.Both);
_Client.Close();
}
if (_Server != null)
{
_Server.Close();
}
}
public void Dispose()
{
End();
}
}
}

Apache Bench Results





At 500ms+ with a concurrency level of 1, this is definitely not a node.js killer..... ;-)

No comments:

Post a Comment