MBrace.Core and MBrace.Azure


Programming model

The MBrace programming model is a language-integrated cloud programming DSL for use from F#. It offers a concise and elegant programming model which extends F# asynchronous workflows to the domain of distributed cloud computation.

Tutorials demonstrating different aspects of the core programming model, along with all samples, can be found in the MBrace Starter Kit.

What follows is a general overview.

Cloud Workflows

In MBrace, the unit of computation is a cloud workflow:

let myFirstCloudWorkflow = cloud { return 42 }

Cloud workflows generate objects of type Cloud<'T>, which denote a delayed computation that once executed will yield a result of type 'T. Cloud workflows are language-integrated and can freely intertwine with native code.

let getDataCenterTime() = cloud {
    let now = System.DateTime.Now
    return now
}

Simple cloud computations can be composed into larger ones using the let! keyword:

let first = cloud { return 15 }
let second = cloud { return 27 }

cloud {
    let! x = first
    let! y = second
    return x + y
}

This creates bindings to the first and second workflows respectively, to be consumed by the continuations of the main computation. Once executed, this will sequentially perform the computations in first and second, resuming once the latter has completed.

For loops and while loops are possible:

/// Sequentially iterate and loop 
cloud {
    for i in [ 1 .. 100 ] do
        do! Cloud.Logf "Logging entry %d of 100" i

    while true do
        do! Cloud.Sleep 200
        do! Cloud.Log "Logging forever..."
}

MBrace workflows also integrate with exception handling:

cloud {
    try
        let! x = cloud { return 1 / 0 }
        return true

    with :? System.DivideByZeroException -> return false
}

Asynchronous workflows can be embedded into cloud workflows:

let downloadAsync (url : string) = async {
    use webClient = new System.Net.WebClient()
    let! html = webClient.AsyncDownloadString(System.Uri url) 
    return html.Split('\n')
}


cloud {
    let! t1 = downloadAsync "http://www.nessos.gr/" |> Cloud.OfAsync
    let! t2 = downloadAsync "http://www.mbrace.io/" |> Cloud.OfAsync
    return t1.Length + t2.Length
}

Parallelism Combinators

Cloud workflows as discussed so far enable asynchronous computation but do not suffice to describe parallelism and distribution. To control these, MBrace provides a collection of primitive combinators that act on the distribution and parallelism semantics of execution in cloud workflows.

The previous example could be altered so that downloading happens in parallel:

let downloadCloud url = downloadAsync url |> Cloud.OfAsync

cloud {

    let! results =
        [ "http://www.mbrace.io/"
          "http://www.nessos.gr/" ]
        |> List.map downloadCloud
        |> Cloud.Parallel

    return results |> Array.sumBy(fun r -> r.Length)
}

Here is a second example of Cloud.Parallel:

cloud {

    let n = System.Random().Next(50,100) // number of parallel jobs determined at runtime
    let! results = Cloud.Parallel [ for x in 1..n -> cloud { return x * x } ]
    return Array.sum results
}

Exception handling in workflows

For exception handling, consider the workflow:

cloud {

    try
        let! results =
            [ "http://www.mbrace.io/"
              "http://www.nessos.gr/"
              "http://non.existent.domain/" ]
            |> List.map downloadCloud
            |> Cloud.Parallel

        return results |> Array.sumBy(fun r -> r.Length)

    with :? System.Net.WebException as e ->
        // log and reraise
        do! Cloud.Logf "Encountered error %O" e
        return raise e
}

In this case, one of the child computations will fail on account of an invalid URL, raising an exception. In general, uncaught exceptions bubble up through Cloud.Parallel, triggering cancellation of all outstanding child computations (just like Async.Parallel).

The exception handling clause will almost certainly be executed on a different machine than the one on which the exception was originally thrown. This is possible because cloud workflows allow exceptions, environments, and closures to be passed around worker machines in a largely transparent manner.
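The comparison with Async.Parallel can be tried locally without a cluster. The following is a minimal single-machine sketch using only the F# core library; the job function and its failure message are made up for illustration, and nothing here touches the MBrace API.

```fsharp
// Local analogue only: plain F# async, no MBrace involved.
// `job` is a hypothetical child computation; the second one fails.
let job (i : int) = async {
    if i = 2 then failwith "job 2 failed"
    return i * i
}

// The exception raised by one child surfaces from the combined
// computation and is caught by the enclosing try/with.
let outcome : Result<int, string> =
    try
        [ 1 .. 3 ]
        |> List.map job
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Array.sum
        |> Ok
    with e -> Error e.Message

printfn "%A" outcome
```

The cloud version differs in that the children may run on different worker machines, so the exception has to travel between machines before the handler sees it.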

Non-deterministic parallelism

MBrace provides the Cloud.Choice combinator that utilizes parallelism for non-deterministic algorithms. Cloud workflows of type Cloud<'T option> are said to be non-deterministic in the sense that their return type indicates either success with Some result or a negative answer with None.

Cloud.Choice combines a collection of arbitrary nondeterministic computations into one in which everything executes in parallel: it either returns Some result whenever a child happens to complete with Some result (cancelling all pending jobs) or None when all children have completed with None. It can be thought of as a distributed equivalent to the Seq.tryPick function found in the F# core library.

The following example defines a distributed search function based on Cloud.Choice:

let tryFind (f : 'T -> bool) (ts : 'T list) = cloud {
    let select t = cloud {
        return
            if f t then Some t
            else None
    }

    return! ts |> List.map select |> Cloud.Choice
}
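For comparison, here is a sequential, single-machine counterpart built on Seq.tryPick. It is only a sketch to make the analogy concrete; tryFindLocal is a hypothetical name and is not part of MBrace.

```fsharp
// Sequential analogue of the distributed tryFind, using only FSharp.Core.
// Seq.tryPick returns the first Some produced by the chooser in list order,
// or None when every element yields None.
let tryFindLocal (f : 'T -> bool) (ts : 'T list) : 'T option =
    ts |> Seq.tryPick (fun t -> if f t then Some t else None)

let found = tryFindLocal (fun x -> x % 7 = 0) [ 1 .. 20 ]    // Some 7
let missing = tryFindLocal (fun x -> x > 100) [ 1 .. 20 ]    // None
```

One difference worth keeping in mind: Seq.tryPick always returns the first match in list order, whereas Cloud.Choice returns whichever child happens to complete with Some result first, so with several matching elements the distributed version may return any of them.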

Composing distributed workflows

Recursive and higher-order composition of cloud workflows is possible:

/// Sequentially fold along a set of jobs
let rec foldLeftCloud (f : 'State -> 'T -> Cloud<'State>) state ts = cloud {
    match ts with
    | [] -> return state
    | t :: ts' ->
        let! s' = f state t
        return! foldLeftCloud f s' ts'
}

Distributed Data

Cloud workflows offer a programming model for distributed computation. But what happens when it comes to data?

Small-to-medium-scale data can be transported implicitly as part of a cloud computation. This offers a limited (though extremely convenient) form of data distribution. However, it will not scale to all needs, particularly computations involving gigabytes of data.

MBrace offers a range of mechanisms for managing data at a larger, cluster-wide scale. These provide an essential decoupling between distributed computation and distributed data.

See also this summary of the MBrace abstractions for cloud data.

Cloud Values

The MBrace programming model offers access to persistable and cacheable distributed data entities known as cloud values. Cloud values closely resemble the immutable data values found in F# but are stored in persisted cloud storage. The following workflow stores the downloaded content of a web page and returns a cloud value that references it:

let textCell = cloud {
    // download a piece of data
    let! text = downloadCloud "http://www.mbrace.io/"
    // store data to a new CloudValue
    let! cref = CloudValue.New text
    // return the ref
    return cref
}

Dereferencing a cloud value can be done by getting its .Value property:

let dereference (data : CloudValue<byte []>) = cloud {
    let v = data.Value
    return v.Length
}

It is possible to explicitly specify the StorageLevel used for a specific CloudValue instance:

cloud {
    let! text = downloadCloud "http://www.m-mbrace.net/largeFile.txt"
    let! cref = CloudValue.New(text, storageLevel = StorageLevel.MemoryAndDisk)
    return cref
}

This indicates that the cloud value should be persisted both in disk storage and in memory for the worker instances that dereference it.

Example: Defining a MapReduce workflow

Cloud workflows in conjunction with parallel combinators can be used to articulate MapReduce-like workflows. A simplistic version follows:

let mapReduce (map : 'T -> 'R) (reduce : 'R -> 'R -> 'R)
              (identity : 'R) (inputs : 'T list) =

    let rec aux inputs = cloud {
        match inputs with
        | [] -> return identity
        | [t] -> return map t
        | _ ->
            let left,right = List.split inputs
            let! results = Cloud.Parallel [ aux left;  aux right ]
            return reduce results.[0] results.[1]
    }

    aux inputs

The workflow follows a divide-and-conquer approach, recursively partitioning input data until trivial cases are met. Recursive calls are passed through Cloud.Parallel, thus achieving the effect of distributed parallelism.

This is a naive conception of mapReduce, as it does not enable data parallelism nor does it take into account cluster granularity.
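Note that the workflow above calls List.split, which is not a function in the F# core library and must be supplied separately. A minimal sketch of such a helper, assuming it simply cuts the list at its midpoint, could look like this:

```fsharp
// Assumed helper: List.split is not part of FSharp.Core, so the
// mapReduce example needs a definition along these lines in scope.
module List =
    /// Splits a list into two halves; the second half receives the
    /// extra element when the length is odd.
    let split (ts : 'T list) : 'T list * 'T list =
        List.splitAt (List.length ts / 2) ts

let left, right = List.split [ 1 .. 5 ]
// left = [1; 2], right = [3; 4; 5]
```

With this definition any list of two or more elements yields two non-empty halves, so the recursion in aux always reaches the empty or single-element cases.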

Cloud Files

Like cloud values, sequences and vectors, CloudFile is an immutable storage primitive that references a file saved in the global store. In other words, it is an interface for storing or accessing binary blobs in the runtime.

cloud {
    // enumerate all files from underlying storage container
    let! files = CloudFile.Enumerate "path/to/container"

    // read a cloud file and return its word count
    let wordCount (f : CloudFileInfo) = cloud {
        let! text = CloudFile.ReadAllText f.Path
        let count =
            text.Split(' ')
            |> Seq.groupBy id
            |> Seq.map (fun (token,instances) -> token, Seq.length instances)
            |> Seq.toArray

        return f.Path, count
    }

    // perform computation in parallel
    let! results = files |> Array.map wordCount |> Cloud.Parallel

    return results
}

Mutable cloud values (CloudAtom)

The CloudAtom primitive is, like CloudValue, a reference to data stored in the underlying cloud service. However, CloudAtoms are mutable. The value of a cloud atom can be updated and, as a result, its values are never cached. Mutable cloud values can be updated transactionally using the CloudAtom.Transact methods or forcibly using the CloudAtom.Force method.

The CloudAtom is a powerful primitive that can be used to create runtime-wide synchronization mechanisms like locks, semaphores, etc.

The following demonstrates simple use of the cloud atom:

let race () = cloud {
    let! ca = CloudAtom.New(0)
    let! _ =
        cloud { ca.Force 2 }
            <||>
        cloud { ca.Force 1 }

    return ca.Value
}

The snippet will return a result of either 1 or 2, depending on which update operation was run last.

The following snippet implements a transactional increment function acting on a cloud atom:

let increment (counter : CloudAtom<int>) = cloud {
    do! Cloud.OfAsync <| counter.UpdateAsync(fun c -> c + 1)
}

Other Primitives

For more information and examples on the programming model, please refer to the API Reference.
