MBrace.Core and MBrace.Azure


Example: Running python code using MBrace

This tutorial is from the MBrace Starter Kit.

In this tutorial, you can deploy and execute python code across an MBrace cluster. This is achieved by defining a workflow that performs on-demand, per-worker installation of python bits.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
// URI of the python installation archive; modify as appropriate.
// Alternatively, this can be changed to a blob URI and could accommodate any type of software.
let pythonBits = "https://www.python.org/ftp/python/3.5.0/python-3.5.0-embed-amd64.zip"

/// Workflow that downloads and installs python to the local computer.
/// The installation directory is keyed on the archive name and the id of the
/// current worker process; if that directory already exists the download is skipped.
/// Returns the full path to the local python.exe, and fails if it cannot be found.
let installPython () = local {
    let tmp = Path.GetTempPath()
    let! worker = Cloud.CurrentWorker
    let localDir = Path.Combine(tmp, sprintf "%s-p%d" (Path.GetFileNameWithoutExtension pythonBits) worker.ProcessId)
    if not (Directory.Exists localDir) then
        let localArchive = Path.Combine(tmp, sprintf "%s-p%d" (Path.GetFileName pythonBits) worker.ProcessId)

        // Plain (non-workflow) helpers, so that `use` and `try ... finally`
        // release their resources at well-defined points.
        let download () =
            use wc = new System.Net.WebClient()
            wc.DownloadFile(Uri pythonBits, localArchive)

        let extract () =
            try
                try
                    use fs = File.OpenRead localArchive
                    use za = new ZipArchive(fs)
                    za.ExtractToDirectory(localDir)
                with _ ->
                    // A half-extracted directory would make every later call skip
                    // the installation and then fail on the missing python.exe;
                    // remove it so that the next attempt starts from scratch.
                    if Directory.Exists localDir then Directory.Delete(localDir, true)
                    reraise ()
            finally
                // The archive is not needed once extraction has been attempted.
                File.Delete localArchive

        do! Cloud.Logf "Downloading python..."
        download ()
        do! Cloud.Logf "Extracting installation..."
        extract ()
        do! Cloud.Logf "Installation complete."

    let pythonExe = Path.Combine(localDir, "python.exe")
    if not (File.Exists pythonExe) then
        return failwith "Could not locate python.exe in the local installation."

    return pythonExe
}

We now wrap the installation workflow in a DomainLocal type. This creates a serializable entity that will initialize the workflow exactly once in every AppDomain in which it is executed. Compare this to the ThreadLocal class available in mscorlib.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
/// AppDomain-bound lazy python installer: wraps the installPython workflow
/// in a DomainLocal value, which is consumed through its Value property.
let pythonInstaller = DomainLocal.Create(installPython())

/// Record containing results of a single python computation
type PythonResult =
    {
        /// Local time captured immediately before the python process was started
        StartTime : DateTimeOffset
        /// Stopwatch time measured from process start until the process exited
        ExecutionTime : TimeSpan
        /// Standard output of the process, split on Environment.NewLine
        Stdout : string []
        /// Standard error of the process, split on Environment.NewLine
        Stderr : string []
        /// Exit code reported by the python process
        ExitCode : int
    }

