MBrace.Core and MBrace.Azure


Introduction to Data Parallel Cloud Flows

This tutorial is from the MBrace Starter Kit.

In this tutorial you learn the CloudFlow programming model for cloud-scheduled, parallel data-flow tasks. The model is similar to those of Hadoop and Spark.

CloudFlow.OfArray partitions the input array based on the number of available workers. The parts of the array are then fed into cloud tasks that implement the map stages. The countBy stage is then carried out by a final cloud task.

let inputs = [| 1..100 |]

let streamComputationTask = 
    inputs
    |> CloudFlow.OfArray
    |> CloudFlow.map (fun num -> num * num)
    |> CloudFlow.map (fun num -> num % 10)
    |> CloudFlow.countBy id
    |> CloudFlow.toArray
    |> cluster.CreateProcess

Next, check the progress of your job.

Note the number of cloud tasks involved, which should be twice the number of workers. This indicates that the input array has been partitioned and the work carried out in a distributed way.

streamComputationTask.ShowInfo()

Next, await the result:

streamComputationTask.Result
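
To make the partition-then-combine idea concrete, here is a local, single-machine sketch of the same pipeline using only FSharp.Core. It is an analogy, not MBrace code: the worker count of 4 is a hypothetical value, and Array.splitInto merely stands in for the partitioning step. It says nothing about how MBrace actually schedules the work.

```fsharp
// Local analogy only: no MBrace involved. `sketchWorkerCount` is a hypothetical value.
let sketchWorkerCount = 4
let sketchInputs = [| 1 .. 100 |]

// Stand-in for the partitioning step: split the input into at most 4 chunks.
let sketchPartitions = Array.splitInto sketchWorkerCount sketchInputs

// Each chunk runs both map stages and a partial count on its own.
let partialCounts =
    sketchPartitions
    |> Array.map (fun part ->
        part
        |> Array.map (fun num -> num * num)
        |> Array.map (fun num -> num % 10)
        |> Array.countBy id)

// Final stage: merge the partial counts for each key.
let localResult =
    partialCounts
    |> Array.concat
    |> Array.groupBy fst
    |> Array.map (fun (key, counts) -> key, counts |> Array.sumBy snd)
    |> Array.sortBy fst

printfn "%A" localResult
```

The local result pairs each last digit of a square (0, 1, 4, 5, 6, 9) with how often it occurs among the 100 inputs. The cloud flow should report the same key/count pairs, with int64 counts and not necessarily in sorted order.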

Data parallel cloud flows can be used for all sorts of things. Later, you will see how to source the inputs to the data flow from a collection of cloud files, or from a partitioned cloud vector.

Changing the degree of parallelism

The default is to partition the input array between all available workers.

You can also use CloudFlow.withDegreeOfParallelism to specify the degree of partitioning of the stream at any point in the pipeline.

let numbers = [| for i in 1 .. 30 -> 50000000 |]

let computePrimesTask = 
    numbers
    |> CloudFlow.OfArray
    |> CloudFlow.withDegreeOfParallelism 6
    |> CloudFlow.map (fun n -> Sieve.getPrimes n)
    |> CloudFlow.map (fun primes -> sprintf "calculated %d primes: %A" primes.Length primes)
    |> CloudFlow.toArray
    |> cluster.CreateProcess 

(** Next, check if the work is done *) 
computePrimesTask.ShowInfo()

(** Next, await the result *) 
let computePrimes = computePrimesTask.Result
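
Sieve.getPrimes is defined elsewhere in the starter kit and is not shown in this tutorial. As a rough idea of what such a function might look like, here is a minimal Sieve of Eratosthenes taking an int and returning an int array. The name getPrimesSketch and the body are assumptions for illustration, not the starter kit's implementation.

```fsharp
// Hypothetical stand-in for Sieve.getPrimes (not the starter kit's actual code).
// Returns all primes <= nmax using a simple Sieve of Eratosthenes.
let getPrimesSketch (nmax : int) : int [] =
    if nmax < 2 then
        [||]
    else
        // isComposite.[i] becomes true once i is found to be a multiple of a smaller prime.
        let isComposite : bool [] = Array.zeroCreate (nmax + 1)
        let mutable p = 2
        while p * p <= nmax do
            if not isComposite.[p] then
                // Start at p * p: smaller multiples were already marked by smaller primes.
                let mutable multiple = p * p
                while multiple <= nmax do
                    isComposite.[multiple] <- true
                    multiple <- multiple + p
            p <- p + 1
        // Keep every number from 2 to nmax that was never marked.
        [| 2 .. nmax |] |> Array.filter (fun i -> not isComposite.[i])

printfn "%A" (getPrimesSketch 30)
```

Each call is CPU-bound and independent of the others, which is why the 30 inputs in the flow above can be spread over a fixed number of partitions without any coordination between them.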

Persisting intermediate results to cloud storage

Results of a flow computation can be persisted to store by terminating the flow with a call to CloudFlow.persist or CloudFlow.persistCached. This creates a PersistedCloudFlow instance that can be reused without recomputing the original flow.

let persistedCloudFlow =
    inputs
    |> CloudFlow.OfArray
    |> CloudFlow.collect(fun i -> seq {for j in 1 .. 10000 -> (i+j, string j) })
    |> CloudFlow.persist StorageLevel.Memory
    |> cluster.Run


let length = persistedCloudFlow |> CloudFlow.length |> cluster.Run
let max = persistedCloudFlow |> CloudFlow.maxBy fst |> cluster.Run
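
The benefit of persisting is that the expensive collect stage runs once, however many queries follow. The local sketch below illustrates this with Seq.cache from FSharp.Core, which plays a loosely similar role for an in-memory sequence. It is only an analogy, not MBrace code; the expansions counter is a hypothetical helper added to make the reuse visible.

```fsharp
// Local analogy only: Seq.cache stands in for persisting; no MBrace involved.
let pairInputs = [| 1 .. 100 |]

// Hypothetical helper: counts how many inputs have been expanded so far.
let expansions = ref 0

let cachedPairs =
    pairInputs
    |> Seq.collect (fun i ->
        expansions.Value <- expansions.Value + 1
        seq { for j in 1 .. 10000 -> (i + j, string j) })
    |> Seq.cache

// Two separate queries over the same cached sequence.
let localLength = cachedPairs |> Seq.length
let localMax = cachedPairs |> Seq.maxBy fst

printfn "length = %d, max = %A, expansions = %d" localLength localMax expansions.Value
```

Here the length is 1000000 (100 inputs times 10000 pairs each), the maximum by first component is (10100, "10000"), and the counter stays at 100 after both queries. Without the Seq.cache step the second query would expand every input again.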

Summary

In this tutorial, you've learned the basics of the CloudFlow programming model, a powerful data-flow model for scalable pipelines of data. Continue with further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
