MBrace.Core and MBrace.Azure


Running MBrace workflows in the Thread Pool

It is possible to run and test your MBrace workflows using the local thread pool. To do this, you will need to reference the MBrace.Runtime library.

open MBrace.Core       // provides the cloud { } builder
open MBrace.ThreadPool

let tp = ThreadPoolRuntime.Create() // creates a thread pool runtime handle

tp.RunSynchronously(cloud { printfn "Hello, World!" })

The thread pool runtime can be used to test arbitrary cloud code in the confines of your current process:

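open MBrace.Core.BuilderAsyncExtensions // lets Async operations such as UpdateAsync be bound with do!/let! inside cloud { }
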
let test () = cloud {
    let! atom = CloudAtom.New<int>(0)
    let incr () = cloud { do! atom.UpdateAsync (fun i -> i + 1) }
    let! _ = Cloud.Parallel [for i in 1 .. 100 -> incr () ]
    return atom.Value
}

tp.RunSynchronously(test ())
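Besides RunSynchronously, the ThreadPoolRuntime signature also lists ToAsync, which converts a cloud workflow into an ordinary F# Async<'T>. The following is a minimal sketch, assuming that all optional arguments of ToAsync can be omitted as its signature suggests; sumOfSquares and asyncTp are illustrative names, not part of the original tutorial.

```fsharp
open MBrace.Core
open MBrace.ThreadPool

let asyncTp = ThreadPoolRuntime.Create()

// A small workflow: square 1..10 in parallel child workflows, then sum the results.
let sumOfSquares = cloud {
    let! squares = Cloud.Parallel [ for i in 1 .. 10 -> cloud { return i * i } ]
    return Array.sum squares
}

// ToAsync turns the cloud workflow into a plain F# Async<int>...
let sumAsync : Async<int> = asyncTp.ToAsync(sumOfSquares)

// ...so standard Async combinators apply, e.g. a 10-second timeout.
let total = Async.RunSynchronously(sumAsync, timeout = 10000)
printfn "sum of squares = %d" total
```

Going through Async can be convenient when a cloud workflow needs to be composed with existing async code in the same process.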

Emulating distribution

By default, the thread pool runtime uses standard async semantics, i.e. everything runs in shared memory. However, when debugging cloud applications it is often useful to emulate conditions particular to distribution. In the thread pool runtime this can be done by setting the memoryEmulation parameter, whose type MemoryEmulation is an enumeration with three possible values:

  1. Shared: the default async semantics; values passed to child workflows are shared among worker threads.
  2. EnsureSerializable: values passed to child workflows are shared among worker threads; however, checks are made at runtime to ensure that closures are serializable.
  3. Copied: values are passed to child workflows as cloned copies, ensuring proper emulation of distribution semantics.
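The emulation mode does not have to be passed on every call: the ThreadPoolRuntime.Create signature also accepts an optional memoryEmulation argument, and the runtime exposes its configured mode through the MemoryEmulation member. A minimal sketch, under the assumption (not stated in this tutorial) that an explicit per-call argument takes precedence over the runtime-wide default; strictTp and answer are illustrative names.

```fsharp
open MBrace.Core
open MBrace.ThreadPool

// Make Copied the default for every workflow run on this runtime instance.
let strictTp = ThreadPoolRuntime.Create(memoryEmulation = MemoryEmulation.Copied)

printfn "default emulation: %A" strictTp.MemoryEmulation

// Assumption: an explicit per-call argument overrides the runtime-wide default.
let answer = strictTp.RunSynchronously(cloud { return 6 * 7 }, memoryEmulation = MemoryEmulation.Shared)
```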

Let's highlight the differences using a couple of examples:

let example1 = cloud { return new System.Net.WebClient() }

Running example1 with the default shared semantics produces no error:

tp.RunSynchronously(example1, memoryEmulation = MemoryEmulation.Shared)

However, attempting to do the same with either of the other two options

tp.RunSynchronously(example1, memoryEmulation = MemoryEmulation.EnsureSerializable)
tp.RunSynchronously(example1, memoryEmulation = MemoryEmulation.Copied)

produces the error:

System.Runtime.Serialization.SerializationException: Cloud process returns non-serializable type 'System.Net.WebClient'.

which is precisely what would happen if we ran the computation in a distributed MBrace cluster. Consider now the following example:

let example2 = cloud {
    let counter = ref 0
    let incr () = cloud { ignore <| System.Threading.Interlocked.Increment counter }
    let! _ = Cloud.Parallel [for i in 1 .. 100 -> incr () ]
    return counter.Value
}

Running the example using Shared and EnsureSerializable emulation modes produces the same expected result (100):

tp.RunSynchronously(example2, memoryEmulation = MemoryEmulation.EnsureSerializable)
tp.RunSynchronously(example2, memoryEmulation = MemoryEmulation.Shared)

However, using Copied produces a wholly different output (0):

tp.RunSynchronously(example2, memoryEmulation = MemoryEmulation.Copied)

This is entirely consistent with the behaviour of the workflow in the cloud: each child work item performs the increment on its own copy of the reference cell, so the cell read by the parent workflow is never modified.
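The effect can be reproduced without MBrace at all. The following plain F# sketch is only an analogy, not how the thread pool runtime is implemented: it runs 100 parallel increments with Async.Parallel and lets a hypothetical handOff function decide whether each child receives the parent's reference cell (as under Shared) or a fresh copy of it (as under Copied).

```fsharp
open System.Threading

// Runs 100 parallel increments. `handOff` models how the counter reaches each
// child: the parent's own cell (shared memory) or a private copy of it.
let runIncrements (handOff: int ref -> int ref) : int =
    let counter = ref 0
    [ for _ in 1 .. 100 do
        let cell = handOff counter              // what this child actually sees
        yield async { Interlocked.Increment(&cell.contents) |> ignore } ]
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    counter.Value                               // the parent only reads its own cell

let shared = runIncrements id                       // all children update `counter`
let copied = runIncrements (fun c -> ref c.Value)   // each child updates a private copy
printfn "shared = %d, copied = %d" shared copied
```

Because the parent only ever reads its own cell, the copying variant reports 0 no matter how many children ran, mirroring the Copied result above.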

Testing MBrace code locally before deploying

When working with a distributed MBrace cluster, you can easily test your code locally before deploying it, using the companion thread pool runtime that every MBrace client instance provides:

someDistributedCluster.RunLocally(example2, memoryEmulation = MemoryEmulation.Copied)

Once we are confident that everything works in the local process, we can deploy as usual:

someDistributedCluster.CreateProcess(example2)
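To make the local check systematic, one option is to run the same workflow under all three emulation modes and collect the outcome of each. checkAllModes below is a hypothetical helper, not part of MBrace; it takes a runner function, so it can be used with a ThreadPoolRuntime and, assuming RunLocally behaves as its signature suggests, with a cluster client. The expected outcome (only Shared succeeds) follows the example1 discussion above.

```fsharp
open MBrace.Core
open MBrace.ThreadPool

/// Hypothetical helper: runs a workflow once per MemoryEmulation mode and
/// records either the result or the error message for each mode.
let checkAllModes (run: MemoryEmulation -> 'T) : (MemoryEmulation * Result<'T, string>) list =
    [ MemoryEmulation.Shared; MemoryEmulation.EnsureSerializable; MemoryEmulation.Copied ]
    |> List.map (fun mode ->
        try
            (mode, Ok (run mode))
        with e ->
            (mode, Error e.Message))

let checkTp = ThreadPoolRuntime.Create()
let returnsWebClient = cloud { return new System.Net.WebClient() }

// Against the local thread pool runtime:
let report = checkAllModes (fun mode -> checkTp.RunSynchronously(returnsWebClient, memoryEmulation = mode))
for (mode, outcome) in report do
    printfn "%A -> %A" mode outcome

// Against a cluster client, RunLocally takes the same memoryEmulation argument (not run here):
// checkAllModes (fun mode -> someDistributedCluster.RunLocally(example2, memoryEmulation = mode))
```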

Gotchas

Always keep in mind that thread pool emulation does not reproduce every subtle semantic difference that arises when the same workflow is executed in the cloud. The simplest example is side effects:

let hello = cloud { printfn "Hello, World" }

tp.RunSynchronously(hello) // writes to stdout of the current process, effect easily observable

someDistributedCluster.Run(hello) // writes to stdout of some worker process, effect will most likely not be observed
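A common way around this is to keep the workflow free of console output and return the data instead, performing the side effect in the client process where it is visible. A minimal sketch; helloMessage and helloTp are illustrative names.

```fsharp
open MBrace.Core
open MBrace.ThreadPool

// The workflow only computes the message; printing happens in the calling process.
let helloMessage = cloud { return "Hello, World" }

let helloTp = ThreadPoolRuntime.Create()
let message = helloTp.RunSynchronously(helloMessage)
printfn "%s" message

// The same pattern on a cluster (not run here):
// someDistributedCluster.Run(helloMessage) |> printfn "%s"
```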

In general, side effects affecting global state behave differently in the local as opposed to the distributed setting:

type GlobalCounter private () =
    static let mutable count = 0
    static member Value = count
    static member Incr() = System.Threading.Interlocked.Increment(&count)
    static member Reset() = count <- 0


let example3 = cloud {
    GlobalCounter.Reset()
    let! _ = Cloud.Parallel [for i in 1 .. 100 -> cloud { ignore <| GlobalCounter.Incr() }]
    return GlobalCounter.Value
}

tp.RunSynchronously(example3, memoryEmulation = MemoryEmulation.Copied) // 100, as expected

someDistributedCluster.Run(example3) // nondeterministic result
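If the counter must behave the same locally and on a cluster, one option is to keep it in a CloudAtom, as in the test workflow earlier, rather than in process-global static state. This is a sketch under the assumption that CloudAtom.New and UpdateAsync behave as in that earlier example; it is run here with the default Shared mode, and example3Atom and atomTp are illustrative names.

```fsharp
open MBrace.Core
open MBrace.Core.BuilderAsyncExtensions
open MBrace.ThreadPool

// The counter lives in a CloudAtom, so all child workflows update one shared
// value instead of the static state of whichever process they happen to run in.
let example3Atom = cloud {
    let! counter = CloudAtom.New<int>(0)
    let! _ = Cloud.Parallel [ for _ in 1 .. 100 -> cloud { do! counter.UpdateAsync(fun n -> n + 1) } ]
    return counter.Value
}

let atomTp = ThreadPoolRuntime.Create()
let count = atomTp.RunSynchronously(example3Atom)
printfn "count = %d" count
```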

Summary

In this tutorial, you've learned how MBrace.ThreadPool can be used to execute and test cloud workflows using the thread pool of your local client process.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