/// Runs the provided code in python, feeding it the optional stdin inputs
/// (joined with Environment.NewLine), and returns a PythonResult holding the
/// start time, execution time, stdout/stderr lines and exit code of the process.
let runPythonScript (pythonCode : string) (stdin : string []) = local {
    // lazily install the python installation in the current machine
    // and retrieve the local executable
    let! pythonExe = pythonInstaller.Value

    let split (output : string) = output.Split([|Environment.NewLine|], StringSplitOptions.None)

    // The process handling is ordinary synchronous code, so it is kept in a plain
    // function where `use` and `try ... finally` have their standard semantics.
    let execute () =
        // write python code to tmp file
        let pythonFile = Path.GetTempFileName()
        try
            File.WriteAllText(pythonFile, pythonCode)
            // Launch the Python interpreter with the script as its only argument;
            // the path is quoted in case the temp folder contains spaces.
            let prcInfo = ProcessStartInfo(pythonExe,
                                            sprintf "\"%s\"" pythonFile,
                                            UseShellExecute=false,
                                            RedirectStandardInput=true,
                                            RedirectStandardOutput=true,
                                            RedirectStandardError=true)

            use prc = new Process(StartInfo=prcInfo)
            let startTime = DateTimeOffset.Now
            let timer = Stopwatch.StartNew()
            prc.Start() |> ignore
            // Start draining both redirected streams before waiting for exit:
            // a child that fills a pipe buffer blocks until somebody reads it,
            // so reading only after WaitForExit() can deadlock.
            let stdoutTask = prc.StandardOutput.ReadToEndAsync()
            let stderrTask = prc.StandardError.ReadToEndAsync()
            if stdin.Length > 0 then prc.StandardInput.Write(String.concat Environment.NewLine stdin)
            // closing stdin signals end-of-input to the script
            prc.StandardInput.Close()
            prc.WaitForExit()
            timer.Stop()
            {
                StartTime = startTime
                ExecutionTime = timer.Elapsed
                Stdout = stdoutTask.Result |> split
                Stderr = stderrTask.Result |> split
                ExitCode = prc.ExitCode
            }
        finally
            // do not leave the temporary script behind
            File.Delete pythonFile

    return execute ()
}

We can now test this setup by running python code in the cloud. Let's begin with a simple hello world example:

1: 
2: 
3: 
// Obtain the cluster handle from the Config module
let cluster = Config.GetCluster()

// Run a one-line python script on the cluster, with no stdin input
runPythonScript """print("Hello, World!") """ [||] |> cluster.Run

Let's try passing an input through stdin:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
/// Cloud workflow that greets `name` from python: the name is handed to the
/// script through stdin and the script prints the greeting to stdout.
let greet (name : string) = cloud {
    let script = """
from sys import stdin
name = stdin.readline()
print ("Hello, " + name + "!")
"""

    // the name is the single line fed to the script's stdin
    let stdinLines = Array.singleton name
    return! runPythonScript script stdinLines
}

// run the greeting workflow on the cluster
greet "F#" |> cluster.Run

Let's now try a distributed workflow. Our goal is to use python to fetch the hostnames of every individual worker in the cluster:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
/// Cloud workflow that asks python for the hostname of the machine it runs on
/// and returns the first line the script wrote to stdout.
let getHostnamePython () = cloud {
    let script = """
import socket
print (socket.gethostname())
"""

    // the script takes no stdin input
    let! outcome = runPythonScript script Array.empty
    let firstLine = outcome.Stdout.[0]
    return firstLine
}

// execute the workflow on every worker of the cluster
Cloud.ParallelEverywhere(getHostnamePython()) |> cluster.Run

In this tutorial, you've learned how to distribute python code using clean-slate MBrace clusters. Further features, such as timeouts, cancellation, and asynchronous execution, can be easily implemented using the MBrace primitives and are left as an exercise for the reader.

Continue with further samples to learn more about the MBrace programming model.

Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.

namespace System
namespace System.IO
namespace System.IO.Compression
namespace System.Net
namespace System.Diagnostics
namespace MBrace
namespace MBrace.Core
namespace MBrace.Library
namespace MBrace.Flow
val pythonBits : string

Full name: 200-launching-python-example.pythonBits
val installPython : unit -> 'a

Full name: 200-launching-python-example.installPython


 workflow that downloads and installs python to the local computer
type Path =
  static val DirectorySeparatorChar : char
  static val AltDirectorySeparatorChar : char
  static val VolumeSeparatorChar : char
  static val InvalidPathChars : char[]
  static val PathSeparator : char
  static member ChangeExtension : path:string * extension:string -> string
  static member Combine : [<ParamArray>] paths:string[] -> string + 3 overloads
  static member GetDirectoryName : path:string -> string
  static member GetExtension : path:string -> string
  static member GetFileName : path:string -> string
  ...

