Example: Cloud-distributed WordCount
This example is from the MBrace Starter Kit.
This tutorial implements the classic word count commonly associated with distributed Map/Reduce frameworks. It uses CloudFlow for the implementation and textfiles.com as the data source.
First, some basic type definitions:
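As a minimal sketch of what such definitions could look like: the names `WordFrequency`, `WordCount`, `wordRegex`, `splitToWords` and `normalize` come from the sample, while the regular expression pattern and the lower-casing step are assumptions made here for illustration.

```fsharp
open System.Text.RegularExpressions

// A word paired with its occurrence count (shapes assumed from the sample's names).
type WordFrequency = string * int64
type WordCount = WordFrequency []

// Assumed pattern: split on runs of non-word characters.
let wordRegex = Regex(@"\W+", RegexOptions.Compiled)

/// Splits a line of text into candidate words.
/// Leading or trailing punctuation can produce empty strings in the result.
let splitToWords (line : string) : string [] = wordRegex.Split(line)

/// Normalizes a word by trimming whitespace and lower-casing it (lower-casing is assumed).
let normalize (word : string) : string = word.Trim().ToLowerInvariant()
```

Because splitting can yield empty strings, any later filtering step should treat blank entries as noise.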
Now, define the words to ignore in the word count:
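A hedged sketch of the ignore list follows. The names `noiseWords`, `isNoiseWord` and `hashSet` come from the sample; the word list below is a short illustrative subset, the local `hashSet` is a stand-in for the sample's utility, and treating blank strings as noise is an assumption.

```fsharp
open System
open System.Collections.Generic

/// Creates a new HashSet from the provided sequence (local stand-in for the sample's helper).
let hashSet (items : seq<'T>) = HashSet<'T>(items)

/// The words ignored by wordcount (abbreviated, illustrative list).
let noiseWords =
    hashSet [ "a"; "an"; "and"; "are"; "as"; "at"; "be"; "but"; "by"; "for";
              "if"; "in"; "is"; "it"; "of"; "on"; "or"; "the"; "to"; "with" ]

/// Specifies whether a word is noise: blank, or present in the ignore list.
let isNoiseWord (word : string) : bool =
    String.IsNullOrWhiteSpace(word) || noiseWords.Contains(word)
```

A hash set keeps each membership check at roughly constant cost, which matters when the check runs once per word across the whole data set.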
We are now ready to define our distributed workflows. First, we create a distributed download workflow that caches the contents of the supplied URLs across the cluster. The workflow returns a PersistedCloudFlow, which later flow queries can consume directly.
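One way this workflow might be written is sketched below. `CloudFlow.OfHttpFileByLine` and `CloudFlow.persist` are named by the sample; the use of an overload that accepts a sequence of URLs and the choice of `StorageLevel.Memory` are assumptions. The snippet is not standalone: it presumes the MBrace assemblies are already referenced.

```fsharp
// Assumes the MBrace.Core and MBrace.Flow assemblies are already referenced,
// for example by loading ThespianCluster.fsx or AzureCluster.fsx first.
open MBrace.Core
open MBrace.Flow

/// Downloads and caches text files across the cluster.
let downloadAndCacheTextFiles (urls : seq<string>) : Cloud<PersistedCloudFlow<string>> =
    // Read every URL line by line (assumed: an overload taking a sequence of URLs).
    CloudFlow.OfHttpFileByLine urls
    // Keep the downloaded lines in worker memory (assumed storage level).
    |> CloudFlow.persist StorageLevel.Memory
```

Persisting the flow means the download happens once, and subsequent queries read the cached partitions instead of fetching the files again.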
The wordcount function can now be defined:
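To make the shape of the pipeline concrete without a cluster, here is a local, in-memory analogue built on the standard `Seq` module rather than CloudFlow. It is a sketch under assumptions: `computeWordCountLocal` is a hypothetical name, and the helper definitions are abbreviated stand-ins.

```fsharp
open System
open System.Text.RegularExpressions

type WordFrequency = string * int64
type WordCount = WordFrequency []

// Abbreviated stand-ins for the helpers (pattern and word list are assumed).
let wordRegex = Regex(@"\W+", RegexOptions.Compiled)
let splitToWords (line : string) = wordRegex.Split(line)
let normalize (word : string) = word.Trim().ToLowerInvariant()
let noiseWords = set [ "a"; "and"; "of"; "the"; "to" ]
let isNoiseWord (word : string) =
    String.IsNullOrWhiteSpace(word) || noiseWords.Contains(word)

/// Local analogue of the word count: keeps the `cutoff` most frequent words.
let computeWordCountLocal (cutoff : int) (lines : seq<string>) : WordCount =
    lines
    |> Seq.collect splitToWords                       // lines -> words
    |> Seq.map normalize                              // trim and lower-case
    |> Seq.filter (not << isNoiseWord)                // drop ignored words
    |> Seq.countBy id                                 // word -> occurrences
    |> Seq.map (fun (word, count) -> word, int64 count)
    |> Seq.sortBy (fun (_, count) -> -count)          // most frequent first
    |> Seq.truncate cutoff
    |> Seq.toArray

let top = computeWordCountLocal 1 [ "The cat and the hat"; "A cat sat" ]
// top = [| ("cat", 2L) |]
```

The sample itself names `CloudFlow.collect`, `CloudFlow.map`, `CloudFlow.filter`, `CloudFlow.countBy`, `CloudFlow.sortBy` and `CloudFlow.toArray`, so the distributed version presumably follows the same stages with the work partitioned across workers.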
Test the wordcount sample using textfiles.com
Step 1. Determine URIs to data inputs from textfiles.com
Step 2. Download the URIs across the cluster and load them into memory
Step 3. Perform wordcount on downloaded data
Check progress:
Wait for the results:
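The download, word count, progress check and final wait can be sketched together as follows. This is an illustration under stated assumptions, not the sample's own code: `runWordCount` is a hypothetical helper, and the members `CreateProcess`, `ShowInfo` and `Result` are assumed from the MBrace 1.x client API and may differ in other versions. The names `downloadTask`, `persistedFlow` and `wordCountTask` come from the sample.

```fsharp
// Assumes the MBrace assemblies are already referenced, for example by
// loading ThespianCluster.fsx or AzureCluster.fsx first.
open MBrace.Core
open MBrace.Flow
open MBrace.Runtime

/// Hypothetical helper (not part of the sample): runs both workflows on a cluster.
let runWordCount
        (cluster : MBraceClient)
        (download : seq<string> -> Cloud<PersistedCloudFlow<string>>)
        (count : CloudFlow<string> -> Cloud<(string * int64) []>)
        (urls : seq<string>) : (string * int64) [] =
    // Step 2: start the download as a cloud process (assumed API).
    let downloadTask = cluster.CreateProcess(download urls)
    downloadTask.ShowInfo()                   // check progress
    let persistedFlow = downloadTask.Result   // blocks until the data is cached
    // Step 3: run the word count over the cached lines.
    let wordCountTask =
        cluster.CreateProcess(count (persistedFlow :> CloudFlow<string>))
    wordCountTask.ShowInfo()                  // check progress
    wordCountTask.Result                      // wait for the results
```

Passing the two workflows in as parameters keeps the sketch independent of how they are defined; the explicit upcast is needed because a function-typed parameter does not coerce its argument automatically.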
In this tutorial, you've learned how to perform a scalable textual analysis task using MBrace. Continue with further samples to learn more about the MBrace programming model.
Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.