Introduction to Data Parallel Cloud Flows
This tutorial is from the MBrace Starter Kit.
In this tutorial you learn the CloudFlow programming model for cloud-scheduled, parallel data flow tasks. The model is similar to those of Hadoop and Spark.
CloudFlow.OfArray partitions the input array based on the number of available workers. The parts of the array are then fed into cloud tasks that implement the map and filter stages. The final 'countBy' stage is implemented by one last cloud task.
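The partition, map/filter, and combine steps described above can be mimicked on a single machine. The following is only a local analogy built on FSharp.Core, not MBrace code: `workerCount` is a hypothetical stand-in for the number of cluster workers, and the particular map, filter, and key functions are chosen purely for illustration.

```fsharp
// Local, single-machine analogy: FSharp.Core only, no MBrace API is used.
let inputs = [| 1 .. 100 |]

// Hypothetical stand-in for the number of workers in a cluster.
let workerCount = 4

// Step 1: split the input into one chunk per "worker".
let partitions = Array.splitInto workerCount inputs

// Step 2: each chunk runs the map and filter stages independently
// and produces partial counts keyed by the last digit.
let partialCounts =
    partitions
    |> Array.Parallel.map (fun chunk ->
        chunk
        |> Array.map (fun n -> n * n)
        |> Array.filter (fun n -> n % 2 = 0)
        |> Array.countBy (fun n -> n % 10))

// Step 3: a single final step merges the partial counts.
let counts =
    partialCounts
    |> Array.concat
    |> Array.groupBy fst
    |> Array.map (fun (key, group) -> key, Array.sumBy snd group)
    |> Array.sortBy fst

// counts = [| (0, 10); (4, 20); (6, 20) |]
printfn "%A" counts
```

The merge in step 3 is needed because the same key can appear in several chunks; a count computed per chunk is only a partial result until the chunks are combined.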
Next, check the progress of your job.
Note the number of cloud tasks involved, which should be twice the number of workers. This indicates that the input array has been partitioned and the work carried out in a distributed way.
Next, await the result.
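The start, inspect, await pattern used in these steps has a close local counterpart in .NET tasks. The sketch below is an analogy only: it uses `Async.StartAsTask` and `System.Threading.Tasks.Task`, not an MBrace cloud process, and the workload is an arbitrary placeholder.

```fsharp
// Local analogy: a .NET Task stands in for a job running elsewhere.

// Start the work in the background; a handle comes back immediately.
let job =
    async { return [| 1 .. 1000 |] |> Array.sumBy (fun n -> n % 10) }
    |> Async.StartAsTask

// Inspect the handle without blocking. What is printed depends on timing.
printfn "status: %A, completed: %b" job.Status job.IsCompleted

// Block until the work has finished, then read the value.
let result = job.Result
printfn "result: %d" result
```

Inspecting the handle is cheap and never blocks, whereas reading `Result` waits for completion, which is why the two steps are kept separate.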
Data parallel cloud flows can be used for all sorts of things. Later, you will see how to source the inputs to the data flow from a collection of cloud files, or from a partitioned cloud vector.
Changing the degree of parallelism
The default is to partition the input array between all available workers.
You can also use CloudFlow.withDegreeOfParallelism to specify the degree of partitioning of the stream at any point in the pipeline.
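To make the effect of an explicit degree of parallelism concrete, here is a local sketch that assumes nothing about MBrace. The helpers `countPrimesBelow` and `runWithDegree` are hypothetical names introduced for this illustration; the "default" case uses the local processor count as a stand-in for "all available workers".

```fsharp
// Local analogy: FSharp.Core only, no MBrace API is used.

// Hypothetical CPU-bound work item: count the primes below n by trial division.
let countPrimesBelow n =
    let isPrime k =
        k >= 2 && Seq.forall (fun d -> k % d <> 0) (seq { 2 .. int (sqrt (float k)) })
    seq { 2 .. n - 1 } |> Seq.filter isPrime |> Seq.length

// Split the input into at most `degree` chunks and process the chunks in parallel.
// Returns the number of chunks actually used together with the flattened results.
let runWithDegree degree (items: int[]) =
    let chunks = Array.splitInto degree items
    let results = chunks |> Array.Parallel.map (Array.map countPrimesBelow)
    chunks.Length, Array.concat results

let numbers = Array.create 12 10000

// Default: one chunk per local processor (capped by the number of items).
let defaultChunks, defaultResults = runWithDegree System.Environment.ProcessorCount numbers

// Explicit: exactly six chunks, regardless of the machine.
let sixChunks, sixResults = runWithDegree 6 numbers

printfn "default chunks: %d, explicit chunks: %d" defaultChunks sixChunks
```

Changing the degree changes only how the work is divided, not the results: both runs return the same array of prime counts.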
Persisting intermediate results to cloud storage
Results of a flow computation can be persisted to storage by terminating the flow with a call to CloudFlow.persist/persistCached. This creates a PersistedCloudFlow instance that can be reused without recomputing the original flow.
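The benefit of persisting can be seen in miniature with a lazy sequence. This is a local analogy, not MBrace code: a lazy `seq` plays the role of the flow, an in-memory array plays the role of the persisted result, and the `evaluations` counter is added only to show how often the expensive step runs.

```fsharp
// Local analogy: FSharp.Core only, no MBrace API is used.
let inputs = [| 1 .. 100 |]

// Counts how many times the expansion step actually runs.
let evaluations = ref 0

// A lazy pipeline: nothing runs until it is enumerated,
// and every enumeration would recompute it from scratch.
let flow =
    inputs
    |> Seq.collect (fun i ->
        evaluations.Value <- evaluations.Value + 1
        seq { for j in 1 .. 100 -> (i + j, string j) })

// Materialise the pipeline once; this is the "persist" step.
let persisted = Seq.toArray flow

// Both queries reuse the stored array, so the pipeline is not run again.
let length = persisted.Length
let maxPair = persisted |> Array.maxBy fst

printfn "length = %d, max = %A, evaluations = %d" length maxPair evaluations.Value
```

Had the two queries been run against `flow` directly, the expansion step would have executed once per input for each query instead of once per input overall.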
Summary
In this tutorial, you've learned the basics of the CloudFlow programming model, a powerful data-flow model for scalable pipelines of data. Continue with further samples to learn more about the MBrace programming model.
Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.