Full name: System.IO.Path
Path.GetTempPath() : string
module Cloud

from MBrace.Library
Path.Combine([<ParamArray>] paths: string []) : string
Path.Combine(path1: string, path2: string) : string
Path.Combine(path1: string, path2: string, path3: string) : string
Path.Combine(path1: string, path2: string, path3: string, path4: string) : string
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
Path.GetFileNameWithoutExtension(path: string) : string
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
type Directory =
  static member CreateDirectory : path:string -> DirectoryInfo + 1 overload
  static member Delete : path:string -> unit + 1 overload
  static member EnumerateDirectories : path:string -> IEnumerable<string> + 2 overloads
  static member EnumerateFileSystemEntries : path:string -> IEnumerable<string> + 2 overloads
  static member EnumerateFiles : path:string -> IEnumerable<string> + 2 overloads
  static member Exists : path:string -> bool
  static member GetAccessControl : path:string -> DirectorySecurity + 1 overload
  static member GetCreationTime : path:string -> DateTime
  static member GetCreationTimeUtc : path:string -> DateTime
  static member GetCurrentDirectory : unit -> string
  ...

Full name: System.IO.Directory
Directory.Exists(path: string) : bool
Path.GetFileName(path: string) : string
Multiple items
type WebClient =
  inherit Component
  new : unit -> WebClient
  member BaseAddress : string with get, set
  member CachePolicy : RequestCachePolicy with get, set
  member CancelAsync : unit -> unit
  member Credentials : ICredentials with get, set
  member DownloadData : address:string -> byte[] + 1 overload
  member DownloadDataAsync : address:Uri -> unit + 1 overload
  member DownloadFile : address:string * fileName:string -> unit + 1 overload
  member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
  member DownloadString : address:string -> string + 1 overload
  ...

Full name: System.Net.WebClient

--------------------
WebClient() : unit
Multiple items
type Uri =
  new : uriString:string -> Uri + 5 overloads
  member AbsolutePath : string
  member AbsoluteUri : string
  member Authority : string
  member DnsSafeHost : string
  member Equals : comparand:obj -> bool
  member Fragment : string
  member GetComponents : components:UriComponents * format:UriFormat -> string
  member GetHashCode : unit -> int
  member GetLeftPart : part:UriPartial -> string
  ...

Full name: System.Uri

--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
type File =
  static member AppendAllLines : path:string * contents:IEnumerable<string> -> unit + 1 overload
  static member AppendAllText : path:string * contents:string -> unit + 1 overload
  static member AppendText : path:string -> StreamWriter
  static member Copy : sourceFileName:string * destFileName:string -> unit + 1 overload
  static member Create : path:string -> FileStream + 3 overloads
  static member CreateText : path:string -> StreamWriter
  static member Decrypt : path:string -> unit
  static member Delete : path:string -> unit
  static member Encrypt : path:string -> unit
  static member Exists : path:string -> bool
  ...

Full name: System.IO.File
File.OpenRead(path: string) : FileStream
Multiple items
type ZipArchive =
  new : stream:Stream -> ZipArchive + 3 overloads
  member CreateEntry : entryName:string -> ZipArchiveEntry + 1 overload
  member Dispose : unit -> unit
  member Entries : ReadOnlyCollection<ZipArchiveEntry>
  member GetEntry : entryName:string -> ZipArchiveEntry
  member Mode : ZipArchiveMode

Full name: System.IO.Compression.ZipArchive

--------------------
ZipArchive(stream: Stream) : unit
ZipArchive(stream: Stream, mode: ZipArchiveMode) : unit
ZipArchive(stream: Stream, mode: ZipArchiveMode, leaveOpen: bool) : unit
ZipArchive(stream: Stream, mode: ZipArchiveMode, leaveOpen: bool, entryNameEncoding: Text.Encoding) : unit
File.Exists(path: string) : bool
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val pythonInstaller : obj

