MBrace.Core and MBrace.Azure


KNN Digit Recognizer

This example is from the MBrace Starter Kit.

This example shows how to classify handwritten digits with a k-nearest-neighbours classifier, using the Kaggle Digit Recognizer dataset (https://www.kaggle.com/c/digit-recognizer).

First, define the types and constants relevant to images:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
/// Number of pixels in a single 28 x 28 digit bitmap
[<Literal>]
let pixelLength = 784

/// Image identifier
type ImageId = int

/// Image bitmap representation
type Image = 
    { Id : ImageId 
      Pixels : int [] }

    /// Reads unlabelled images from a Kaggle digit recognizer CSV file.
    /// The header row is skipped; every remaining row is split on ',' and each
    /// field is converted with `int` to form the pixel array.
    /// Images are numbered from 1 in row order (first data row gets Id = 1).
    static member Parse (file : string) =
        let toPixels (row : string) = row.Split(',') |> Array.map int
        File.ReadAllLines file
        |> Stream.ofSeq
        |> Stream.skip 1
        |> Stream.map toPixels
        |> Stream.mapi (fun rowIndex pixels -> { Id = rowIndex + 1 ; Pixels = pixels })
        |> Stream.toArray

Next, define the types relevant to classification of images:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
/// Digit classification
type Classification = int

/// Distance between two images; results are uint64 so accumulated distances do not overflow
type Distance = Image -> Image -> uint64

/// A training image annotated with its known classification
type TrainingImage = 
    { Classification : Classification 
      Image : Image }

    /// Reads a labelled training set from a Kaggle digit recognizer CSV file.
    /// The header row is skipped; in each remaining row (converted field-wise with `int`)
    /// the first field is the classification and the remaining fields are the pixels.
    /// Images are numbered from 1 in row order (first data row gets Id = 1).
    static member Parse (file : string) =
        let toTrainingImage rowIndex (fields : int []) =
            // build the image (pixel slice) before reading the label, as before
            let picture = { Id = rowIndex + 1 ; Pixels = fields.[1..] }
            { Classification = fields.[0] ; Image = picture }
        File.ReadAllLines file
        |> Stream.ofSeq
        |> Stream.skip 1
        |> Stream.map (fun (row : string) -> row.Split(',') |> Array.map int)
        |> Stream.mapi toTrainingImage
        |> Stream.toArray

/// Digit classifier: given a training set and an image, predicts the image's classification
type Classifier = TrainingImage [] -> Image -> Classification

Next, implement a helper for writing classifications to a file, the l^2 distance between images, and a k-nearest-neighbour classifier:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
/// Helpers for persisting digit classifications
type Classifications =

    /// Writes (image id, classification) pairs to outFile as CSV.
    /// The output is an "ImageId,Label" header line followed by one "id,label" line
    /// per pair, in array order. Any existing file at outFile is replaced.
    /// The writer is flushed and closed when the method returns.
    static member Write(outFile : string, classifications : (ImageId * Classification) []) =
        // File.CreateText creates the file or truncates an existing one. File.OpenWrite
        // does not truncate, so writing fewer bytes than the old contents would leave
        // stale trailing data at the end of the CSV.
        use writer = File.CreateText outFile
        writer.WriteLine "ImageId,Label"
        for (imageId, label) in classifications do
            writer.WriteLine(sprintf "%d,%d" imageId label)

/// Squared l^2 (Euclidean) distance: the sum of squared pixel differences over the
/// first pixelLength pixels of both images (no square root is taken).
let l2 : Distance =
    fun x y ->
        let left, right = x.Pixels, y.Pixels
        let rec accumulate index total =
            if index >= pixelLength then total
            else
                let delta = left.[index] - right.[index]
                accumulate (index + 1) (total + uint64 (delta * delta))
        accumulate 0 0uL

/// Single-threaded, stream-based k-nearest-neighbour classifier.
/// Sorts the training set by distance d to the image, keeps the k closest samples,
/// and returns the classification that occurs most often among them.
let knn (d : Distance) (k : int) : Classifier =
    fun (training : TrainingImage []) (img : Image) ->
        let distanceToImg (sample : TrainingImage) = d sample.Image img
        let votes =
            Stream.ofArray training
            |> Stream.sortBy distanceToImg
            |> Stream.take k
            |> Stream.countBy (fun sample -> sample.Classification)
        let (winner, _) = votes |> Stream.maxBy snd
        winner

Next, implement local multicore classification and validation:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
/// Local multicore classification: classifies every image in parallel and pairs
/// each image's Id with its predicted classification.
let classifyLocalMulticore (classifier : Classifier) (training : TrainingImage []) (images : Image []) =
    let labelOf (picture : Image) = picture.Id, classifier training picture
    images
    |> ParStream.ofArray
    |> ParStream.map labelOf
    |> ParStream.toArray

/// Local multicore validation: returns the fraction of validation samples whose
/// predicted classification equals their known classification.
let validateLocalMulticore (classifier : Classifier) (training : TrainingImage []) (validation : TrainingImage []) =
    let hits =
        validation
        |> ParStream.ofArray
        |> ParStream.map (fun sample ->
            if sample.Classification = classifier training sample.Image then 1. else 0.)
        |> ParStream.sum
    hits / float validation.Length


//// Performance (3.5Ghz Quad Core i7 CPU)
//// Real: 00:01:02.281, CPU: 00:07:51.481, GC gen0: 179, gen1: 82, gen2: 62
//validateLocalMulticore classifier training.[ .. 39999] training.[40000 ..]
//
//// Performance (3.5Ghz Quad Core i7 CPU)
//// Real: 00:15:30.855, CPU: 01:56:59.842, GC gen0: 2960, gen1: 2339, gen2: 1513
//classifyLocalMulticore classifier training tests

Next, implement the distributed, cloud versions of the same algorithms, to classify and validate the images using an MBrace cluster:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
/// Classify test images using MBrace: each image's Id is paired with its
/// predicted classification, computed on the cluster via CloudFlow.
let classifyCloud (classifier : Classifier) (training : TrainingImage []) (images : Image []) =
    let flow = CloudFlow.OfArray images
    flow
    |> CloudFlow.map (fun (picture : Image) -> (picture.Id, classifier training picture))
    |> CloudFlow.toArray

/// Validate training images using MBrace: returns (as a cloud workflow) the fraction
/// of validation samples whose predicted classification matches their known one.
let validateCloud (classifier : Classifier) (training : TrainingImage []) (validation : TrainingImage []) = cloud {
    let samples = CloudFlow.OfArray validation
    let! correctCount =
        samples
        |> CloudFlow.filter (fun (sample : TrainingImage) -> classifier training sample.Image = sample.Classification)
        |> CloudFlow.length
    let total = float validation.Length
    return float correctCount / total
}

Now, load the training and test samples from disk and create a k-nearest-neighbour classifier (k = 10, l^2 distance):

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
////////////////////
// Run the examples

// Locations of the Kaggle CSV files, relative to this script
let trainPath, testPath =
    (__SOURCE_DIRECTORY__ + "/../../data/train.csv",
     __SOURCE_DIRECTORY__ + "/../../data/test.csv")

// parse the labelled training set and the unlabelled test images
let training = trainPath |> TrainingImage.Parse
let tests = testPath |> Image.Parse

// k-nearest-neighbour classifier using the l2 distance and 10 neighbours
let classifier = knn l2 10

#time

Run the validation in the cluster, using the first 40,000 training images as the training set and the remaining ones as the validation set:

1: 
// Start validation as a cluster process: train on images 0..39999, validate on the rest.
// The slicing stays inside the cloud expression, so it runs when the workflow executes.
let validateTask =
    let validation =
        cloud {
            let trainingPart = training.[0 .. 39999]
            let validationPart = training.[40000 ..]
            return! validateCloud classifier trainingPart validationPart
        }
    cluster.CreateProcess(validation)

Check on its progress:

1: 
2: 
// Display the cluster's workers (MBraceClient.ShowWorkers)
cluster.ShowWorkers()
// Check on the progress of the validation process
validateTask.ShowInfo()

Run the classification operation in the cluster:

1: 
// Start classification of all test images as a cluster process, training on the full training set
let classifyTask =
    let classification = cloud { return! classifyCloud classifier training tests }
    cluster.CreateProcess(classification)

Check on its progress:

1: 
2: 
// Display the cluster's workers (MBraceClient.ShowWorkers)
cluster.ShowWorkers()
// Check on the progress of the classification process
classifyTask.ShowInfo()

Get the results:

1: 
2: 
// Retrieve each cluster process's result
// NOTE(review): presumably .Result blocks until the process completes — confirm against MBrace docs
let validateResult = validateTask.Result
let classifyResult = classifyTask.Result

In this example, you've learned how to perform a machine learning classification task on an MBrace cluster. Continue with further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. For the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.

namespace System
namespace System.Numerics
namespace System.IO
namespace System.Text
namespace Nessos
namespace Nessos.Streams
namespace MBrace
namespace MBrace.Core
namespace MBrace.Flow
val cluster : MBrace.Thespian.ThespianCluster

Full name: 200-knn-digit-recognizer-example.cluster
module Config
val GetCluster : unit -> MBrace.Thespian.ThespianCluster

Full name: Config.GetCluster


 Gets or creates a new Thespian cluster session.
Multiple items
type LiteralAttribute =
  inherit Attribute
  new : unit -> LiteralAttribute

Full name: Microsoft.FSharp.Core.LiteralAttribute

--------------------
new : unit -> LiteralAttribute
val pixelLength : int

Full name: 200-knn-digit-recognizer-example.pixelLength
type ImageId = int

Full name: 200-knn-digit-recognizer-example.ImageId


 Image identifier
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type Image =
  {Id: ImageId;
   Pixels: int [];}
  static member Parse : file:string -> Image []

Full name: 200-knn-digit-recognizer-example.Image


 Image bitmap representation
Image.Id: ImageId
Image.Pixels: int []
static member Image.Parse : file:string -> Image []

Full name: 200-knn-digit-recognizer-example.Image.Parse


 Parses a set of points from text using the Kaggle digit recognizer CSV format
val file : string
type File =
  static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
  static member AppendAllText : path:string * contents:string -> unit + 1 overload
  static member AppendText : path:string -> StreamWriter
  static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
  static member Create : path:string -> FileStream + 3 overloads
  static member CreateText : path:string -> StreamWriter
  static member Decrypt : path:string -> unit
  static member Delete : path:string -> unit
  static member Encrypt : path:string -> unit
  static member Exists : path:string -> bool
  ...

Full name: System.IO.File
File.ReadAllLines(path: string) : string []
File.ReadAllLines(path: string, encoding: Encoding) : string []
Multiple items
type Stream =
  inherit MarshalByRefObject
  member BeginRead : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginWrite : buffer:byte[] * offset:int * count:int * callback:AsyncCallback * state:obj -> IAsyncResult
  member CanRead : bool
  member CanSeek : bool
  member CanTimeout : bool
  member CanWrite : bool
  member Close : unit -> unit
  member CopyTo : destination:Stream -> unit + 1 overload
  member Dispose : unit -> unit
  member EndRead : asyncResult:IAsyncResult -> int
  ...

Full name: System.IO.Stream

--------------------
type Stream<'T> =
  private {Run: Context<'T> -> Iterable;}
  member private RunBulk : ctxt:Context<'T> -> unit
  override ToString : unit -> string

Full name: Nessos.Streams.Stream<_>
val ofSeq : source:seq<'T> -> Stream<'T>

Full name: Nessos.Streams.Stream.ofSeq
val skip : n:int -> stream:Stream<'T> -> Stream<'T>

Full name: Nessos.Streams.Stream.skip
val map : f:('T -> 'R) -> stream:Stream<'T> -> Stream<'R>

Full name: Nessos.Streams.Stream.map
val line : string
String.Split([<ParamArray>] separator: char []) : string []
String.Split(separator: string [], options: StringSplitOptions) : string []
String.Split(separator: char [], options: StringSplitOptions) : string []
String.Split(separator: char [], count: int) : string []
String.Split(separator: string [], count: int, options: StringSplitOptions) : string []
String.Split(separator: char [], count: int, options: StringSplitOptions) : string []
val line : string []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.Array.map
val mapi : f:(int -> 'T -> 'R) -> stream:Stream<'T> -> Stream<'R>

Full name: Nessos.Streams.Stream.mapi
val i : int
val nums : int []
val id : int
val toArray : stream:Stream<'T> -> 'T []

Full name: Nessos.Streams.Stream.toArray
type Classification = int

Full name: 200-knn-digit-recognizer-example.Classification


 Digit classification
type Distance = Image -> Image -> uint64

Full name: 200-knn-digit-recognizer-example.Distance


 Distance on points; use uint64 to avoid overflows
Multiple items
val uint64 : value:'T -> uint64 (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.uint64

--------------------
type uint64 = UInt64

Full name: Microsoft.FSharp.Core.uint64
type TrainingImage =
  {Classification: Classification;
   Image: Image;}
  static member Parse : file:string -> TrainingImage []

Full name: 200-knn-digit-recognizer-example.TrainingImage


 A training image annotated with its classification
Multiple items
TrainingImage.Classification: Classification

--------------------
type Classification = int

Full name: 200-knn-digit-recognizer-example.Classification


 Digit classification
Multiple items
TrainingImage.Image: Image

--------------------
type Image =
  {Id: ImageId;
   Pixels: int [];}
  static member Parse : file:string -> Image []

Full name: 200-knn-digit-recognizer-example.Image


 Image bitmap representation
static member TrainingImage.Parse : file:string -> TrainingImage []

Full name: 200-knn-digit-recognizer-example.TrainingImage.Parse


 Parses a training set from text using the Kaggle digit recognizer CSV format
val image : Image
type Classifier = TrainingImage [] -> Image -> Classification

Full name: 200-knn-digit-recognizer-example.Classifier


 Digit classifier
type Classifications =
  static member Write : outFile:string * classifications:(ImageId * Classification) [] -> unit

Full name: 200-knn-digit-recognizer-example.Classifications


 Digit classifiers
static member Classifications.Write : outFile:string * classifications:(ImageId * Classification) [] -> unit

Full name: 200-knn-digit-recognizer-example.Classifications.Write


 Writes a point classification to file
val outFile : string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
val classifications : (ImageId * Classification) []
val fs : FileStream
File.OpenWrite(path: string) : FileStream
val sw : StreamWriter
Multiple items
type StreamWriter =
  inherit TextWriter
  new : stream:Stream -> StreamWriter + 6 overloads
  member AutoFlush : bool with get, set
  member BaseStream : Stream
  member Close : unit -> unit
  member Encoding : Encoding
  member Flush : unit -> unit
  member Write : value:char -> unit + 3 overloads
  static val Null : StreamWriter

Full name: System.IO.StreamWriter

--------------------
StreamWriter(stream: Stream) : unit
StreamWriter(path: string) : unit
StreamWriter(stream: Stream, encoding: Encoding) : unit
StreamWriter(path: string, append: bool) : unit
StreamWriter(stream: Stream, encoding: Encoding, bufferSize: int) : unit
StreamWriter(path: string, append: bool, encoding: Encoding) : unit
StreamWriter(path: string, append: bool, encoding: Encoding, bufferSize: int) : unit
TextWriter.WriteLine() : unit
   (+0 other overloads)
TextWriter.WriteLine(value: obj) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: string) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: decimal) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: float) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: float32) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: uint64) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: int64) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: uint32) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: int) : unit
   (+0 other overloads)
