This tutorial illustrates creating a cloud process which
acts as a web server to monitor and control the cluster.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
|
#load "../lib/webserver.fsx"
#load "../lib/sieve.fsx"
open Suave
open Suave.Types
open Suave.Http
open Suave.Http.Applicatives
open Suave.Http.Successful
open Suave.Web
open Webserver
|
This example is from the MBrace Starter Kit.
In this tutorial, you learn how to start a web server on your MBrace cluster that can introspect
into the cluster, schedule work, report on jobs and so on.
1:
2:
|
let getCluster() =
Config.GetCluster()
|
This Suave request is executed in response to a GET on
http://nn.nn.nn.nn/
It uses cluster.GetWorkers to access information from the cluster.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
let getWorkersRequest ctxt =
async {
let cluster = getCluster()
let workers = cluster.Workers
let msg =
[ yield "<html>"
yield Angular.header
yield "<body>"
yield! workers |> Angular.table ["ID";"Heartbeat"] (fun w ->
[ w.Id; sprintf "%A" w.LastHeartbeat ])
yield "</body>"
yield "</html>" ]
|> String.concat "\n"
let! rsp = OK msg ctxt
return rsp}
|
This Suave request is executed in response to a GET on
http://nn.nn.nn.nn/cluster/logs
It uses cluster.GetLogs to access information from the cluster.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
|
let getLogsRequest ctxt =
async {
let cluster = getCluster()
let logs = cluster.GetSystemLogs()
let msg =
[ yield "<html>"
yield Angular.header
yield "<body>"
yield! logs |> Angular.table ["Time";"Message"] (fun w ->
[ sprintf "%A" w.DateTime; w.Message ])
yield "</body>"
yield "</html>" ]
|> String.concat "\n"
return! OK msg ctxt }
// This Suave request is executed in response to a GET on
// http://nn.nn.nn.nn/cluster/submit/primes/%d
//
// It uses cluster.CreateProcess to create a new job in the cluster.
let computePrimesRequest n ctxt =
async {
let cluster = getCluster()
let task =
cluster.CreateProcess
(cloud { let primes = Sieve.getPrimes n
return sprintf "calculated %d primes: %A" primes.Length primes })
let msg =
[ yield "<html>"
yield Angular.header
yield "<body>"
yield (sprintf "<p>Created job %s</p>" task.Id)
yield "</body>"
yield "</html>" ]
|> String.concat "\n"
return! OK msg ctxt }
// This Suave request is executed in response to a GET on
// http://nn.nn.nn.nn/cluster/job/%d
//
// It uses cluster.GetProcess to create a new job in the cluster.
let getJobRequest v ctxt =
async {
let cluster = getCluster()
let task = cluster.GetProcessById v
let msg =
[ yield "<html>"
yield Angular.header
yield "<body>"
yield (sprintf "<p>Job %s, Completed: %A, Result: %s</p>" task.Id task.Status (try if task.Status = CloudProcessStatus.Completed then sprintf "%A" (task.AwaitResultBoxed()) else "" with _ -> "<err>") )
yield "</body>"
yield "</html>" ]
|> String.concat "\n"
return! OK msg ctxt }
|
Now the overall specification of the server:
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
|
let webServerSpec () =
choose
[ GET >>= choose
[ path "/" >>= OK "Welcome to the cluster"
path "/cluster/workers" >>= getWorkersRequest
path "/cluster/logs" >>= getLogsRequest
pathScan "/cluster/job/%s" getJobRequest
pathScan "/cluster/submit/primes/%d" computePrimesRequest ]
POST >>= choose
[ path "/hello" >>= OK "Hello POST"
path "/goodbye" >>= OK "Good bye POST" ] ]
|
Now connect to the cluster:
1:
2:
3:
|
let cluster = getCluster()
cluster.ShowProcesses()
|
Use this to inspect the endpoints we can bind to in the cluster.
The code below is specific to MBrace.Azure where your code runs as part
of an Azure worker role.
Note: If using a locally simulated MBrace.Thespian cluster then you need to adjust this example to use an appropriate endpoint IP address.
1:
2:
3:
|
let endPointNames =
cloud { return Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.InstanceEndpoints.Keys |> Seq.toArray }
|> cluster.Run
|
By default, MBrace.Azure clusters created using the Azure CustomCloudService allow us to bind to
- HTTP endpoint 'DefaultHttpEndpoint'
- HTTP endpoint 'MBraceStats'
- TCP endpoint 'DefaultTcpEndpoint'
Here we bind to DefaultHttpEndpoint.
1:
|
let serverJob = suaveServerInCloud "DefaultHttpEndpoint" webServerSpec |> cluster.CreateProcess
|
After you start the webserver (by binding to this internal endpoint), the website will
be published to the corresponding public endpoint. You can find the IP
address or URL for the public endpoint by looking for 'Input Endpoints'
in the 'Configuration' section of the Azure management console page for the
Cloud Service for the MBrace cluster.
For example, the public URL for the MBraceWorkerRole may be http://191.233.103.54:80
*)
( Use this to inspect the status of the web server job:
1:
2:
3:
4:
5:
|
serverJob.ShowInfo()
(** If the webserver doesn't start, then use this to see the logging output from webserver.fsx: *)
serverJob.ShowLogs()
|
Use this to cancel the web server (via the distributed cancellationToken being cancelled):
Use this to wait for the web server to exit after cancellation:
In this tutorial, you've learned how to start a web server on an MBrace cluster which can respond
to requests, control the cluster, access storage and so on.
Continue with further samples to learn more about the MBrace programming model.
Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced
by this script, see ThespianCluster.fsx or AzureCluster.fsx.
namespace System
namespace System.IO
namespace MBrace
namespace MBrace.Core
namespace MBrace.Flow
namespace Suave
module Http
from Suave
module Successful
from Suave
module Web
from Suave
module Webserver
val getCluster : unit -> MBrace.Thespian.ThespianCluster
Full name: 200-starting-a-web-server-example.getCluster
module Config
val GetCluster : unit -> MBrace.Thespian.ThespianCluster
Full name: Config.GetCluster
Gets or creates a new Thespian cluster session.
val getWorkersRequest : ctxt:'a -> Async<'b>
Full name: 200-starting-a-web-server-example.getWorkersRequest
val ctxt : 'a
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val cluster : MBrace.Thespian.ThespianCluster
val workers : MBrace.Runtime.WorkerRef []
property MBrace.Runtime.MBraceClient.Workers: MBrace.Runtime.WorkerRef []
val msg : string
module Angular
from Webserver
val header : string
Full name: Webserver.Angular.header
val table : hs:seq<string> -> f:('T -> #seq<string>) -> xs:seq<'T> -> string list
Full name: Webserver.Angular.table
val w : MBrace.Runtime.WorkerRef
property MBrace.Runtime.WorkerRef.Id: string
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
property MBrace.Runtime.WorkerRef.LastHeartbeat: DateTimeOffset
Multiple items
type String =
new : value:char -> string + 7 overloads
member Chars : int -> char
member Clone : unit -> obj
member CompareTo : value:obj -> int + 1 overload
member Contains : value:string -> bool
member CopyTo : sourceIndex:int * destination:char[] * destinationIndex:int * count:int -> unit
member EndsWith : value:string -> bool + 2 overloads
member Equals : obj:obj -> bool + 2 overloads
member GetEnumerator : unit -> CharEnumerator
member GetHashCode : unit -> int
...
Full name: System.String
--------------------
String(value: nativeptr<char>) : unit
String(value: nativeptr<sbyte>) : unit
String(value: char []) : unit
String(c: char, count: int) : unit
String(value: nativeptr<char>, startIndex: int, length: int) : unit
String(value: nativeptr<sbyte>, startIndex: int, length: int) : unit
String(value: char [], startIndex: int, length: int) : unit
String(value: nativeptr<sbyte>, startIndex: int, length: int, enc: Text.Encoding) : unit
val concat : sep:string -> strings:seq<string> -> string
Full name: Microsoft.FSharp.Core.String.concat
val rsp : 'b
val getLogsRequest : ctxt:'a -> Async<'b>
Full name: 200-starting-a-web-server-example.getLogsRequest
val logs : MBrace.Runtime.SystemLogEntry []
member MBrace.Runtime.MBraceClient.GetSystemLogs : ?worker:MBrace.Core.IWorkerRef * ?logLevel:MBrace.Runtime.LogLevel * ?filter:(MBrace.Runtime.SystemLogEntry -> bool) -> MBrace.Runtime.SystemLogEntry []
val w : MBrace.Runtime.SystemLogEntry
MBrace.Runtime.SystemLogEntry.DateTime: DateTimeOffset
MBrace.Runtime.SystemLogEntry.Message: string
val computePrimesRequest : n:'a -> ctxt:'b -> Async<'c>
Full name: 200-starting-a-web-server-example.computePrimesRequest
val n : 'a
val ctxt : 'b
val task : MBrace.Runtime.CloudProcess<obj>
member MBrace.Runtime.MBraceClient.CreateProcess : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:MBrace.Core.Internals.ResourceRegistry * ?taskName:string -> MBrace.Runtime.CloudProcess<'T>
module Sieve
val getPrimes : nmax:int -> int []
Full name: Sieve.getPrimes
property MBrace.Runtime.CloudProcess.Id: string
val getJobRequest : v:string -> ctxt:'a -> Async<'b>
Full name: 200-starting-a-web-server-example.getJobRequest
val v : string
val task : MBrace.Runtime.CloudProcess
member MBrace.Runtime.MBraceClient.GetProcessById : processId:string -> MBrace.Runtime.CloudProcess
property MBrace.Runtime.CloudProcess.Status: MBrace.Core.CloudProcessStatus
abstract member MBrace.Runtime.CloudProcess.AwaitResultBoxed : ?timeoutMilliseconds:int -> Async<obj>
val choose : options:WebPart<'a> list -> WebPart<'a>
Full name: Suave.WebPart.choose
union case HttpMethod.GET: HttpMethod
union case HttpMethod.POST: HttpMethod
val cluster : MBrace.Thespian.ThespianCluster
Full name: 200-starting-a-web-server-example.cluster
member MBrace.Runtime.MBraceClient.ShowProcesses : unit -> unit
val endPointNames : obj
Full name: 200-starting-a-web-server-example.endPointNames
namespace Microsoft
namespace Microsoft.WindowsAzure
namespace Microsoft.WindowsAzure.ServiceRuntime
type RoleEnvironment =
static val TraceSource : TraceSource
static member CurrentRoleInstance : RoleInstance
static member DeploymentId : string
static member GetConfigurationSettingValue : configurationSettingName:string -> string
static member GetLocalResource : localResourceName:string -> LocalResource
static member IsAvailable : bool
static member IsEmulated : bool
static member RequestRecycle : unit -> unit
static member Roles : IDictionary<string, Role>
static event Changing : EventHandler<RoleEnvironmentChangingEventArgs>
...
Full name: Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment
property WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance: WindowsAzure.ServiceRuntime.RoleInstance
property WindowsAzure.ServiceRuntime.RoleInstance.InstanceEndpoints: Collections.Generic.IDictionary<string,WindowsAzure.ServiceRuntime.RoleInstanceEndpoint>
property Collections.Generic.IDictionary.Keys: Collections.Generic.ICollection<string>
module Seq
from Microsoft.FSharp.Collections
val toArray : source:seq<'T> -> 'T []
Full name: Microsoft.FSharp.Collections.Seq.toArray
member MBrace.Runtime.MBraceClient.Run : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:MBrace.Core.Internals.ResourceRegistry * ?taskName:string -> 'T
val serverJob : MBrace.Runtime.CloudProcess<obj>
Full name: 200-starting-a-web-server-example.serverJob
val suaveServerInCloud : endPointName:'a -> mkApp:'b -> 'c
Full name: Webserver.suaveServerInCloud
member MBrace.Runtime.CloudProcess.ShowInfo : unit -> unit
override MBrace.Runtime.CloudProcess.ShowLogs : ?filter:(MBrace.Runtime.CloudLogEntry -> bool) -> unit