MBrace.Core and MBrace.Azure


Example: Training in the Cloud

This example demonstrates Norvig's Spelling Corrector (http://norvig.com/spell-correct.html). It is a prototypical workflow for training and learning in the cloud. You use the cloud to extract statistical information from a body of text. The statistical summary is used locally in your client application.

This example is from the MBrace Starter Kit.

Part 1 - Extract Statistics in the Cloud

1: 
2: 
3: 
4: 
5: 
// Load the shared helper script. `Config`, used below, is not defined in this file;
// presumably it comes from this script — verify against ../lib/utils.fsx.
#load "../lib/utils.fsx"

// Initialize client object to an MBrace cluster
let cluster = Config.GetCluster() 
// Cloud file system client of the cluster's store; `download` below uses it to write the text chunks.
let fs = cluster.Store.CloudFileSystem

Step 1: download text file from source, saving it to blob storage chunked into smaller files of 10000 lines each.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
/// Downloads the text at `uri`, splits it on '\n' into chunks of 10000 lines,
/// and writes each chunk to the cloud file system as "text/<index>.txt".
/// Returns a cloud workflow yielding the array of values returned by
/// `fs.File.WriteAllLines`, one per chunk, in chunk order.
let download (uri: string) = 
    cloud {
        do! Cloud.Log "Begin file download" 
        let text = 
            // WebClient is IDisposable: scope it so it is released as soon as
            // the download has completed instead of lingering on the worker.
            use webClient = new WebClient()
            webClient.DownloadString(uri) 
        do! Cloud.Log "file downloaded" 
        // Partition the big text into smaller files 
        let! files = 
            text.Split('\n')
            |> Array.chunkBySize 10000
            |> Array.mapi (fun index lines -> 
                 local { 
                    // Format the chunk path once so delete and write always target the same file.
                    let path = sprintf "text/%d.txt" index
                    // Delete first so a chunk from an earlier run is replaced rather than kept.
                    fs.File.Delete(path) 
                    let file = fs.File.WriteAllLines(path = path, lines = lines) 
                    return file })
            |> Local.Parallel
        return files
    }

// Submit the download workflow to the cluster as a cloud process.
let downloadTask = download "http://norvig.com/big.txt" |> cluster.CreateProcess

// Display information about the submitted process.
downloadTask.ShowInfo()

// Take the process result: the array of chunk files produced by `download`.
let files = downloadTask.Result

(** Launch a cloud process that asks for the size of every chunk file. *)
let fileSizesJob =
    // One size query per stored chunk, keyed by the chunk's path.
    let sizeQueries = files |> Array.map (fun chunk -> CloudFile.GetSize chunk.Path)
    sizeQueries
    |> Cloud.ParallelBalanced
    |> cluster.CreateProcess 

// Inspect the job: its current status first, then its summary information.
fileSizesJob.Status
fileSizesJob.ShowInfo()

// Take the job's result: the sizes computed for the chunk files.
let fileSizes = fileSizesJob.Result

In the second step, use cloud data flow to perform a parallel word frequency count on the stored text.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
/// Matches maximal runs of ASCII letters; each match is treated as one word.
let regex = Regex("[a-zA-Z]+", RegexOptions.Compiled)

/// Cloud process that reads the stored chunk files line by line, extracts every
/// word, lower-cases it and counts occurrences, yielding (word, count) pairs.
let wordCountJob = 
    files
    |> Array.map (fun f -> f.Path)
    |> CloudFlow.OfCloudFileByLine
    |> CloudFlow.collect (fun text -> regex.Matches(text) |> Seq.cast<Match>)
    // Lower-case with the invariant culture: the culture-sensitive ToLower()
    // depends on the worker's locale (e.g. "I" becomes a dotless i under tr-TR),
    // which would make the frequency table differ from machine to machine.
    |> CloudFlow.map (fun (m: Match) -> m.Value.ToLowerInvariant()) 
    |> CloudFlow.countBy id 
    |> CloudFlow.toArray
    |> cluster.CreateProcess

