MBrace.Core and MBrace.Azure


Example: Cloud-distributed WordCount

This example is from the MBrace Starter Kit.

This example implements the classic word count program commonly associated with distributed Map/Reduce frameworks. It uses CloudFlow for the implementation and text files from textfiles.com as the data source.

First, some basic type definitions:

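#load "../lib/utils.fsx"
#load "../lib/textfiles.fsx"

// Namespaces used by the definitions below
open System.Text.RegularExpressions
open MBrace.Core
open MBrace.Flow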

type WordFrequency = string * int64
type WordCount = WordFrequency []

// Regex word tokenizer
let wordRegex = new Regex(@"[\W]+", RegexOptions.Compiled)
let splitToWords (line : string) = wordRegex.Split line

/// normalize word
let normalize (word : string) = word.Trim().ToLower()
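
To see what the tokenizer and normalizer produce, here is a minimal sketch that repeats the two definitions so it runs on its own, without MBrace. The sample line is hypothetical and not taken from the data set.

```fsharp
open System.Text.RegularExpressions

// Same tokenizer and normalizer as above, repeated so the snippet is self-contained
let wordRegex = Regex(@"[\W]+", RegexOptions.Compiled)
let splitToWords (line : string) = wordRegex.Split line
let normalize (word : string) = word.Trim().ToLower()

// Hypothetical sample line
let sampleLine = "Hello, World! Don't stop."

// Split on runs of non-word characters, then lower-case each token
let tokens = splitToWords sampleLine |> Array.map normalize

printfn "%A" tokens
```

Because the pattern splits on every run of non-word characters, the apostrophe in "Don't" yields the two tokens "don" and "t", and the trailing period leaves an empty string at the end of the array. Such short fragments are discarded later by the length check in the noise-word filter.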

Now, define the words to ignore in the word count:

/// The words ignored by wordcount
let noiseWords =
    hashSet [  
        "about"; "above"; "along"; "also"; "although"; "aren't"; "because"; "been";
        "cannot"; "could"; "couldn't"; "didn't"; "does"; "doesn't"; "e.g.";
        "either"; "etc."; "even"; "ever"; "from"; "further"; "gets"; "hardly";
        "hasn't"; "having"; "hence"; "here"; "hereby"; "herein"; "hereof";
        "hereon"; "hereto"; "herewith"; "however"; "i.e."; "into"; "it's"; "more";
        "most"; "near"; "onto"; "other"; "over"; "really"; "said"; "same";
        "should"; "shouldn't"; "since"; "some"; "such"; "than"; "that"; "their";
        "them"; "then"; "there"; "thereby"; "therefore"; "therefrom"; "therein";
        "thereof"; "thereon"; "thereto"; "therewith"; "these"; "they"; "this";
        "those"; "through"; "thus"; "under"; "until"; "unto"; "upon"; "very";
        "wasn't"; "were"; "what"; "when"; "where"; "whereby"; "wherein"; "whether";
        "which"; "while"; "whom"; "whose"; "with"; "without"; "would"; "your";
        "have"; "thou"; "will"; "shall" ]

/// specifies whether word is noise
let isNoiseWord (word : string) = word.Length <= 3 || noiseWords.Contains word
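
The filter combines two rules: any word of three characters or fewer is noise, and so is any word in the set. The sketch below illustrates both rules on a small subset of the list. The local `hashSet` helper is an assumption standing in for the starter kit's utility of the same name.

```fsharp
open System.Collections.Generic

// Assumption: local stand-in for the starter kit's hashSet helper
let hashSet (items : seq<'T>) = HashSet<'T>(items)

// A small subset of the noise words listed above
let noiseWords = hashSet [ "about"; "aren't"; "which" ]

// Same rule as above: short words and listed words are noise
let isNoiseWord (word : string) = word.Length <= 3 || noiseWords.Contains word

let checks =
    [ "the"; "about"; "cloud"; "aren't"; "aren" ]
    |> List.map (fun w -> w, isNoiseWord w)

checks |> List.iter (fun (w, noisy) -> printfn "%-8s noise: %b" w noisy)
```

Note that "aren" is not treated as noise. Since the tokenizer defined earlier splits on apostrophes and periods, entries such as "aren't" or "e.g." can only match if a word reaches the filter unsplit.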

We are now ready to define our distributed workflows. First, we create a distributed download workflow that caches the contents of the supplied URLs across the cluster. It returns a PersistedCloudFlow, which later flow queries can consume directly.

/// Downloads and caches text files across the cluster
let downloadAndCacheTextFiles (urls : seq<string>) : Cloud<PersistedCloudFlow<string>> =
    CloudFlow.OfHttpFileByLine urls
    |> CloudFlow.persist StorageLevel.Memory
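
As a purely local analogy (this is not the MBrace API), the benefit of persisting before querying resembles the difference between a lazy sequence and one wrapped in `Seq.cache`. In this sketch, `fetchLines` is a hypothetical stand-in for an HTTP download, and a counter records how often the source is read.

```fsharp
// Counts how many times the simulated source is read
let downloads = ref 0

// Hypothetical stand-in for a download: the body runs on every enumeration
let fetchLines () =
    seq {
        downloads.Value <- downloads.Value + 1
        yield "first line"
        yield "second line"
    }

let uncached = fetchLines ()
let cached = Seq.cache (fetchLines ())

// Two queries over the uncached sequence read the source twice
let a = Seq.length uncached
let b = Seq.length uncached
let readsWithoutCache = downloads.Value

// Two queries over the cached sequence read the source only once
let c = Seq.length cached
let d = Seq.length cached
let readsWithCache = downloads.Value - readsWithoutCache

printfn "reads without cache: %d, with cache: %d" readsWithoutCache readsWithCache
```

The analogy only covers the "read once, query many times" idea. How the persisted data is distributed across workers is handled by the cluster and is not modelled here.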

The wordcount function can now be defined:

/// Computes the word count using the input cloud flow
let computeWordCount (cutoff : int) (lines : CloudFlow<string>) : Cloud<WordCount> =
    lines
    |> CloudFlow.collect splitToWords
    |> CloudFlow.map normalize
    |> CloudFlow.filter (not << isNoiseWord)
    |> CloudFlow.countBy id
    |> CloudFlow.sortBy (fun (_,c) -> -c) cutoff
    |> CloudFlow.toArray
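
The same pipeline shape can be tried locally on an in-memory sequence before submitting anything to a cluster. The following is a sketch using the standard `Seq` module, not the CloudFlow implementation: it keeps only the length rule of the noise filter, uses `Seq.truncate` in place of the `takeCount` argument, and yields `int` counts where `CloudFlow.countBy` yields `int64`. The sample lines are hypothetical.

```fsharp
open System.Text.RegularExpressions

let wordRegex = Regex(@"[\W]+", RegexOptions.Compiled)
let splitToWords (line : string) = wordRegex.Split line
let normalize (word : string) = word.Trim().ToLower()

// Simplified stand-in for isNoiseWord: only the length rule is kept
let isShort (word : string) = word.Length <= 3

/// Local, single-machine counterpart of computeWordCount
let localWordCount (cutoff : int) (lines : seq<string>) : (string * int) [] =
    lines
    |> Seq.collect splitToWords          // lines -> words
    |> Seq.map normalize                 // trim and lower-case
    |> Seq.filter (not << isShort)       // drop short words and empty strings
    |> Seq.countBy id                    // (word, occurrences)
    |> Seq.sortBy (fun (_, c) -> -c)     // most frequent first
    |> Seq.truncate cutoff               // keep at most `cutoff` entries
    |> Seq.toArray

// Hypothetical input
let sample =
    [ "Cloud flows distribute work"
      "cloud workers process flows"
      "The cloud scales" ]

let top2 = localWordCount 2 sample
printfn "%A" top2
```

Negating the count in the sort key is what turns the ascending sort into a descending one, which is why the cutoff keeps the most frequent words.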

Test the wordcount sample using textfiles.com

Step 1. Determine URIs to data inputs from textfiles.com

let files = TextFiles.crawlForTextFiles() // get text file data from textfiles.com
let testedFiles = files // |> Seq.take 50 // uncomment to use a smaller dataset
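
When sampling a subset of the input, keep in mind that `Seq.take` fails if the crawl returns fewer items than requested, whereas `Seq.truncate` returns whatever is available. A small sketch with a hypothetical URL list:

```fsharp
// Hypothetical URL list, shorter than the requested sample size
let files = [| "http://example.com/a.txt"; "http://example.com/b.txt" |]

// Seq.truncate returns at most 50 items and tolerates shorter inputs
let safeSample = files |> Seq.truncate 50 |> Seq.toArray

// Seq.take raises InvalidOperationException when fewer than 50 items exist
let takeFailed =
    try
        files |> Seq.take 50 |> Seq.toArray |> ignore
        false
    with :? System.InvalidOperationException -> true

printfn "sample size: %d, Seq.take failed: %b" safeSample.Length takeFailed
```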

Step 2. Download the files across the cluster and load them into memory

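// `cluster` is the client object for the MBrace cluster
// (obtained via Config.GetCluster() in the starter kit scripts)
let downloadTask = 
    downloadAndCacheTextFiles testedFiles
    |> cluster.CreateProcess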

cluster.ShowWorkers()
cluster.ShowProcesses()

let persistedFlow = downloadTask.Result // get PersistedCloudFlow
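
The submit, check, and wait pattern used here has a familiar local counterpart in .NET tasks. The sketch below is an analogy only and does not use the MBrace API: `work` is a hypothetical stand-in for a long-running workflow.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for a long-running cloud workflow
let work = async {
    do! Async.Sleep 200
    return [| "line one"; "line two" |]
}

// Start the work without blocking, roughly as CreateProcess submits a workflow
let task : Task<string []> = Async.StartAsTask work

// Non-blocking status check, in the spirit of ShowProcesses
printfn "Completed so far: %b" task.IsCompleted

// Blocks until the work finishes, like reading .Result above
let lines = task.Result
printfn "Received %d lines" lines.Length
```

As with the local task, reading the result blocks the calling session until the computation has finished, so it is worth checking progress first for long downloads.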

Step 3. Perform wordcount on downloaded data

let wordCountTask = 
    computeWordCount 100 persistedFlow 
    |> cluster.CreateProcess

Check progress:

cluster.ShowWorkers()
cluster.ShowProcesses()

Wait for the results:

wordCountTask.Result
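
The result is an array of word and count pairs, matching the WordCount type defined at the start. One way to display it is sketched below. The sample values are hypothetical and only illustrate the shape of the data.

```fsharp
// Hypothetical result, shaped like the WordCount type: (string * int64) []
let sampleResult : (string * int64) [] =
    [| ("cloud", 120L); ("flow", 87L); ("worker", 45L) |]

// Render each entry as a ranked, aligned line
let formatted =
    sampleResult
    |> Array.mapi (fun i (word, count) -> sprintf "%2d. %-10s %6d" (i + 1) word count)

// Total number of counted occurrences among the listed words
let total = sampleResult |> Array.sumBy snd

formatted |> Array.iter (printfn "%s")
printfn "total: %d" total
```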

In this tutorial, you've learned how to perform a scalable textual analysis task using MBrace. Continue with further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