val iter : action:('T -> unit) -> array:'T [] -> unit

Full name: Microsoft.FSharp.Collections.Array.iter
val i : ImageId
val c : Classification
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val l2 : x:Image -> y:Image -> uint64

Full name: 200-knn-digit-recognizer-example.l2


 l^2 distance
val x : Image
val y : Image
val xp : int []
val yp : int []
val mutable acc : uint64
val pown : x:'T -> n:int -> 'T (requires member get_One and member ( * ) and member ( / ))

Full name: Microsoft.FSharp.Core.Operators.pown
val knn : d:Distance -> k:int -> training:TrainingImage [] -> img:Image -> Classification

Full name: 200-knn-digit-recognizer-example.knn


 single-threaded, stream-based k-nearest neighbour classifier
val d : Distance
val k : int
val training : TrainingImage []
val img : Image
val ofArray : source:'T [] -> Stream<'T>

Full name: Nessos.Streams.Stream.ofArray
val sortBy : projection:('T -> 'Key) -> stream:Stream<'T> -> Stream<'T> (requires comparison)

Full name: Nessos.Streams.Stream.sortBy
val ex : TrainingImage
TrainingImage.Image: Image
val take : n:int -> stream:Stream<'T> -> Stream<'T>

Full name: Nessos.Streams.Stream.take
TrainingImage.Classification: Classification
val countBy : project:('T -> 'Key) -> stream:Stream<'T> -> Stream<'Key * int> (requires equality)

Full name: Nessos.Streams.Stream.countBy
val id : x:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.id
val maxBy : projection:('T -> 'Key) -> source:Stream<'T> -> 'T (requires comparison)

Full name: Nessos.Streams.Stream.maxBy
val snd : tuple:('T1 * 'T2) -> 'T2

Full name: Microsoft.FSharp.Core.Operators.snd
val fst : tuple:('T1 * 'T2) -> 'T1

Full name: Microsoft.FSharp.Core.Operators.fst
val classifyLocalMulticore : classifier:Classifier -> training:TrainingImage [] -> images:Image [] -> (ImageId * Classification) []

Full name: 200-knn-digit-recognizer-example.classifyLocalMulticore


 local multicore classification
val classifier : Classifier
val images : Image []
Multiple items
module ParStream

from Nessos.Streams

--------------------
type ParStream<'T> =
  private {Impl: ParStreamImpl<'T>;}
  member Apply : collector:ParCollector<'T,'R> -> 'R
  member private Stream : unit -> Stream<'T>
  member DegreeOfParallelism : int
  member private PreserveOrdering : bool
  member private SourceType : SourceType

Full name: Nessos.Streams.ParStream<_>
val ofArray : source:'T [] -> ParStream<'T>

Full name: Nessos.Streams.ParStream.ofArray
val map : f:('T -> 'R) -> stream:ParStream<'T> -> ParStream<'R>

Full name: Nessos.Streams.ParStream.map
val toArray : stream:ParStream<'T> -> 'T []

Full name: Nessos.Streams.ParStream.toArray
val validateLocalMulticore : classifier:Classifier -> training:TrainingImage [] -> validation:TrainingImage [] -> float

Full name: 200-knn-digit-recognizer-example.validateLocalMulticore


 local multicore validation
val validation : TrainingImage []
val tr : TrainingImage
val expected : Classification
val prediction : Classification
val sum : stream:ParStream<'T> -> 'T (requires member ( + ) and member get_Zero)

Full name: Nessos.Streams.ParStream.sum
val results : float
Multiple items
val float : value:'T -> float (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.float

--------------------
type float = Double

Full name: Microsoft.FSharp.Core.float

--------------------
type float<'Measure> = float

Full name: Microsoft.FSharp.Core.float<_>
property Array.Length: int
val classifyCloud : classifier:Classifier -> training:TrainingImage [] -> images:Image [] -> MBrace.Core.Cloud<(ImageId * Classification) []>

Full name: 200-knn-digit-recognizer-example.classifyCloud


 Classify test images using MBrace
Multiple items
module CloudFlow

from MBrace.Flow

--------------------
type CloudFlow =
  static member OfArray : source:'T [] -> CloudFlow<'T>
  static member OfCloudArrays : cloudArrays:seq<#CloudArray<'T>> -> LocalCloud<PersistedCloudFlow<'T>>
  static member OfCloudCollection : collection:ICloudCollection<'T> * ?sizeThresholdPerWorker:(unit -> int64) -> CloudFlow<'T>
  static member OfCloudDirectory : dirPath:string * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  static member OfCloudDirectory : dirPath:string * ?deserializer:(Stream -> seq<'T>) * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  static member OfCloudDirectory : dirPath:string * deserializer:(TextReader -> seq<'T>) * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  static member OfCloudDirectoryByLine : dirPath:string * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
  static member OfCloudFileByLine : path:string * ?encoding:Encoding -> CloudFlow<string>
  static member OfCloudFileByLine : paths:seq<string> * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
  static member OfCloudFiles : paths:seq<string> * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  ...

Full name: MBrace.Flow.CloudFlow

--------------------
type CloudFlow<'T> =
  interface
    abstract member WithEvaluators : collectorFactory:LocalCloud<Collector<'T,'S>> -> projection:('S -> LocalCloud<'R>) -> combiner:('R [] -> LocalCloud<'R>) -> Cloud<'R>
    abstract member DegreeOfParallelism : int option
  end

Full name: MBrace.Flow.CloudFlow<_>
static member CloudFlow.OfArray : source:'T [] -> CloudFlow<'T>
val map : f:('T -> 'R) -> flow:CloudFlow<'T> -> CloudFlow<'R>

Full name: MBrace.Flow.CloudFlow.map
val toArray : flow:CloudFlow<'T> -> MBrace.Core.Cloud<'T []>

Full name: MBrace.Flow.CloudFlow.toArray
val validateCloud : classifier:Classifier -> training:TrainingImage [] -> validation:TrainingImage [] -> 'a

Full name: 200-knn-digit-recognizer-example.validateCloud


 Validate training images using MBrace
val filter : predicate:('T -> bool) -> flow:CloudFlow<'T> -> CloudFlow<'T>

Full name: MBrace.Flow.CloudFlow.filter
val length : flow:CloudFlow<'T> -> MBrace.Core.Cloud<int64>

Full name: MBrace.Flow.CloudFlow.length
val trainPath : string

Full name: 200-knn-digit-recognizer-example.trainPath
val testPath : string

Full name: 200-knn-digit-recognizer-example.testPath
val training : TrainingImage []

Full name: 200-knn-digit-recognizer-example.training
static member TrainingImage.Parse : file:string -> TrainingImage []


 Parses a training set from text using the Kaggle digit recognizer CSV format
val tests : Image []

Full name: 200-knn-digit-recognizer-example.tests
static member Image.Parse : file:string -> Image []


 Parses a set of points from text using the Kaggle digit recognizer CSV format
val classifier : Classifier

Full name: 200-knn-digit-recognizer-example.classifier
val validateTask : MBrace.Runtime.CloudProcess<obj>

Full name: 200-knn-digit-recognizer-example.validateTask
member MBrace.Runtime.MBraceClient.CreateProcess : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:MBrace.Core.Internals.ResourceRegistry * ?taskName:string -> MBrace.Runtime.CloudProcess<'T>
member MBrace.Runtime.MBraceClient.ShowWorkers : unit -> unit
member MBrace.Runtime.CloudProcess.ShowInfo : unit -> unit
val classifyTask : MBrace.Runtime.CloudProcess<obj>

Full name: 200-knn-digit-recognizer-example.classifyTask
val validateResult : obj

Full name: 200-knn-digit-recognizer-example.validateResult
property MBrace.Runtime.CloudProcess.Result: obj
val classifyResult : obj

Full name: 200-knn-digit-recognizer-example.classifyResult
Fork me on GitHub