// Display information about the word-count process.
wordCountJob.ShowInfo()

// Display the processes known to the cluster client.
cluster.ShowProcesses()

// Word-frequency table: the (word, count) pairs from the job, turned into a map keyed by word.
let NWORDS = wordCountJob.Result |> Map.ofArray

Part 2 - Use the Frequency Counts in our Application

In the final step, use the calculated frequency counts to compute suggested spelling corrections in your client. At this point, you've finished using the cluster and no longer need it.
We have computed the frequency table; the rest of this example runs locally.

The statistics could be saved to disk for use in an application. We will use them directly in the client.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
/// True when `word` has an entry in the NWORDS frequency table.
let isKnown word = Map.containsKey word NWORDS

(** All strings one edit away from `word`: one character deleted, two adjacent
    characters swapped, one character replaced by a letter 'a'..'z', or one
    letter 'a'..'z' inserted at any position. *)
let edits1 (word: string) =
    let alphabet = [ 'a' .. 'z' ]
    // Every (prefix, suffix) cut of the word, including the empty prefix and the empty suffix.
    let pieces =
        List.init (word.Length + 1) (fun cut -> word.Substring(0, cut), word.Substring(cut))
    // Cuts that still have at least one character to the right of the cut point.
    let withTail = pieces |> List.filter (fun (_, tail) -> tail <> "")
    let dropped =
        withTail |> List.map (fun (head, tail) -> head + tail.Substring(1))
    let swapped =
        pieces
        |> List.filter (fun (_, tail) -> tail.Length > 1)
        |> List.map (fun (head, tail) -> head + string tail.[1] + string tail.[0] + tail.Substring(2))
    let substituted =
        withTail
        |> List.collect (fun (head, tail) ->
            alphabet |> List.map (fun letter -> head + string letter + tail.Substring(1)))
    let added =
        pieces
        |> List.collect (fun (head, tail) ->
            alphabet |> List.map (fun letter -> head + string letter + tail))
    Set.ofList (dropped @ swapped @ substituted @ added)

// Sample calls: evaluate interactively to inspect the resulting sets of candidate strings.
edits1 "speling"
edits1 "pgantom"

(** The 1-character edits of `word` that have an entry in NWORDS,
    or None when no such edit exists. *)
let knownEdits1 word = 
    let known =
        edits1 word |> Set.filter (fun candidate -> Map.containsKey candidate NWORDS)
    match Set.isEmpty known with
    | true -> None
    | false -> Some known

// Sample calls: each evaluates to Some set of known words, or None when no 1-edit is in NWORDS.
knownEdits1 "fantom"
knownEdits1 "pgantom"

(** The words in NWORDS reachable by applying `edits1` twice to `word`,
    or None when there are none. *)
let knownEdits2 word = 
    let known =
        edits1 word
        |> Seq.collect edits1
        |> Seq.filter (fun candidate -> Map.containsKey candidate NWORDS)
        |> Set.ofSeq
    match Set.isEmpty known with
    | true -> None
    | false -> Some known

// Sample calls: each evaluates to Some set of known words, or None when no 2-edit is in NWORDS.
knownEdits2 "pgantom"
knownEdits2 "quyck"


(** Find the best correction for a word, preferring 0-edit, over 1-edit, over 2-edit,
    and choosing the most frequent candidate according to NWORDS. When no known
    word lies within two edits, the word itself is returned unchanged. *)
let findBestCorrection (word: string) = 
    let candidates = 
        if isKnown word then Set.singleton word 
        else 
            match knownEdits1 word with
            | Some oneEdit -> oneEdit
            | None ->
                match knownEdits2 word with
                | Some twoEdits -> twoEdits
                // Nothing known within two edits: fall back to the input itself.
                | None -> Set.singleton word

    // The fallback word is not in NWORDS, so look the count up safely and
    // treat a missing entry as frequency 0 instead of indexing the map.
    let frequency candidate = defaultArg (Map.tryFind candidate NWORDS) 0L

    // A single pass for the maximum; on ties the first candidate in set order
    // wins, which matches taking the head of a stable descending sort.
    candidates |> Seq.maxBy frequency

