MBrace.Core and MBrace.Azure


Example: Starting a Web Server in your Cluster

This tutorial illustrates creating a cloud process which acts as a web server to monitor and control the cluster.

#load "../lib/webserver.fsx"
#load "../lib/sieve.fsx"

open Suave
open Suave.Types
open Suave.Http
open Suave.Http.Applicatives
open Suave.Http.Successful
open Suave.Web
open Webserver

Starting a WebServer on your MBrace cluster

This example is from the MBrace Starter Kit.

In this tutorial, you learn how to start a web server on your MBrace cluster that can introspect into the cluster, schedule work, report on jobs and so on.

let getCluster() = 
    Config.GetCluster() 

This Suave request is executed in response to a GET on

http://nn.nn.nn.nn/cluster/workers

It uses cluster.Workers to access information from the cluster.

let getWorkersRequest ctxt = 
    async {
            let cluster = getCluster()
            let workers = cluster.Workers
            let msg = 
              [ yield "<html>"
                yield Angular.header
                yield "<body>"
                yield! workers |> Angular.table ["ID";"Heartbeat"] (fun w -> 
                    [ w.Id; sprintf "%A" w.LastHeartbeat ])
                yield "</body>"
                yield "</html>" ]
              |> String.concat "\n"
            let! rsp = OK msg ctxt 
            return rsp}
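
The Angular.table helper used above lives in webserver.fsx and is not shown on this page. As a rough illustration of the shape such a helper could take, here is a minimal sketch in plain HTML. The names cellsOf and htmlTable are hypothetical and this is not the actual Webserver.Angular.table implementation; it only mirrors the same argument order (headers, a function from row to cells, then the rows).

```fsharp
// Hypothetical stand-in for a table helper; NOT the real Webserver.Angular.table.

// Wrap each value in an opening and closing tag and join the results.
let cellsOf tag (values: seq<string>) =
    values
    |> Seq.map (fun v -> sprintf "<%s>%s</%s>" tag v tag)
    |> String.concat ""

// Produce the lines of an HTML table: one header row, then one row per item.
let htmlTable (headers: seq<string>) (toCells: 'T -> #seq<string>) (rows: seq<'T>) : string list =
    [ yield "<table>"
      yield "<tr>" + cellsOf "th" headers + "</tr>"
      for row in rows do
          yield "<tr>" + cellsOf "td" (toCells row) + "</tr>"
      yield "</table>" ]

// Usage with made-up worker data (id, heartbeat) in place of real WorkerRef values:
let sampleLines = htmlTable ["ID"; "Heartbeat"] (fun (id, hb) -> [id; hb]) [ ("w1", "t1") ]
```

Because the result is a string list, it can be spliced into a page with yield! in the same way as in the request handler above.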

This Suave request is executed in response to a GET on http://nn.nn.nn.nn/cluster/logs

It uses cluster.GetSystemLogs to access information from the cluster.

let getLogsRequest ctxt = 
    async {
            let cluster = getCluster()
            let logs = cluster.GetSystemLogs()
            let msg = 
              [ yield "<html>"
                yield Angular.header
                yield "<body>"
                yield! logs |> Angular.table ["Time";"Message"] (fun w -> 
                    [ sprintf "%A" w.DateTime; w.Message ])
                yield "</body>"
                yield "</html>" ]
              |> String.concat "\n"
            return! OK msg ctxt  }

// This Suave request is executed in response to a GET on 
//   http://nn.nn.nn.nn/cluster/submit/primes/%d
//
// It uses cluster.CreateProcess to create a new job in the cluster.
let computePrimesRequest n ctxt = 
    async {
            let cluster = getCluster()
            let task = 
              cluster.CreateProcess 
                (cloud { let primes = Sieve.getPrimes n
                         return sprintf "calculated %d primes: %A" primes.Length primes })
            let msg = 
              [ yield "<html>"
                yield Angular.header
                yield "<body>"
                yield (sprintf "<p>Created job %s</p>" task.Id)
                yield "</body>"
                yield "</html>" ]
              |> String.concat "\n"
            return! OK msg ctxt  }

// This Suave request is executed in response to a GET on 
//   http://nn.nn.nn.nn/cluster/job/%s
//
// It uses cluster.GetProcessById to look up an existing job in the cluster.
let getJobRequest v ctxt = 
    async {
            let cluster = getCluster()
            let task = cluster.GetProcessById v
            let msg = 
                [ yield "<html>"
                  yield Angular.header
                  yield "<body>"
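                  // AwaitResultBoxed returns an Async<obj>, so run it to obtain the
                  // actual result; only attempt this once the job has completed.
                  let result =
                      try
                          if task.Status = CloudProcessStatus.Completed
                          then sprintf "%A" (task.AwaitResultBoxed() |> Async.RunSynchronously)
                          else ""
                      with _ -> "<err>"
                  yield (sprintf "<p>Job %s, Status: %A, Result: %s</p>" task.Id task.Status result)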
                  yield "</body>"
                  yield "</html>" ]
                |> String.concat "\n"
            return! OK msg ctxt  }
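
The handlers above splice log messages and job results directly into the page. If any of that text contains characters such as < or &, a browser may read it as markup. A minimal sketch of escaping the text first, using the standard .NET System.Net.WebUtility.HtmlEncode method; the helper name escapeCells is hypothetical and is not part of webserver.fsx.

```fsharp
open System.Net

// Hypothetical helper: HTML-encode every cell before it is placed in a table row.
let escapeCells (cells: seq<string>) : string list =
    cells
    |> Seq.map (fun (c: string) -> WebUtility.HtmlEncode c)
    |> Seq.toList

// Example with made-up log text containing markup characters:
let escaped = escapeCells [ "<b>"; "a & b" ]
```

A row function such as the one passed to Angular.table could apply this to its cells before returning them.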

Now the overall specification of the server:

let webServerSpec () =
  choose
    [ GET >>= choose
                [ path "/" >>= OK "Welcome to the cluster" 
                  path "/cluster/workers" >>= getWorkersRequest
                  path "/cluster/logs" >>= getLogsRequest
                  pathScan "/cluster/job/%s" getJobRequest
                  pathScan "/cluster/submit/primes/%d" computePrimesRequest ]
      POST >>= choose
                [ path "/hello" >>= OK "Hello POST"
                  path "/goodbye" >>= OK "Good bye POST" ] ]

Now connect to the cluster:

let cluster = getCluster()

cluster.ShowProcesses()

Use this to inspect the endpoints we can bind to in the cluster. The code below is specific to MBrace.Azure where your code runs as part of an Azure worker role.

Note: If using a locally simulated MBrace.Thespian cluster then you need to adjust this example to use an appropriate endpoint IP address.

let endPointNames = 
    cloud { return Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.InstanceEndpoints.Keys |> Seq.toArray }
    |> cluster.Run

By default, MBrace.Azure clusters created using the Azure CustomCloudService allow us to bind to

  • HTTP endpoint 'DefaultHttpEndpoint'
  • HTTP endpoint 'MBraceStats'
  • TCP endpoint 'DefaultTcpEndpoint'

Here we bind to DefaultHttpEndpoint.

let serverJob = suaveServerInCloud "DefaultHttpEndpoint" webServerSpec |> cluster.CreateProcess

After you start the webserver (by binding to this internal endpoint), the website will be published to the corresponding public endpoint. You can find the IP address or URL for the public endpoint by looking for 'Input Endpoints' in the 'Configuration' section of the Azure management console page for the Cloud Service for the MBrace cluster.

For example, the public URL for the MBraceWorkerRole may be http://191.233.103.54:80
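
Once the public address is known, the routes in webServerSpec can be exercised from any HTTP client. The following is a minimal sketch using the standard .NET HttpClient. The address is the placeholder used throughout this page, and the names baseAddress, routeUrl and fetch are hypothetical helpers introduced only for this illustration.

```fsharp
open System
open System.Net.Http

// Placeholder address: substitute the public endpoint of your own cluster.
let baseAddress = Uri "http://nn.nn.nn.nn/"

// Build the absolute URL for a route defined in webServerSpec.
let routeUrl (route: string) = Uri(baseAddress, route)

// Issue a GET and return the response body (blocks the calling thread).
let fetch (client: HttpClient) (route: string) =
    client.GetStringAsync(routeUrl route)
    |> Async.AwaitTask
    |> Async.RunSynchronously

// Usage, once baseAddress points at a running server:
// let client = new HttpClient()
// fetch client "/cluster/workers" |> printfn "%s"
// fetch client "/cluster/submit/primes/1000" |> printfn "%s"
```

The submit route returns a page containing the new job's id, which can then be passed to the /cluster/job/ route to check on its status.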

Use this to inspect the status of the web server job:

serverJob.ShowInfo()

If the webserver doesn't start, then use this to see the logging output from webserver.fsx:

serverJob.ShowLogs()

Use this to cancel the web server (via the distributed cancellationToken being cancelled):

// serverJob.Cancel()

Use this to wait for the web server to exit after cancellation:

// serverJob.Result

In this tutorial, you've learned how to start a web server on an MBrace cluster which can respond to requests, control the cluster, access storage and so on. Continue with further samples to learn more about the MBrace programming model.

Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