Full name: 200-launching-python-example.pythonInstaller


 AppDomain-bound lazy python installer
Multiple items
type DomainLocal =
  static member Create : factory:LocalCloud<'T> -> DomainLocalMBrace<'T>
  static member Create : factory:(unit -> 'T) -> DomainLocal<'T>

Full name: MBrace.Library.DomainLocal

--------------------
type DomainLocal<'T> =
  private new : factory:(unit -> 'T) -> DomainLocal<'T>
  member IsValueCreated : bool
  member Value : 'T

Full name: MBrace.Library.DomainLocal<_>
static member DomainLocal.Create : factory:MBrace.Core.LocalCloud<'T> -> DomainLocalMBrace<'T>
static member DomainLocal.Create : factory:(unit -> 'T) -> DomainLocal<'T>
type PythonResult =
  {StartTime: DateTimeOffset;
   ExecutionTime: TimeSpan;
   Stdout: string [];
   Stderr: string [];
   ExitCode: int;}

Full name: 200-launching-python-example.PythonResult


 Record containing results of a single python computation
PythonResult.StartTime: DateTimeOffset
Multiple items
type DateTimeOffset =
  struct
    new : dateTime:DateTime -> DateTimeOffset + 5 overloads
    member Add : timeSpan:TimeSpan -> DateTimeOffset
    member AddDays : days:float -> DateTimeOffset
    member AddHours : hours:float -> DateTimeOffset
    member AddMilliseconds : milliseconds:float -> DateTimeOffset
    member AddMinutes : minutes:float -> DateTimeOffset
    member AddMonths : months:int -> DateTimeOffset
    member AddSeconds : seconds:float -> DateTimeOffset
    member AddTicks : ticks:int64 -> DateTimeOffset
    member AddYears : years:int -> DateTimeOffset
    ...
  end

Full name: System.DateTimeOffset

--------------------
DateTimeOffset()
DateTimeOffset(dateTime: DateTime) : unit
DateTimeOffset(ticks: int64, offset: TimeSpan) : unit
DateTimeOffset(dateTime: DateTime, offset: TimeSpan) : unit
DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, offset: TimeSpan) : unit
DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, offset: TimeSpan) : unit
DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, calendar: Globalization.Calendar, offset: TimeSpan) : unit
PythonResult.ExecutionTime: TimeSpan
Multiple items
type TimeSpan =
  struct
    new : ticks:int64 -> TimeSpan + 3 overloads
    member Add : ts:TimeSpan -> TimeSpan
    member CompareTo : value:obj -> int + 1 overload
    member Days : int
    member Duration : unit -> TimeSpan
    member Equals : value:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member Hours : int
    member Milliseconds : int
    member Minutes : int
    ...
  end

Full name: System.TimeSpan