// Examples: each call evaluates to a single string, the chosen correction
// (or the input itself when no known word is within two edits).
findBestCorrection "speling"
findBestCorrection "korrecter"
findBestCorrection "fantom"
findBestCorrection "pgantom"

In this example, you've seen how cloud tasks can be used to extract statistical information that is then returned to the client for local use. Continue with further samples to learn more about the MBrace programming model.

Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.

namespace System
namespace System.IO
namespace System.Net
namespace System.Text
namespace System.Text.RegularExpressions
namespace MBrace
namespace MBrace.Core
namespace MBrace.Flow
val cluster : MBrace.Thespian.ThespianCluster

Full name: 200-norvigs-spelling-corrector-example.cluster
module Config
val GetCluster : unit -> MBrace.Thespian.ThespianCluster

Full name: Config.GetCluster


 Gets or creates a new Thespian cluster session.
val fs : MBrace.Core.Internals.CloudFileSystem

Full name: 200-norvigs-spelling-corrector-example.fs
property MBrace.Runtime.MBraceClient.Store: MBrace.Core.Internals.CloudStoreClient
property MBrace.Core.Internals.CloudStoreClient.CloudFileSystem: MBrace.Core.Internals.CloudFileSystem
val download : uri:string -> 'a

Full name: 200-norvigs-spelling-corrector-example.download
val uri : string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
Multiple items
type WebClient =
  inherit Component
  new : unit -> WebClient
  member BaseAddress : string with get, set
  member CachePolicy : RequestCachePolicy with get, set
  member CancelAsync : unit -> unit
  member Credentials : ICredentials with get, set
  member DownloadData : address:string -> byte[] + 1 overload
  member DownloadDataAsync : address:Uri -> unit + 1 overload
  member DownloadFile : address:string * fileName:string -> unit + 1 overload
  member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
  member DownloadString : address:string -> string + 1 overload
  ...

Full name: System.Net.WebClient

--------------------
WebClient() : unit
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
Multiple items
val chunkBySize : n:int -> numbers:'T [] -> 'T [] []

Full name: Utils.Array.chunkBySize

--------------------
val chunkBySize : chunkSize:int -> array:'T [] -> 'T [] []

