MBrace.Core and MBrace.Azure


Exceptions and Fault tolerance

This tutorial is from the MBrace Starter Kit.

In this tutorial we will be offering an overview of the MBrace exception handling features as well as its fault tolerance mechanism.

Exception handling

Just like async, mbrace workflows support exception handling:

1: 
cloud { do failwith "kaboom!" } |> cluster.Run

Sending the above computation to your cluster will have the expected behaviour, any user exception will be caught and rethrown on the client side:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
9: 
System.Exception: kaboom!
   at FSI_0010.it@24-3.Invoke(Unit unitVar)
   at MBrace.Core.BuilderImpl.Invoke@98.Invoke(Continuation`1 c) in ...
--- End of stack trace from previous location where exception was thrown ---
   at <StartupCode$MBrace-Runtime>.$CloudProcess.AwaitResult@211-2.Invoke(CloudProcessResult _arg2) in ...
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@835-1.Invoke(a a)
   at MBrace.Core.Internals.AsyncExtensions.Async.RunSync[T](FSharpAsync`1 workflow, ...
   at <StartupCode$FSI_0010>.$FSI_0010.main@() in ...
Stopped due to error

This has interesting ramifications when our cloud computation spans multiple machines:

1: 
2: 
3: 
4: 
5: 
cloud {
    let div m n = cloud { return m / n }
    let! results = Cloud.Parallel [for i in 1 .. 10 -> div 10 (5 - i) ]
    return Array.sum results
}

In the example above, we perform a calculation in parallel across the cluster in which one of the child work items are going to fail with a user exception. In the event of such an uncaught error, the exception will bubble up to the parent computation, actively cancelling any of the outstanding sibling computations:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
9: 
System.DivideByZeroException: Attempted to divide by zero.
   at FSI_0013.div@47-1.Invoke(Unit unitVar)
   at MBrace.Core.BuilderImpl.Invoke@98.Invoke(Continuation`1 c) in ...
   at Cloud.Parallel(seq<Cloud<Int32>> computations)
--- End of stack trace from previous location where exception was thrown ---
   at <StartupCode$MBrace-Runtime>.$CloudProcess.AwaitResult@211-2.Invoke(CloudProcessResult _arg2) in ...
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@835-1.Invoke(a a)
   at MBrace.Core.Internals.AsyncExtensions.Async.RunSync[T](FSharpAsync`1 ...
   at <StartupCode$FSI_0014>.$FSI_0014.main@()

While the stacktrace offers a precise indication of what went wrong, it may be a bit ambiguous on how and where it went wrong. Let's see how we can use MBrace to improve this in our example:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
exception WorkerException of worker:IWorkerRef * input:int * exn:exn
with
    override e.Message = 
        sprintf "Worker '%O' given input %d has failed with exception:\n'%O'" 
            e.worker e.input e.exn


cloud {
    let div n = cloud { 
        try return 10 / (5 - n)
        with e -> 
            let! currentWorker = Cloud.CurrentWorker
            return raise (WorkerException(currentWorker, n, e))
    }

    let! results = Cloud.Parallel [for i in 1 .. 10 -> div i ]
    return Array.sum results
}

Which yields the following stacktrace:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
FSI_0023+WorkerException: Worker 'mbrace://grothendieck:52789' given input 5 has failed with exception: 
'System.DivideByZeroException: Attempted to divide by zero.
   at FSI_0024.div@88-32.Invoke(Unit unitVar)
   at MBrace.Core.BuilderImpl.Invoke@98.Invoke(Continuation`1 c) in ...'
   at FSI_0024.div@91-34.Invoke(IWorkerRef _arg2)
   at MBrace.Core.Builders.Bind@331-1.Invoke(ExecutionContext ctx, T t) in ...
   at Cloud.Parallel(seq<Cloud<Int32>> computations)
--- End of stack trace from previous location where exception was thrown ---
   at <StartupCode$MBrace-Runtime>.$CloudProcess.AwaitResult@211-2.Invoke(CloudProcessResult _arg2) in ...
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@835-1.Invoke(a a)
   at MBrace.Core.Internals.AsyncExtensions.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 ...
   at <StartupCode$FSI_0025>.$FSI_0025.main@()

It is also possible to catch exceptions raised by distributed workflows:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
cloud {
    let div m n = cloud { return m / n }
    try
        let! results = Cloud.Parallel [for i in 1 .. 10 -> div 10 (5 - i) ]
        return Some(Array.sum results)
    with :? DivideByZeroException ->
        return None
}

this will suppress any exceptions raised by the Parallel workflow and return a proper value to the client.

Computing partial results

It is often the case that this default behaviour (i.e. bubbling up and cancellation) may be undesirable, particularly when we are running an expensive distributed computation. Often, aggregating partial results is the prefered way to go. Let's see how we can encode this behaviour using MBrace:

1: 
2: 
3: 
4: 
5: 
cloud {
    let div m n = cloud { try return Choice1Of2(m / n) with e -> return Choice2Of2 e }
    let! results = Cloud.Parallel [for i in 1 .. 10 -> div 10 (5 - i) ]
    return results
}

Which when executed will result in the following value:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
val it : Choice<int,exn> [] =
  [|Choice1Of2 2; Choice1Of2 3; Choice1Of2 5; Choice1Of2 10;
    Choice2Of2
      System.DivideByZeroException: Attempted to divide by zero.
   at FSI_0031.div@140-47.Invoke(Unit unitVar)
   at MBrace.Core.BuilderImpl.Invoke@98.Invoke(Continuation`1 c) in ...
        {Data = dict [];
         HResult = -2147352558;
         HelpLink = null;
         InnerException = null;
         Message = "Attempted to divide by zero.";
         Source = "FSI-ASSEMBLY_f0c42c06-f5a8-45d0-ab7b-2fec2628dff0_10";
         StackTrace = "   at FSI_0031.div@140-47.Invoke(Unit unitVar)"
   at MBrace.Core.BuilderImpl.Invoke@98.Invoke(Continuation`1 c) in ...;
         TargetSite = null;}; Choice1Of2 -10; Choice1Of2 -5; Choice1Of2 -3;
    Choice1Of2 -2; Choice1Of2 -2|]

Fault tolerance

It is important at this point to make a distinction between user exceptions, i.e. runtime errors generated by user code and faults, errors that happen because of problems in an MBrace runtime. Faults can happen for a multitude of reasons:

  • Bugs in the runtime implementation.
  • Sudden death of a worker node: VMs of a cloud service can often be reset by the administrator without warning.
  • User errors that can cause the worker process to crash, such as stack overflows.

Let's have a closer look at an example of a fault, so that we gain a better understanding of how they work. First, we define a cloud function that forces the death of a worker by calling Environment.Exit on the process that it is being executed:

1: 
let die () = cloud { Environment.Exit 1 }

Let's try to run this on our cluster:

1: 
cluster.Run(die())

Sure enough, after a while we will be receiving the following exception:

1: 
2: 
3: 
4: 
5: 
MBrace.Core.FaultException: Work item '7927b7ad-f3ee-46cb-928b-92683f279722' was being processed by worker 'mbrace://grothendieck:52789' which has died.
   at <StartupCode$MBrace-Runtime>.$CloudProcess.AwaitResult@211-2.Invoke(CloudProcessResult _arg2) in ...
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@835-1.Invoke(a a)
   at MBrace.Core.Internals.AsyncExtensions.Async.RunSync[T](FSharpAsync`1 workflow, ...
   at <StartupCode$FSI_0035>.$FSI_0035.main@() in ...

Note that this computation has killed one of our worker instances. If working with MBrace on Azure, the service fabric will ensure that the dead instance will be reset. If working with MBrace on Thespian, you will have to manually replenish your cluster instance by calling the cluster.AttachNewLocalWorkers() method.

If we now call

1: 
cluster.ShowProcesses()

we can indeed verify that the last computation has faulted:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
Processes

Name                            Process Id         Status  Execution Time  
----                            ----------         ------  --------------  
      67272653-9d80-403e-848a-8c99760cb943      Completed  00:00:00.5097248
      d1b47fe3-4cc0-4ad9-b22f-5ba2ec9f749e  UserException  00:00:00.8093416
      4e537251-0191-4afe-9a9a-fb3f966ee2ef        Faulted  00:07:07.1084813

MBrace will respond to faults in our cloud process by raising a FaultException. What differentiates fault exceptions from normal user exceptions is that they often cannot be caught by exception handling logic in the user level. For instance:

1: 
cloud { try return! die() with :? FaultException -> () } |> cluster.Run

Will not have any effect in the outcome of the computation. This happens because the exception handling clause is actually part of the work item which was to be executed by the worker that was killed. Compare this against

1: 
2: 
3: 
4: 
5: 
6: 
cloud { 
    try 
        let! _ = Cloud.Parallel [die();die()]
        return ()
    with :? FaultException -> return () 
} |> cluster.Run

which works as expected since the exception handling logic happens on the parent work item.

Working with fault policies

In stark contrast to our pathological die() example, most real faults actually happen because of transient errors in cluster deployments. It is often the case that we want our computation not to stop because of such minor faults. This is where fault policies come into play.

When sending a computation to the cloud, a fault policy can be specified. This indicates whether, and for how long a specific faulting computation should be retried:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
/// die with a probability of 1 / N
let diePb (N : int) = cloud { if System.Random().Next(0, N) = 0 then return! die() }

let test() = cloud {
    let! N = Cloud.GetWorkerCount() // get the current number of workers
    let! _ = Cloud.Parallel [ for i in 1 .. N -> diePb N ]
    return ()
}

the computation as defined above introduces a significant probability of faulting at some point of its execution. We can compensate by applying the following a more flexible retry policy:

1: 
cluster.Run(test(), faultPolicy = FaultPolicy.WithMaxRetries 5)

This will cause any faulting part of the computation to yield a FaultException only after it has faulted more than 5 times.

Fault polices can also be scoped in our cloud code:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
cloud {
    let! results1 = Cloud.Parallel [for i in 1 .. 10 -> cloud { return i * i }]
    let! results2 = 
        Cloud.Parallel [for i in 1 .. 10 -> diePb 10 ]
        |> Cloud.WithFaultPolicy (FaultPolicy.InfiniteRetries())

    return (results1, results2)
}

The example above uses the inherited fault policy for the first parallel computation, whereas the second parallel computation uses a custom fault policy, namely InfiniteRetries.

Faults & Partial results

The question that now arises is how would it be possible to recover partial results of a distributed computation in the presence of faults? This can be achieved through the use of runtime introspection primitives:

1: 
2: 
3: 
4: 
5: 
6: 
let run() = cloud {
    let! isPreviouslyFaulted = Cloud.IsPreviouslyFaulted
    if not isPreviouslyFaulted then do! die()
}

cluster.Run(run(), faultPolicy = FaultPolicy.WithMaxRetries 1)

The Cloud.IsPreviouslyFaulted primitive gives true if the current work item is part of a computation that has previously faulted and currently being retried. We can use this knowledge to dynamically alter its mode of execution.

Let's now have a look at a more useful example. Let's define a parallel combinator that returns partial results even in the presence of faults. First, let's define a result type:

1: 
2: 
3: 
4: 
type Result<'T> =
    | Success of 'T
    | Exception of exn
    | Fault of exn

we now define our protectedParallel combinator:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
let protectedParallel (computations : Cloud<'T> seq) = cloud {
    let protect (computation : Cloud<'T>) = cloud {
        let! faultData = Cloud.TryGetFaultData()
        match faultData with
        | None -> // computation not faulted, execute normally
            try let! t = computation in return Success t
            with e -> return Exception e

        | Some faultData -> // computation previously faulted, return an exception
            return Fault faultData.FaultException
    }

    return! 
        computations 
        |> Seq.map protect
        |> Cloud.Parallel 
        |> Cloud.WithFaultPolicy (FaultPolicy.WithMaxRetries 1)
}

the example makes use of the Cloud.TryGetFaultData() primitive to determine whether the current computation is a retry of a previously faulted operation.

Let's now test our workflow:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
let test2() = cloud {
    let d = Random().Next(0,10)
    if d = 0 then do! die()
    if d < 5 then return failwithf "error %d" d
    else return d
}

protectedParallel [for i in 1 .. 10 -> test2() ] |> cluster.Run

Summary

In this tutorial, you've learned how to reason about exceptions and faults in MBrace. Continue with further samples to learn more about the MBrace programming model.

Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.

namespace System
namespace System.IO
namespace MBrace
namespace MBrace.Core
module BuilderAsyncExtensions

from MBrace.Core
namespace MBrace.Flow
val cluster : MBrace.Thespian.ThespianCluster

Full name: 10-exceptions-and-fault-tolerance.cluster
module Config
val GetCluster : unit -> MBrace.Thespian.ThespianCluster

Full name: Config.GetCluster


 Gets or creates a new Thespian cluster session.
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
member MBrace.Runtime.MBraceClient.Run : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:MBrace.Core.Internals.ResourceRegistry * ?taskName:string -> 'T
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val sum : array:'T [] -> 'T (requires member ( + ) and member get_Zero)

Full name: Microsoft.FSharp.Collections.Array.sum
exception WorkerException of worker: obj * input: int * exn: exn

Full name: 10-exceptions-and-fault-tolerance.WorkerException
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
val e : WorkerException
override WorkerException.Message : string

Full name: 10-exceptions-and-fault-tolerance.WorkerException.Message
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
WorkerException.worker: obj
WorkerException.input: int
WorkerException.exn: exn
val raise : exn:Exception -> 'T

Full name: Microsoft.FSharp.Core.Operators.raise
union case Option.Some: Value: 'T -> Option<'T>
Multiple items
type DivideByZeroException =
  inherit ArithmeticException
  new : unit -> DivideByZeroException + 2 overloads

Full name: System.DivideByZeroException

--------------------
DivideByZeroException() : unit
DivideByZeroException(message: string) : unit
DivideByZeroException(message: string, innerException: exn) : unit
union case Option.None: Option<'T>
union case Choice.Choice1Of2: 'T1 -> Choice<'T1,'T2>
union case Choice.Choice2Of2: 'T2 -> Choice<'T1,'T2>
val die : unit -> 'a

Full name: 10-exceptions-and-fault-tolerance.die
type Environment =
  static member CommandLine : string
  static member CurrentDirectory : string with get, set
  static member Exit : exitCode:int -> unit
  static member ExitCode : int with get, set
  static member ExpandEnvironmentVariables : name:string -> string
  static member FailFast : message:string -> unit + 1 overload
  static member GetCommandLineArgs : unit -> string[]
  static member GetEnvironmentVariable : variable:string -> string + 1 overload
  static member GetEnvironmentVariables : unit -> IDictionary + 1 overload
  static member GetFolderPath : folder:SpecialFolder -> string + 1 overload
  ...
  nested type SpecialFolder
  nested type SpecialFolderOption

Full name: System.Environment
Environment.Exit(exitCode: int) : unit
member MBrace.Runtime.MBraceClient.ShowProcesses : unit -> unit
val diePb : N:int -> 'a

Full name: 10-exceptions-and-fault-tolerance.diePb


 die with a probability of 1 / N
val N : int
Multiple items
type Random =
  new : unit -> Random + 1 overload
  member Next : unit -> int + 2 overloads
  member NextBytes : buffer:byte[] -> unit
  member NextDouble : unit -> float

Full name: System.Random

--------------------
Random() : unit
Random(Seed: int) : unit
val test : unit -> 'a

Full name: 10-exceptions-and-fault-tolerance.test
val run : unit -> 'a

Full name: 10-exceptions-and-fault-tolerance.run
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
type Result<'T> =
  | Success of 'T
  | Exception of exn
  | Fault of exn

Full name: 10-exceptions-and-fault-tolerance.Result<_>
union case Result.Success: 'T -> Result<'T>
Multiple items
union case Result.Exception: exn -> Result<'T>

--------------------
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
union case Result.Fault: exn -> Result<'T>
val protectedParallel : computations:'a -> 'b

Full name: 10-exceptions-and-fault-tolerance.protectedParallel
val computations : 'a
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
module Seq

from Microsoft.FSharp.Collections
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.map
val test2 : unit -> 'a

Full name: 10-exceptions-and-fault-tolerance.test2
val failwithf : format:Printf.StringFormat<'T,'Result> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.failwithf
val i : int
Fork me on GitHub