Welcome to part two

Lets jump in at the deep end and take a look at some code…

When you look at the method syntax for the xxxAsync methods you will notice they return a boolean value that indicates if the method completed synchronously, this means that you have to check the return value every time you use one of the methods and invoke the callback yourself if it completes synchronously.  In practice this hardly ever happens, and normally only on a send operation.  But as it is a possibility we will add module with a some extension methods in to help us out, this will make the code more readable and avoid unnecessary duplication.

SocketExtensions

module SocketExtensions
  open System
  open System.Net
  open System.Net.Sockets  
  type Socket with
    /// extension method to make async based call easier, this ensures the callback always gets
    /// called even if there is an error or the async method completed syncronously
    member s.InvokeAsyncMethod( asyncmethod, callback, args:SocketAsyncEventArgs) =
      let result = asyncmethod args
      if result <> true then callback args
    member s.AcceptAsyncSafe(callback, args) = s.InvokeAsyncMethod(s.AcceptAsync, callback, args)
    member s.ReceiveAsyncSafe(callback, args) = s.InvokeAsyncMethod(s.ReceiveAsync, callback, args)
    member s.SendAsyncSafe(callback, args) = s.InvokeAsyncMethod(s.SendAsync, callback, args)
    member s.DisconnectAsyncSafe(callback, args) = s.InvokeAsyncMethod(s.DisconnectAsync, callback, args)

Now lets get down to business, the next few types have a fair bit of code in them so I will briefly explain each type in turn:

BocketPool

A BocketPool is a combination of a SocketAsyncEventArgs object and a chunk of memory allocated in an array.  The array is sliced up into sections and allocated for each send or receive operation by setting a start and end index using SetBuffer(). If you remember last time I mentioned that a lot of memory fragmentation can occur during sending and receiving due to continuously allocating memory buffers on the Socket object, this is primarily done through the BeginSend and BeginReceive methods passing in a byte array.  Using the BocketPool it a great way of reducing the amount of garbage collection during heavy traffic.

The other major difference with SocketAsyncEventArgs is the way in which you make the send and receive calls, heres the general flow that occurs:

  • Create a SocketAsyncEventArgs object or get one from a pool.
  • Allocate an array to the buffer.
  • Allocate an offset and length to the buffer.
  • Allocate a callback method.
  • Call Socket.xxxAsync passing in the SocketAsyncEventArgs, the operation will complete and invoke the callback.

What we are going to do is wrap the whole creation, array allocation, and offsetting to the BocketPool:

namespace Fes
  open System
  open System.Net.Sockets
  open System.Collections.Concurrent  
  type BocketPool( number, size, callback) as this =
    let number = number
    let size = size
    let totalsize = (number * size)
    let buffer = Array.create totalsize 0uy
    let pool = new BlockingCollection<SocketAsyncEventArgs>(number:int)
    let mutable disposed = false
    let cleanUp() =
      if not disposed then
        disposed <- true
        pool.CompleteAdding()
        while pool.Count > 1 do
          (pool.Take() :> IDisposable).Dispose()
        pool.Dispose()
    do
      let rec loop n =
        match n with
        | x when x < totalsize ->
          let saea = new SocketAsyncEventArgs()
          saea.Completed |> Observable.add( fun saea -> (callback saea))
          saea.SetBuffer(buffer, n, size)
          this.CheckIn(saea)
          loop (n + size)
        | _ -> ()
      loop 0
    member this.CheckOut()=
      pool.Take()
    member this.CheckIn(saea)=
      pool.Add(saea)
    member this.Count =
      pool.Count
    interface IDisposable with
      member this.Dispose() = cleanUp()

Next up we have to look at the Connection and the Tcplistener types as two interconnected entities:

  • The TcpListener listens for a connection on a socket and port number.
  • The client connects to the server.
  • An accept socket is allocated to the client, at this point we have one socket for the server and once for each client.
  • We also need to allocate a BocketPool for send and receive operation for each client To simplify things we are going to encapsulate the accept socket management into a type, it will also need a corresponding BocketPool to service any send and receive operations to and from the client

Connection

namespace Fes
  open System
  open System.Net
  open System.Net.Sockets
  open System.Collections.Generic
  open System.Collections.Concurrent
  open System.Threading
  open SocketExtensions  
  type Connection(maxreceives, maxsends, size, socket:Socket) as this =
    let socket = socket
    let maxreceives = maxreceives
    let maxsends = maxsends
    let sendPool = new BocketPool(maxsends, size, this.sendCompleted )
    let receivePool = new BocketPool(maxreceives, size, this.receiveCompleted)
    let mutable disposed = false
    let mutable anyErrors = false  
    let cleanUp() =
      if not disposed then
        disposed <- true
        socket.Shutdown(SocketShutdown.Both)
        socket.Disconnect(false)
        socket.Close()
        (sendPool :> IDisposable).Dispose()
        (receivePool :> IDisposable).Dispose()  
    member this.Start() =
      socket.ReceiveAsyncSafe(this.receiveCompleted, receivePool.CheckOut())  
    member this.Stop() =
      socket.Close(2)  
    member this.receiveCompleted (args: SocketAsyncEventArgs) =
      try
        match args.LastOperation with
        | SocketAsyncOperation.Receive ->
          match args.SocketError with
          | SocketError.Success ->
            socket.ReceiveAsyncSafe( this.receiveCompleted, receivePool.CheckOut())
            let data = Array.create args.BytesTransferred 0uy
            Buffer.BlockCopy(args.Buffer, args.Offset, data, 0, data.Length)
            let client = args.RemoteEndPoint
            args.RemoteEndPoint <- null
            data |> printfn "received data: %A"
          | _ -> args.SocketError.ToString() |> printfn "socket error on receive: %s"
        | _ -> failwith "unknown operation, should be receive"
      finally
        receivePool.CheckIn(args)  
    member this.sendCompleted (args: SocketAsyncEventArgs) =
      try
        match args.LastOperation with
        | SocketAsyncOperation.Send ->
          match args.SocketError with
          | SocketError.Success -> ()
          | SocketError.NoBufferSpaceAvailable
          | SocketError.IOPending
          | SocketError.WouldBlock ->
            if not(anyErrors) then
              anyErrors <- true
              failwith "Buffer overflow or send buffer timeout"
          | _ -> args.SocketError.ToString() |> printfn "socket error on send: %s"
        | _ -> failwith "invalid operation, should be receive"
      finally
        sendPool.CheckIn(args)  
    member this.Send (msg:byte[]) =
      let s = sendPool.CheckOut()
      Buffer.BlockCopy(msg, 0, s.Buffer, s.Offset, msg.Length)
      socket.SendAsyncSafe(this.sendCompleted, s)