--------------------
TimeSpan()
TimeSpan(ticks: int64) : unit
TimeSpan(hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : unit
PythonResult.Stdout: string []
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
PythonResult.Stderr: string []
PythonResult.ExitCode: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val runPythonScript : pythonCode:string -> stdin:string [] -> 'a

Full name: 200-launching-python-example.runPythonScript


 Runs provided code in python with optional stdin inputs,
 returning the results of the computation
val pythonCode : string
val stdin : string []
Path.GetTempFileName() : string
File.WriteAllText(path: string, contents: string) : unit
File.WriteAllText(path: string, contents: string, encoding: Text.Encoding) : unit
Multiple items
type ProcessStartInfo =
  new : unit -> ProcessStartInfo + 2 overloads
  member Arguments : string with get, set
  member CreateNoWindow : bool with get, set
  member Domain : string with get, set
  member EnvironmentVariables : StringDictionary
  member ErrorDialog : bool with get, set
  member ErrorDialogParentHandle : nativeint with get, set
  member FileName : string with get, set
  member LoadUserProfile : bool with get, set
  member Password : SecureString with get, set
  ...

Full name: System.Diagnostics.ProcessStartInfo

--------------------
ProcessStartInfo() : unit
ProcessStartInfo(fileName: string) : unit
ProcessStartInfo(fileName: string, arguments: string) : unit
Multiple items
type Process =
  inherit Component
  new : unit -> Process
  member BasePriority : int
  member BeginErrorReadLine : unit -> unit
  member BeginOutputReadLine : unit -> unit
  member CancelErrorRead : unit -> unit
  member CancelOutputRead : unit -> unit
  member Close : unit -> unit
  member CloseMainWindow : unit -> bool
  member EnableRaisingEvents : bool with get, set
  member ExitCode : int
  ...

Full name: System.Diagnostics.Process

--------------------
Process() : unit
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
Stopwatch() : unit
property DateTimeOffset.Now: DateTimeOffset
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
property Array.Length: int
Multiple items
type String =
  new : value:char -> string + 7 overloads
  member Chars : int -> char
  member Clone : unit -> obj
  member CompareTo : value:obj -> int + 1 overload
  member Contains : value:string -> bool
  member CopyTo : sourceIndex:int * destination:char[] * destinationIndex:int * count:int -> unit
  member EndsWith : value:string -> bool + 2 overloads
  member Equals : obj:obj -> bool + 2 overloads
  member GetEnumerator : unit -> CharEnumerator
  member GetHashCode : unit -> int
  ...

Full name: System.String

--------------------
String(value: nativeptr<char>) : unit
String(value: nativeptr<sbyte>) : unit
String(value: char []) : unit
String(c: char, count: int) : unit
String(value: nativeptr<char>, startIndex: int, length: int) : unit
String(value: nativeptr<sbyte>, startIndex: int, length: int) : unit
String(value: char [], startIndex: int, length: int) : unit
String(value: nativeptr<sbyte>, startIndex: int, length: int, enc: Text.Encoding) : unit
val concat : sep:string -> strings:seq<string> -> string

Full name: Microsoft.FSharp.Core.String.concat
type Environment =
  static member CommandLine : string
  static member CurrentDirectory : string with get, set
  static member Exit : exitCode:int -> unit
  static member ExitCode : int with get, set
  static member ExpandEnvironmentVariables : name:string -> string
  static member FailFast : message:string -> unit + 1 overload
  static member GetCommandLineArgs : unit -> string[]
  static member GetEnvironmentVariable : variable:string -> string + 1 overload
  static member GetEnvironmentVariables : unit -> IDictionary + 1 overload
  static member GetFolderPath : folder:SpecialFolder -> string + 1 overload
  ...
  nested type SpecialFolder
  nested type SpecialFolderOption

Full name: System.Environment
property Environment.NewLine: string
type StringSplitOptions =
  | None = 0
  | RemoveEmptyEntries = 1

Full name: System.StringSplitOptions
field StringSplitOptions.None = 0
val cluster : MBrace.Thespian.ThespianCluster

Full name: 200-launching-python-example.cluster
module Config
val GetCluster : unit -> MBrace.Thespian.ThespianCluster

Full name: Config.GetCluster


 Gets or creates a new Thespian cluster session.
member MBrace.Runtime.MBraceClient.Run : workflow:MBrace.Core.Cloud<'T> * ?cancellationToken:MBrace.Core.ICloudCancellationToken * ?faultPolicy:MBrace.Core.FaultPolicy * ?target:MBrace.Core.IWorkerRef * ?additionalResources:MBrace.Core.Internals.ResourceRegistry * ?taskName:string -> 'T
val greet : name:string -> 'a

Full name: 200-launching-python-example.greet
val name : string
val getHostnamePython : unit -> 'a

Full name: 200-launching-python-example.getHostnamePython
Fork me on GitHub