MBrace.Core and MBrace.Azure


Creating and Using Cloud Files

This tutorial is from the MBrace Starter Kit.

MBrace clusters have a cloud file system mapped to the corresponding cloud fabric. This can be used like a distributed file system such as HDFS.

Accessing the Cloud File System from F# scripts

First, define some Unix-like helper functions for accessing the cloud file system from your F# client script. These helpers are optional: you can also use the MBrace API directly.

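open System
open System.IO
open MBrace.Core
open MBrace.Flow

// Config is defined by the starter kit's cluster script
// (ThespianCluster.fsx or AzureCluster.fsx), which must be loaded first.
let cluster = Config.GetCluster()

let fs = cluster.Store.CloudFileSystem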

let root = fs.Path.DefaultDirectory

let (++) path1 path2 = fs.Path.Combine(path1, path2)

let ls path = fs.File.Enumerate(path)

// List the files in a directory, then recurse into each subdirectory.
let rec lsRec path = 
    seq { yield! fs.File.Enumerate(path)
          for d in fs.Directory.Enumerate(path) do 
              yield! lsRec d.Path }

let mkdir path = fs.Directory.Create(path)

let rmdir path = fs.Directory.Delete(path)

let rmdirRec path = fs.Directory.Delete(path,recursiveDelete=true)

let randdir() = fs.Path.GetRandomDirectoryName()

let randfile() = fs.Path.GetRandomFilePath()

let rm path = fs.File.Delete path

let cat path = fs.File.ReadAllText path

let catLines path = fs.File.ReadAllLines path

let catBytes path = fs.File.ReadAllBytes path

let write path text = fs.File.WriteAllText(path, text)

let writeLines path lines = fs.File.WriteAllLines(path, lines)

let writeBytes path bytes = fs.File.WriteAllBytes(path, bytes)
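
The recursive listing is the one helper with real logic in it: list the files of a directory, then recurse into each subdirectory rather than into the directory being listed. As a minimal sketch of that pattern, here is an analogue on the local file system using only System.IO, so it can be tried without a cluster. The name lsRecLocal and the scratch directory layout are illustrative assumptions and are not part of the MBrace API.

```fsharp
open System.IO

// Local analogue of lsRec: files of this directory first,
// then the files of every subdirectory, recursing on the subdirectory itself.
let rec lsRecLocal (dir: string) : seq<string> =
    seq { yield! Directory.EnumerateFiles(dir)
          for sub in Directory.EnumerateDirectories(dir) do
              yield! lsRecLocal sub }

// Build a small scratch tree to try it on (hypothetical names).
let scratch = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName())
Directory.CreateDirectory(Path.Combine(scratch, "nested")) |> ignore
File.WriteAllText(Path.Combine(scratch, "a.txt"), "top level")
File.WriteAllText(Path.Combine(scratch, "nested", "b.txt"), "one level down")

// Collect the file names found anywhere under the scratch directory.
let found =
    lsRecLocal scratch
    |> Seq.map (fun (p: string) -> Path.GetFileName(p))
    |> Seq.sort
    |> Seq.toList

printfn "%A" found

// Remove the scratch tree again.
Directory.Delete(scratch, true)
```

Both a.txt and b.txt appear in the result only because the recursive call receives the subdirectory. Passing the original directory again would never terminate.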

Now use these functions to create directories and files:

mkdir (root ++ "data")

write (root ++ "data" ++ "humpty.txt") "All the king's horses and all the king's men"
writeLines (root ++ "data" ++ "spider.txt") [ for i in 0 .. 1000 -> "Incy wincy spider climbed up the water spout" ]

ls (root ++ "data")

Now check you've created the files correctly:

cat (root ++ "data" ++ "spider.txt") 
catLines (root ++ "data" ++ "spider.txt") 

Now remove the directory of data:

rmdirRec (root ++ "data")

Programmatic upload of data as part of cloud workflows

The Unix-like abbreviations from the previous section are for use from your client scripts. You can also use the MBrace cloud file API directly from cloud workflows.

First, create a local temp file.

let localTmpFile = 
    let path = Path.GetTempFileName()
    let lines = 
        [ for i in 1 .. 1000 do 
             let time = DateTime.Now.Date.AddSeconds(float i)
             let text = sprintf "click user%d %s" (i%10) (time.ToString())
             yield text ]
    File.WriteAllLines(path, lines)
    path

Next, you upload the created file to the tmp container in cloud storage. The tmp container will be created if it does not exist.

let cloudFile = fs.File.Upload(localTmpFile, sprintf "%s/tmp/%s" root (Path.GetFileName localTmpFile))     

After uploading the file, you remove the local file.

File.Delete localTmpFile

Now process the file in the MBrace cluster. The cloud expression below is sent to the cluster and runs there, not on your client machine.

let lines = 
    cloud {
        let lines = fs.File.ReadAllLines cloudFile.Path
        let users = [ for line in lines -> line.Split(' ').[1] ]
        return users |> Seq.distinct |> Seq.toList
    } 
    |> cluster.Run
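
The parsing step inside the cloud expression can be checked on the client before anything is sent to the cluster. The following is a sketch only: it rebuilds lines of the same shape in memory instead of reading the cloud file, and the names sampleLines and distinctUsers are illustrative.

```fsharp
open System

// Same line shape as the temp file above: "click user<N> <timestamp>".
let sampleLines =
    [ for i in 1 .. 1000 do
        let time = DateTime.Now.Date.AddSeconds(float i)
        yield sprintf "click user%d %s" (i % 10) (time.ToString()) ]

// Field 0 is the literal "click"; field 1 is the user name.
// The timestamp may itself contain spaces, but it comes after field 1.
let distinctUsers =
    sampleLines
    |> List.map (fun line -> line.Split(' ').[1])
    |> List.distinct

printfn "%d distinct users: %A" distinctUsers.Length distinctUsers
```

Because the user index is i % 10, this yields ten names. List.distinct keeps first-occurrence order, so the list starts with user1 and ends with user0.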

Using multiple cloud files as input to distributed cloud flows

Processing one small file in the cloud is not of much use. However, multiple large cloud files can be used as inputs to distributed cloud flows, in a similar way to map-reduce jobs in Hadoop.

Next you generate a collection of 100 cloud files and process them using a distributed cloud flow.

let dataDir = root ++ "data"

mkdir dataDir

let cloudFiles = 
    [ for i in 1 .. 100 ->
        local { 
            let lines = 
                [ for j in 1 .. 100000 -> 
                    "file " + string i + ", item " + string (i*100+j) + ", " + string (j+i*100) ] 
            let nm = dataDir + "/file" + string i
            do! CloudFile.Delete(nm)
            let file = fs.File.WriteAllLines(nm, lines)
            return file.Path
        } ]
    |> Cloud.ParallelBalanced
    |> cluster.Run

The collection of cloud file paths can now be used as input to a cloud flow. The flow below sums the third column of every line of every file in a distributed way.

let sumOfThirdColumn =
    cloudFiles
    |> CloudFlow.OfCloudFileByLine
    // Parse as int64: the total over all 100 files exceeds the range of a 32-bit int.
    |> CloudFlow.map (fun line -> line.Split(',').[2] |> int64)
    |> CloudFlow.sum
    |> cluster.Run
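
It can help to know what total to expect before running the flow. The sketch below is a local check, not part of the starter kit: it builds lines of the same shape at a reduced size, sums the third column with a 64-bit accumulator, and compares the result with a closed form. The names makeLines, localSum and expectedSum are illustrative assumptions.

```fsharp
// Same line shape as the generated cloud files, at a reduced size.
let makeLines (i: int) (itemsPerFile: int) =
    [ for j in 1 .. itemsPerFile ->
        "file " + string i + ", item " + string (i*100+j) + ", " + string (j+i*100) ]

let fileCount, itemsPerFile = 3, 5

// Parse the third comma-separated field of every line as a 64-bit integer and sum.
let localSum =
    [ for i in 1 .. fileCount do yield! makeLines i itemsPerFile ]
    |> List.sumBy (fun line -> line.Split(',').[2].Trim() |> int64)

// Closed form of the same sum: over files i and items j, add up (j + 100 * i).
let expectedSum (files: int64) (items: int64) =
    files * items * (items + 1L) / 2L + 100L * items * files * (files + 1L) / 2L

printfn "local sum = %d, closed form = %d" localSum (expectedSum 3L 5L)
printfn "full-size total = %d (Int32.MaxValue = %d)" (expectedSum 100L 100000L) System.Int32.MaxValue
```

For 100 files of 100000 lines each, the closed form gives 550505000000, which is far larger than Int32.MaxValue. A 32-bit sum over the full data set would therefore not hold the true total, which is why a 64-bit integer is the safer choice for the third column.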

Clean up the cloud data

rmdirRec (root ++ "data")

Summary

In this tutorial, you've learned how to use cloud files, from some simple Unix-like operations to using multiple cloud files as partitioned inputs into CloudFlow programming. Continue with further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