Full name: Microsoft.FSharp.Collections.Array.chunkBySize
val mapi : mapping:(int -> 'T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.Array.mapi
property MBrace.Core.Internals.CloudFileSystem.File: MBrace.Core.Internals.CloudFileClient
member MBrace.Core.Internals.CloudFileClient.Delete : path:string -> unit
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
member MBrace.Core.Internals.CloudFileClient.WriteAllLines : path:string * lines:seq<string> * ?encoding:Text.Encoding -> MBrace.Core.CloudFileInfo
val downloadTask : MBrace.Runtime.CloudProcess<obj []>

Full name: 200-norvigs-spelling-corrector-example.downloadTask
member MBrace.Runtime.MBraceClient.CreateProcess : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:MBrace.Core.Internals.ResourceRegistry * ?taskName:string -> MBrace.Runtime.CloudProcess<'T>
member MBrace.Runtime.CloudProcess.ShowInfo : unit -> unit
val files : obj []

Full name: 200-norvigs-spelling-corrector-example.files
property MBrace.Runtime.CloudProcess.Result: obj []
val fileSizesJob : MBrace.Runtime.CloudProcess<obj>

Full name: 200-norvigs-spelling-corrector-example.fileSizesJob
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.Array.map
val f : obj
type Path =
  static val DirectorySeparatorChar : char
  static val AltDirectorySeparatorChar : char
  static val VolumeSeparatorChar : char
  static val InvalidPathChars : char[]
  static val PathSeparator : char
  static member ChangeExtension : path:string * extension:string -> string
  static member Combine : [<ParamArray>] paths:string[] -> string + 3 overloads
  static member GetDirectoryName : path:string -> string
  static member GetExtension : path:string -> string
  static member GetFileName : path:string -> string
  ...

Full name: System.IO.Path
property MBrace.Runtime.CloudProcess.Status: MBrace.Core.CloudProcessStatus
val fileSizes : obj

Full name: 200-norvigs-spelling-corrector-example.fileSizes
property MBrace.Runtime.CloudProcess.Result: obj
val regex : Regex

Full name: 200-norvigs-spelling-corrector-example.regex
Multiple items
type Regex =
  new : pattern:string -> Regex + 1 overload
  member GetGroupNames : unit -> string[]
  member GetGroupNumbers : unit -> int[]
  member GroupNameFromNumber : i:int -> string
  member GroupNumberFromName : name:string -> int
  member IsMatch : input:string -> bool + 1 overload
  member Match : input:string -> Match + 2 overloads
  member Matches : input:string -> MatchCollection + 1 overload
  member Options : RegexOptions
  member Replace : input:string * replacement:string -> string + 5 overloads
  ...

Full name: System.Text.RegularExpressions.Regex

--------------------
Regex(pattern: string) : unit
Regex(pattern: string, options: RegexOptions) : unit
type RegexOptions =
  | None = 0
  | IgnoreCase = 1
  | Multiline = 2
  | ExplicitCapture = 4
  | Compiled = 8
  | Singleline = 16
  | IgnorePatternWhitespace = 32
  | RightToLeft = 64
  | ECMAScript = 256
  | CultureInvariant = 512

Full name: System.Text.RegularExpressions.RegexOptions
field RegexOptions.Compiled = 8
val wordCountJob : MBrace.Runtime.CloudProcess<(string * int64) []>

Full name: 200-norvigs-spelling-corrector-example.wordCountJob
Multiple items
module CloudFlow

from MBrace.Flow

--------------------
module CloudFlow

from Utils

--------------------
type CloudFlow =
  static member OfArray : source:'T [] -> CloudFlow<'T>
  static member OfCloudArrays : cloudArrays:seq<#CloudArray<'T>> -> LocalCloud<PersistedCloudFlow<'T>>
  static member OfCloudCollection : collection:ICloudCollection<'T> * ?sizeThresholdPerWorker:(unit -> int64) -> CloudFlow<'T>
  static member OfCloudDirectory : dirPath:string * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  static member OfCloudDirectory : dirPath:string * ?deserializer:(Stream -> seq<'T>) * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  static member OfCloudDirectory : dirPath:string * deserializer:(TextReader -> seq<'T>) * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  static member OfCloudDirectoryByLine : dirPath:string * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
  static member OfCloudFileByLine : path:string * ?encoding:Encoding -> CloudFlow<string>
  static member OfCloudFileByLine : paths:seq<string> * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
  static member OfCloudFiles : paths:seq<string> * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
  ...

Full name: MBrace.Flow.CloudFlow

--------------------
type CloudFlow<'T> =
  interface
    abstract member WithEvaluators : collectorFactory:LocalCloud<Collector<'T,'S>> -> projection:('S -> LocalCloud<'R>) -> combiner:('R [] -> LocalCloud<'R>) -> Cloud<'R>
    abstract member DegreeOfParallelism : int option
  end

Full name: MBrace.Flow.CloudFlow<_>
static member CloudFlow.OfCloudFileByLine : path:string * ?encoding:Text.Encoding -> CloudFlow<string>
static member CloudFlow.OfCloudFileByLine : paths:seq<string> * ?encoding:Text.Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
val collect : f:('T -> #seq<'R>) -> flow:CloudFlow<'T> -> CloudFlow<'R>

Full name: MBrace.Flow.CloudFlow.collect
val text : string
Regex.Matches(input: string) : MatchCollection
Regex.Matches(input: string, startat: int) : MatchCollection
module Seq

from Microsoft.FSharp.Collections
val cast : source:Collections.IEnumerable -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.cast
val map : f:('T -> 'R) -> flow:CloudFlow<'T> -> CloudFlow<'R>

Full name: MBrace.Flow.CloudFlow.map
val m : Match
type Match =
  inherit Group
  member Groups : GroupCollection
  member NextMatch : unit -> Match
  member Result : replacement:string -> string
  static member Empty : Match
  static member Synchronized : inner:Match -> Match

Full name: System.Text.RegularExpressions.Match
property Capture.Value: string
String.ToLower() : string
String.ToLower(culture: Globalization.CultureInfo) : string
val countBy : projection:('T -> 'Key) -> flow:CloudFlow<'T> -> CloudFlow<'Key * int64> (requires equality)

Full name: MBrace.Flow.CloudFlow.countBy
val id : x:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.id
val toArray : flow:CloudFlow<'T> -> MBrace.Core.Cloud<'T []>

Full name: MBrace.Flow.CloudFlow.toArray
member MBrace.Runtime.MBraceClient.ShowProcesses : unit -> unit
val NWORDS : Map<string,int64>

Full name: 200-norvigs-spelling-corrector-example.NWORDS
property MBrace.Runtime.CloudProcess.Result: (string * int64) []
Multiple items
module Map

from Microsoft.FSharp.Collections

--------------------
type Map<'Key,'Value (requires comparison)> =
  interface IEnumerable
  interface IComparable
  interface IEnumerable<KeyValuePair<'Key,'Value>>
  interface ICollection<KeyValuePair<'Key,'Value>>
  interface IDictionary<'Key,'Value>
  new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
  member Add : key:'Key * value:'Value -> Map<'Key,'Value>
  member ContainsKey : key:'Key -> bool
  override Equals : obj -> bool
  member Remove : key:'Key -> Map<'Key,'Value>
  ...

Full name: Microsoft.FSharp.Collections.Map<_,_>

--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
val ofArray : elements:('Key * 'T) [] -> Map<'Key,'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.ofArray
val isKnown : word:string -> bool

Full name: 200-norvigs-spelling-corrector-example.isKnown
val word : string
member Map.ContainsKey : key:'Key -> bool
val edits1 : word:string -> Set<string>

Full name: 200-norvigs-spelling-corrector-example.edits1
val splits : (string * string) list
val i : int
property String.Length: int
val deletes : string list
val a : string
val b : string
val transposes : string list
val replaces : string list
val c : char
val inserts : string list
Multiple items
module Set

from Microsoft.FSharp.Collections

--------------------
type Set<'T (requires comparison)> =
  interface IComparable
  interface IEnumerable
  interface IEnumerable<'T>
  interface ICollection<'T>
  new : elements:seq<'T> -> Set<'T>
  member Add : value:'T -> Set<'T>
  member Contains : value:'T -> bool
  override Equals : obj -> bool
  member IsProperSubsetOf : otherSet:Set<'T> -> bool
  member IsProperSupersetOf : otherSet:Set<'T> -> bool
  ...

Full name: Microsoft.FSharp.Collections.Set<_>

--------------------
new : elements:seq<'T> -> Set<'T>
val ofList : elements:'T list -> Set<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Set.ofList
val knownEdits1 : word:string -> Set<string> option

Full name: 200-norvigs-spelling-corrector-example.knownEdits1
val result : Set<string>
val w : string
val containsKey : key:'Key -> table:Map<'Key,'T> -> bool (requires comparison)

Full name: Microsoft.FSharp.Collections.Map.containsKey
property Set.IsEmpty: bool
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val knownEdits2 : word:string -> Set<string> option

Full name: 200-norvigs-spelling-corrector-example.knownEdits2
val e1 : string
val e2 : string
val findBestCorrection : word:string -> string

Full name: 200-norvigs-spelling-corrector-example.findBestCorrection
val words : Set<string>
val sortBy : projection:('T -> 'Key) -> source:seq<'T> -> seq<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Seq.sortBy
val head : source:seq<'T> -> 'T

Full name: Microsoft.FSharp.Collections.Seq.head
Fork me on GitHub