MBrace.Core and MBrace.Azure


Local Cloud Workflows

This tutorial is from the MBrace Starter Kit.

In this tutorial we describe local cloud workflows and show how they can help avoid common errors when developing for MBrace.

Motivation

Cloud workflows are computations that often span multiple machines. This means variables in scope often need to be serialized and sent to a remote machine for resumption of the computation. This leads to a new class of potential errors, as illustrated below:

cloud {
    let wc = new System.Net.WebClient()
    let download uri = cloud { return! wc.AsyncDownloadString (Uri uri) }
    let! results = Cloud.Parallel [download "http://mbrace.io" ; download "http://fsharp.org" ]
    return results
}

which when run yields

System.Runtime.Serialization.SerializationException: Cloud.Parallel<string> workflow uses non-serializable closures.
    ---> Nessos.FsPickler.NonSerializableTypeException: Type 'FSI_0005+download@37-1' contains non-serializable field of type 'System.Net.WebClient'.

The obvious fix here is to remove the shared WebClient instance, which is not serializable, and instead create a WebClient inside each download workflow:

cloud {
    let download uri = cloud { 
        let wc = new System.Net.WebClient()
        return! wc.AsyncDownloadString (Uri uri) 
    }

    let! results = Cloud.Parallel [download "http://mbrace.io" ; download "http://fsharp.org" ]
    return results
}

The example does, however, illustrate a more general problem. Suppose we have a black-box cloud computation:

val comp : Cloud<int>

Depending on its implementation, this could either introduce distribution:

let comp : Cloud<int> = cloud {
    let f x = cloud { return x }
    let! results = Cloud.Parallel [f 17 ; f 25]
    return Array.sum results
}

or no distribution at all:

let comp' : Cloud<int> = cloud {
    let f x = cloud { return x }
    let! a = f 17
    let! b = f 25
    return a + b
}

The two computations have identical types, yet their execution patterns are very different. This can lead to unanticipated errors, for instance:

let test (arg : Cloud<'T>) = cloud {
    let wc = new System.Net.WebClient()
    let! _ = arg
    return wc.GetHashCode()
}

test comp  |> cluster.Run
test comp' |> cluster.Run

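Only the first call fails with a runtime serialization error: because comp entails distribution, execution may resume on a different machine after let! _ = arg, so the continuation that captures wc must be serialized. The second call succeeds, since comp' never leaves the current worker.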

Local Cloud Workflows

For the reasons outlined above, MBrace comes with local cloud workflows. These are a special type of cloud computation that is constrained to execute within a single worker. They can be defined using the special local builder:

let localWorkflow : LocalCloud<int> = local { return 42 }

This creates workflows of type LocalCloud<'T>, which is a subtype of Cloud<'T>. Local workflows can therefore be used in place of cloud workflows, but not the other way around. For example, the workflow below type checks:

cloud {
    let! x = local { return 17 }
    return x + 25
}

However, attempting to distribute inside the body of a local workflow:

local {
    let f x = cloud { return x }
    let! x = Cloud.Parallel [f 17 ; f 25]
    return Array.sum x
}

yields a compile-time error:

error FS0001: This expression was expected to have type
    LocalCloud<'a>    
but here has type
    Cloud<int []>

In other words, local workflows provide a compile-time guarantee that they will never execute beyond the context of a single machine. This allows MBrace library authors to enforce a certain degree of sanity with respect to serialization:

let testLocal (arg : LocalCloud<'T>) = local {
    use wc = new System.Net.WebClient()
    let! _ = arg // execution guaranteed to not switch to separate machine
    return wc.GetHashCode() // hence 'wc' will only live within the context of a single address space
}
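To see how this guarantee plays out at call sites, here is a sketch that assumes the cluster, comp, comp', localWorkflow and testLocal values defined above; compLocal, h1, h2 and total are hypothetical names introduced only for illustration. Because the check is purely type-based, comp' is rejected even though it never distributes, and has to be rewritten with the local builder to be accepted:

```fsharp
open MBrace.Core

// Assumes `cluster` is the session created in ThespianCluster.fsx or AzureCluster.fsx,
// and that localWorkflow, comp, comp' and testLocal are defined as above.

// Accepted: localWorkflow has type LocalCloud<int>
let h1 = testLocal localWorkflow |> cluster.Run

// Rejected at compile time: comp and comp' are both typed Cloud<int>,
// even though comp' never distributes.
// testLocal comp  |> cluster.Run
// testLocal comp' |> cluster.Run

// Hypothetical rewrite of comp' using the local builder throughout
let compLocal : LocalCloud<int> = local {
    let f x = local { return x }
    let! a = f 17
    let! b = f 25
    return a + b
}

let h2 = testLocal compLocal |> cluster.Run   // accepted by the type checker
let total = compLocal |> cluster.Run          // 17 + 25
```

The value total should be 42, since compLocal simply adds its two local results on a single worker.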

Applications

The MBrace core APIs already make heavy use of local workflows; most primitive store operations have type LocalCloud<'T>, since they usually do not entail distribution:

local {
    let! files = CloudFile.Enumerate "/container"
    return files.Length
}

The same holds for many library implementations:

open MBrace.Library.Cloud

val Balanced.mapReduceLocal : 
    mapper:('T -> LocalCloud<'R>) -> reducer:('R -> 'R -> LocalCloud<'R>) 
        -> init:'R -> inputs:seq<'T> -> Cloud<'R>

In this case, a distributed cloud computation is created given user-supplied computations that must be constrained to a single machine. This API restriction enables the library author to efficiently schedule computation across the cluster based on the assumption that user code will never escape the scope of a single machine per input.
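A minimal usage sketch, assuming the Balanced.mapReduceLocal signature shown above and the cluster value used throughout this tutorial; totalLength, lengthSum and the input strings are hypothetical. Both the mapper and the reducer are written with the local builder, so the type checker would reject any attempt to call Cloud.Parallel inside them:

```fsharp
open MBrace.Core
open MBrace.Library.Cloud

// Assumes `cluster` is the session created in ThespianCluster.fsx or AzureCluster.fsx.
// Hypothetical example: sum the lengths of a few strings across the cluster.
let totalLength : Cloud<int> =
    Balanced.mapReduceLocal
        (fun (s : string) -> local { return s.Length })  // mapper: confined to one worker
        (fun x y -> local { return x + y })              // reducer: also confined to one worker
        0                                                 // init: identity element for (+)
        [ "mbrace" ; "fsharp" ; "cloud" ]                 // inputs

let lengthSum = totalLength |> cluster.Run
```

Choosing 0 as init matters here: since it is the identity for addition, the result (6 + 6 + 5 = 17) should not depend on how the library partitions the inputs across workers.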

Gotchas

It is important to note that even though local workflows never introduce distribution within their own bodies, this does not make them free of serialization issues. Let's illustrate with a couple of examples. Running

local { return new System.Net.WebClient() } |> cluster.Run

fails with the error

System.Runtime.Serialization.SerializationException: Cloud process returns non-serializable type 'System.Net.WebClient'.
    at MBrace.Runtime.Combinators.runStartAsCloudProcess@297.Invoke(Unit unitVar)

This happens because the local computation returns a result that cannot be serialized. Similarly, running

cloud {
    let wc = new System.Net.WebClient()
    let! proc = Cloud.CreateProcess(local { return wc.GetHashCode() })
    return proc.Result
} |> cluster.Run

fails with the error

System.Runtime.Serialization.SerializationException: Cloud process of type 'int' uses non-serializable closure. 
    ---> Nessos.FsPickler.NonSerializableTypeException: Type 'FSI_0020+it@230-7' contains non-serializable field of type 'System.Net.WebClient'.

This time the error arises because the closure of the local workflow passed to Cloud.CreateProcess captures the WebClient instance, which renders it non-serializable.
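Both failures can be avoided by keeping the WebClient strictly inside the local workflow and returning only serializable values. The sketch below assumes the cluster value used throughout this tutorial and a worker with network access; page, job and hashCode are hypothetical names, and the URL is only illustrative:

```fsharp
open MBrace.Core

// Assumes `cluster` is the session created in ThespianCluster.fsx or AzureCluster.fsx.

// First gotcha: create, use and dispose the WebClient inside the local
// workflow, and return a string rather than the client itself.
let page : string =
    local {
        use wc = new System.Net.WebClient()
        return wc.DownloadString "http://mbrace.io"
    } |> cluster.Run

// Second gotcha: instantiate the WebClient inside the workflow passed to
// Cloud.CreateProcess, so its closure captures no WebClient instance.
let hashCode : int =
    cloud {
        let job =
            local {
                use wc = new System.Net.WebClient()
                return wc.GetHashCode()
            }
        let! proc = Cloud.CreateProcess(job)
        return proc.Result
    } |> cluster.Run
```

In both cases the non-serializable object lives and dies within a single address space, which is exactly the discipline local workflows are meant to encourage.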

Summary

In this tutorial, you've learned how to use local workflows to avoid common errors when developing in MBrace. Continue with further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