Finally here’s the TcpListener type.  It is responsible for creating an initial Connection object for each client and starts asynchronous sending messages to that client once a second, also notice that there is another BlockingCollection involved, this is somewhat simpler than the usage in the bocketPool as we have no buffer to manage here.

  • It is possible to fill the initial Buffer property, this causes the buffer to be sent to the client as soon as it has connected to the server, this can be useful to sent initial data to the client, such as protocol definitions etc) A finite number of connections can occur before blocking will occur depending on the number of AsyncEventArgs in the collection, this stops potential denial of service attacks due to too many connection being made.

TcpListener

namespace Fes
  open System
  open System.Net
  open System.Net.Sockets
  open System.Collections.Generic
  open System.Collections.Concurrent
  open System.Threading
  open SocketExtensions  
  type TcpListener(maxaccepts, maxsends, maxreceives, size, port, backlog) as this =  
    let createTcpSocket() =
      new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)  
    let createListener (ip:IPAddress, port, backlog) =
      let s = createTcpSocket()
      s.Bind(new IPEndPoint(ip, port))
      s.Listen(backlog); s  
    let listeningSocket = createListener( IPAddress.Loopback, port, backlog)  
    let initPool (maxinpool, callback) =
      let pool = new BlockingCollection<SocketAsyncEventArgs>(maxinpool:int)
      let rec loop n =
        match n with
        | x when x < maxinpool ->
          let saea = new SocketAsyncEventArgs()
          saea.Completed |> Observable.add callback
          pool.Add saea
          loop (n+1)
        | _ -> ()
      loop 0
      pool  
    let acceptPool = initPool (maxaccepts, this.acceptcompleted)
    let newConnection socket = new Connection (maxreceives, maxsends, size, socket)
    let testMessage = Array.init<byte> 128 (fun _ -> 1uy)
    let header = Array.init<byte> 1 (fun _ -> 1uy)
    let mutable disposed = false  
    //mutable state from original
    let mutable anyErrors = false
    let mutable requestCount = 0
    let mutable numWritten = 0  
    //async code from original
    let asyncWriteStockQuote(connection:Connection) = async {
      do! Async.Sleep 1000
      connection.Send(testMessage)
      Interlocked.Increment(&numWritten) |> ignore }  
    //async code from original
    let asyncServiceClient (client: Connection) = async {
      client.Send(header)
      while true do
        do! asyncWriteStockQuote(client) }  
    let startSending connection =
      Async.Start (async {
        try
          use _holder = connection
          do! asyncServiceClient connection
        with e ->
          if not(anyErrors) then
            anyErrors <- true
            Console.WriteLine("server ERROR")
          raise e
        } )  
    let reportConnections =
      Interlocked.Increment(&requestCount) |> ignore
      if requestCount % 1000 = 0 then
        requestCount |> printfn "%A Clients accepted"  
    let cleanUp() =
      if not disposed then
        disposed <- true
        listeningSocket.Shutdown(SocketShutdown.Both)
        listeningSocket.Disconnect(false)
        listeningSocket.Close()  
    member this.acceptcompleted (args : SocketAsyncEventArgs) =
      try
        match args.LastOperation with
        | SocketAsyncOperation.Accept ->
          match args.SocketError with
          | SocketError.Success ->
            listeningSocket.AcceptAsyncSafe( this.acceptcompleted, acceptPool.Take())
            //create new connection
            let connection = newConnection args.AcceptSocket
            connection.Start()  
            //update stats
            reportConnections   
            //async start of messages to client
            startSending connection  
            //remove the AcceptSocket because we will be reusing args
            args.AcceptSocket <- null
          | _ -> args.SocketError.ToString() |> printfn "socket error on accept: %s"
        | _ -> args.LastOperation |> failwith "Unknown operation, should be accept but was %a"
      finally
        acceptPool.Add(args)  
    member this.start () =
      listeningSocket.AcceptAsyncSafe( this.acceptcompleted, acceptPool.Take())
      while true do
      Thread.Sleep 1000
      let count = Interlocked.Exchange(&numWritten, 0)
      count |> printfn "Quotes per sec: %A"  
    member this.Close() =
      cleanUp()  
    interface IDisposable with
      member this.Dispose() = cleanUp()

Its a fair bit of code to take in at once, so Ill leave you with it to ponder over.  Ill be explaining all of the interesting bits in more detail in part three…

Please feel free to leave any comments you have, especially on better use of functional constructs.