Example: Running an iterative algorithm at scale with incremental notifications
This example is from the MBrace Starter Kit.
This example shows how to implement k-means, an iterative algorithm that finds the centroids of clusters of points.
It demonstrates several important techniques:
- How to partition data and maintain affinity between workers and the data they process
- How to emit partial results to an intermediate queue
- How to observe that queue using incremental charting
First, define some parameters for the input set to classify:
Next, generate the input data: a deterministic set of pseudo-random points based on the parameters above.
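The generation step can be sketched locally as follows. It is a random walk from the origin driven by a seeded System.Random, so a given seed always yields the same points. The step size, the parameter values (dim, partitions, pointsPerPartition), and the one-seed-per-partition scheme are illustrative assumptions, not the values used by the original script.

```fsharp
// Points are plain float arrays (an assumption about the original Point type).
type Point = float[]

/// Generates numPoints points of dimension dim by a random walk that starts
/// at the origin. The same seed always produces the same points.
let generatePoints (dim: int) (numPoints: int) (seed: int) : Point[] =
    let rand = System.Random(seed)
    let current = Array.zeroCreate<float> dim   // start at the origin
    Array.init numPoints (fun _ ->
        // move by a random amount in [-0.5, 0.5) along every axis
        for d in 0 .. dim - 1 do
            current.[d] <- current.[d] + rand.NextDouble() - 0.5
        Array.copy current)                      // record a copy of this position

// Hypothetical parameters, chosen only for illustration.
let dim = 2
let partitions = 4
let pointsPerPartition = 1000

// One partition per seed, so every run produces identical input data.
let randPoints : Point[][] =
    Array.init partitions (fun i -> generatePoints dim pointsPerPartition i)
```

Copying current on each step matters. Without the copy, every element would alias the same mutable array and end up holding the final position.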
Next, display a chart showing the first 500 points from each partition:
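Selecting the points to chart amounts to two steps: take at most 500 points from each partition, then project each point onto its first two coordinates. The sketch below does this in plain F#. The names point2d and selectionOfPoints are borrowed from the example, but their bodies are assumptions. The FSharp.Charting call appears only in a comment, so the block runs without that package.

```fsharp
/// Projects a point onto its first two coordinates for plotting.
let point2d (p: float[]) : float * float = p.[0], p.[1]

/// Takes at most 500 points from each partition and projects them to 2-D.
let selectionOfPoints (partitions: float[][][]) : (float * float)[] =
    partitions
    |> Seq.collect (fun part -> Seq.truncate 500 part)  // never throws on short partitions
    |> Seq.map point2d
    |> Seq.toArray

// With FSharp.Charting loaded, the selection could then be plotted with:
//   Chart.Point(selectionOfPoints randPoints)
```

Seq.truncate is used instead of Seq.take so that a partition with fewer than 500 points contributes what it has instead of raising an exception.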
Now define a set of helper functions and types for working with points and finding centroids:
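Here is a minimal sketch of such helpers, assuming points are plain float arrays. The names dist, findCentroid, sumPoints and divPoint follow the example. The bodies are assumptions written to match what those names describe: Euclidean distance, the index of the nearest centroid, a component-wise sum, and scalar division.

```fsharp
type Point = float[]

/// Euclidean distance between two points of the same dimension.
let dist (p1: Point) (p2: Point) : float =
    Array.fold2 (fun acc a b -> acc + pown (a - b) 2) 0.0 p1 p2 |> sqrt

/// Index of the centroid closest to point p.
let findCentroid (p: Point) (centroids: Point[]) : int =
    let mutable best = 0
    let mutable bestDist = System.Double.MaxValue
    for i in 0 .. centroids.Length - 1 do
        let d = dist p centroids.[i]
        if d < bestDist then
            best <- i
            bestDist <- d
    best

/// Component-wise sum of a non-empty collection of points.
let sumPoints (points: seq<Point>) : Point =
    points |> Seq.reduce (fun a b -> Array.map2 (+) a b)

/// Scalar division of a point.
let divPoint (p: Point) (x: float) : Point =
    Array.map (fun v -> v / x) p
```

A new centroid is then the sum of the points assigned to it divided by their count, for example divPoint (sumPoints assigned) (float assigned.Length).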
This is the iterative computation. Each iteration classifies every point by assigning it to its nearest existing centroid, then computes new centroids from that classification. Here emit is used to send observations of intermediate states to a queue or some other sink.
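One iteration can be sketched as a map phase per partition followed by a reduce phase across partitions. This is a local sketch, not MBrace code. Array.map over the partitions stands in for running each partition on its own worker, and emit is modeled as an ordinary function that takes (iteration, centroids). Keeping a centroid that attracted no points at its previous position is one common choice and is assumed here.

```fsharp
type Point = float[]

/// Euclidean distance between two points.
let dist (p1: Point) (p2: Point) : float =
    Array.fold2 (fun acc a b -> acc + pown (a - b) 2) 0.0 p1 p2 |> sqrt

/// Component-wise sum of two points.
let addPoints (p1: Point) (p2: Point) : Point = Array.map2 (+) p1 p2

/// Index of the centroid nearest to p.
let nearest (centroids: Point[]) (p: Point) : int =
    centroids |> Array.mapi (fun i c -> i, dist p c) |> Array.minBy snd |> fst

/// Map phase, run once per partition: for every centroid index that
/// attracted points, the sum of those points and how many there were.
let localSums (centroids: Point[]) (partition: Point[]) : (int * (Point * int))[] =
    partition
    |> Array.groupBy (nearest centroids)
    |> Array.map (fun (i, ps) -> i, (Array.reduce addPoints ps, ps.Length))

/// One iteration: classify, combine the partial sums from all partitions,
/// average them into new centroids, report them through emit, return them.
let step (emit: int * Point[] -> unit) (partitions: Point[][]) (iteration: int) (centroids: Point[]) : Point[] =
    let combined =
        partitions
        |> Array.map (localSums centroids)     // one partial result per partition
        |> Array.concat
        |> Array.groupBy fst                   // reduce phase: group by centroid index
        |> Array.map (fun (i, parts) ->
            let sums, counts = parts |> Array.map snd |> Array.unzip
            i, (Array.reduce addPoints sums, Array.sum counts))
        |> Map.ofArray
    // A centroid that attracted no points keeps its previous position.
    let newCentroids =
        centroids
        |> Array.mapi (fun i c ->
            match Map.tryFind i combined with
            | Some (sum, count) -> Array.map (fun v -> v / float count) sum
            | None -> c)
    emit (iteration, newCentroids)
    newCentroids
```

Each partition ships back only one (sum, count) pair per centroid rather than its raw points. This keeps the reduce step cheap and lets the data stay on the worker that owns it.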
This is the main cloud routine. It partitions the points according to the available workers, then iterates until the centroids converge.
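The driver loop can be sketched independently of MBrace. First split the points into one partition per worker, then apply a step function until no centroid moves by epsilon or more. The names partitionForWorkers, maxShift and runToConvergence are hypothetical. The step function is passed in as a parameter so that the loop does not depend on how a single iteration is implemented.

```fsharp
/// Split the points into at most numWorkers partitions of roughly equal size.
let partitionForWorkers (numWorkers: int) (points: float[][]) : float[][][] =
    Array.splitInto numWorkers points

/// Largest Euclidean distance that any centroid moved between two iterations.
let maxShift (oldCentroids: float[][]) (newCentroids: float[][]) : float =
    let dist (a: float[]) (b: float[]) =
        Array.fold2 (fun acc x y -> acc + pown (x - y) 2) 0.0 a b |> sqrt
    Array.map2 dist oldCentroids newCentroids |> Array.max

/// Apply step until no centroid moves by epsilon or more; returns the final
/// centroids and the number of the iteration that converged.
let rec runToConvergence (step: int -> float[][] -> float[][]) (epsilon: float)
                         (iteration: int) (centroids: float[][]) : float[][] * int =
    let next = step iteration centroids
    if maxShift centroids next < epsilon then next, iteration
    else runToConvergence step epsilon (iteration + 1) next
```

Convergence is tested against epsilon, so a larger epsilon ends the loop after fewer iterations. That is what the test flight below relies on.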
Running a test flight of the algorithm
You can now run a test flight of the algorithm with a drastically increased epsilon value to allow for more rapid convergence:
Take a look at the progress:
Get the result:
Now chart a selection of the original points together with the computed centroids:
Observing intermediate states of the algorithm
When running iterative algorithms or other long-running processes, you frequently need to emit information so that you can visualize and inspect the progress of the computation.
To do this, create a queue to receive the partial results emitted by the iterations:
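This is a local stand-in for the observation queue. It assumes a System.Collections.Concurrent.ConcurrentQueue in place of the cluster-side queue the example uses. The Observation record and its fields are assumptions. The emit function timestamps each (iteration, centroids) pair and enqueues it, so it can be passed wherever an emit callback is expected.

```fsharp
open System
open System.Collections.Concurrent

/// One observation of the algorithm's state (field names are assumptions).
type Observation =
    { Time: DateTimeOffset      // when the observation was emitted
      Iteration: int            // which iteration produced it
      Centroids: float[][] }    // the centroids after that iteration

/// Local stand-in for the cluster-side queue.
let watchQueue = ConcurrentQueue<Observation>()

/// Timestamps an (iteration, centroids) pair and enqueues it.
let emit (iteration: int, centroids: float[][]) : unit =
    watchQueue.Enqueue({ Time = DateTimeOffset.UtcNow; Iteration = iteration; Centroids = centroids })
```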
Next, start the task, emitting observations to the queue:
Take a look at the progress:
Next, chart the intermediate results as they arrive, using an incrementally updating chart:
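Incremental charting can be sketched in three steps: drain whatever observations have arrived, append them to a running history, and publish a snapshot of that history. In this sketch, a ConcurrentQueue and an F# Event stand in for the cluster queue and the asynchronous sequence the example uses; both are assumptions. The name pump is hypothetical. snapshots.Publish is an IObservable of point arrays, which is the kind of input FSharp.Charting's LiveChart functions take.

```fsharp
open System.Collections.Concurrent

let queue = ConcurrentQueue<int * float[][]>()   // (iteration, centroids)
let history = ResizeArray<float * float>()        // every 2-D centroid seen so far
let snapshots = Event<(float * float)[]>()

/// Moves everything currently in the queue into history, then publishes a copy.
let pump () =
    let rec drain () =
        match queue.TryDequeue() with
        | true, (_, centroids) ->
            for c in centroids do history.Add((c.[0], c.[1]))
            drain ()
        | _ -> ()
    drain ()
    // ToArray copies, so earlier snapshots are not changed by later pumps.
    snapshots.Trigger(history.ToArray())

// With FSharp.Charting loaded, a live chart could subscribe to the snapshots,
// for example: LiveChart.Point(snapshots.Publish)
```

Calling pump on a timer, or whenever the task reports progress, gives a chart that grows as iterations complete.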
Now wait for the overall result:
Now chart the original points, the centroids computed in the test flight, and the final centroids:
In this example, you've learned how to run an iterative algorithm on an MBrace cluster, including how to emit and observe intermediate states from the iterations. Continue with further samples to learn more about the MBrace programming model.
Note that you can use these techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.