MBrace.Core and MBrace.Azure


Using Cloud Queues

This tutorial is from the MBrace Starter Kit.

In this tutorial you learn how to create and use cloud queues, which allow you to send messages between cloud workflows. The state of queues is kept in cloud storage.

First, you create a cloud queue:

let queue = CloudQueue.New<string>() |> cluster.Run

Next, you send a message to the queue by scheduling a cloud process to do the send:

cloud { queue.Enqueue "hello" } |> cluster.Run

Next, you receive the message from the queue by scheduling a cloud process to do the receive:

let msg = cloud { return queue.Dequeue() } |> cluster.Run
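
To try the enqueue/dequeue pattern without a cluster, the following is a minimal local sketch. It is an assumption-laden analogue, not MBrace code: it uses the .NET BlockingCollection type as an in-memory stand-in for the cloud queue, and the names localQueue and localMsg are made up for illustration.

```fsharp
open System.Collections.Concurrent

// Local stand-in for a cloud queue (assumption: an in-memory FIFO is a fair analogy).
// The default BlockingCollection is backed by a ConcurrentQueue, so it is first-in, first-out.
let localQueue = new BlockingCollection<string>()

// Analogue of: cloud { queue.Enqueue "hello" } |> cluster.Run
localQueue.Add "hello"

// Analogue of: cloud { return queue.Dequeue() } |> cluster.Run
// Take blocks until a message is available.
let localMsg = localQueue.Take()

printfn "received: %s" localMsg
```

Unlike this sketch, a cloud queue keeps its state in cloud storage, so the sender and the receiver can be separate cloud processes.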

Next, you start a cloud task to send 100 messages to the queue:

let sendTask =
    cloud { for i in [ 1 .. 100 ] do
                queue.Enqueue (sprintf "hello%d" i) }
    |> cluster.CreateProcess

sendTask.ShowInfo()

Next, you start a cloud task to wait for the 100 messages:

let receiveTask =
    cloud { let results = new ResizeArray<_>()
            for i in [ 1 .. 100 ] do
               let msg = queue.Dequeue()
               results.Add msg
            return results.ToArray() }
    |> cluster.CreateProcess

receiveTask.ShowInfo()

Next, you wait for the result of the receiving cloud task:

receiveTask.Result
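
The sender/receiver pair above can also be sketched locally. This is a hedged analogue rather than MBrace code: F# async workflows stand in for cloud workflows, a BlockingCollection stands in for the cloud queue, and all names (localQueue, producer, consumer, localReceiveTask, received) are hypothetical.

```fsharp
open System.Collections.Concurrent

// In-memory FIFO stand-in for the cloud queue (assumption, not the MBrace type).
let localQueue = new BlockingCollection<string>()

// Analogue of sendTask: enqueue 100 messages.
let producer =
    async { for i in 1 .. 100 do
                localQueue.Add (sprintf "hello%d" i) }

// Analogue of receiveTask: block on Take until 100 messages have arrived.
let consumer =
    async { return [| for _ in 1 .. 100 -> localQueue.Take() |] }

// Start the consumer first to show that it simply waits for the producer.
let localReceiveTask = consumer |> Async.StartAsTask
Async.Start producer

// Analogue of receiveTask.Result: wait for the consumer to finish.
let received = localReceiveTask.Result
printfn "received %d messages" received.Length
```

With a single producer and a single consumer the messages arrive in the order they were sent. Whether a given cloud queue implementation preserves ordering is not stated in this tutorial.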

Using queues as inputs to reactive data parallel cloud flows

You now learn how to use cloud queues as inputs to a data parallel cloud flow.

#load "lib/sieve.fsx"

First, you create a request queue and an output queue:

let requestQueue = CloudQueue.New<int>() |> cluster.Run
let outputQueue = CloudQueue.New<int64>() |> cluster.Run

Next, you create a data parallel cloud workflow with 4-way parallelism that reads from the request queue. Each request is an integer giving the upper bound (the nmax argument of Sieve.getPrimes) for the prime numbers to compute. Each output is the sum of those prime numbers.

let processingFlow = 
    CloudFlow.OfCloudQueue(requestQueue, 4)
    |> CloudFlow.map (fun msg -> Sieve.getPrimes msg |> Array.map int64 |> Array.sum)
    |> CloudFlow.toCloudQueue outputQueue
    |> cluster.CreateProcess

This task will continue running until it is explicitly cancelled or the queues are deleted. Check on the task using the following:

processingFlow.ShowInfo() 
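
The contents of lib/sieve.fsx are not shown in this tutorial. To see what each message is turned into, here is a minimal local sketch that assumes Sieve.getPrimes returns all primes up to and including its argument. The getPrimes below is a hypothetical stand-in (a plain Sieve of Eratosthenes), and sumOfPrimes is an illustrative name for the function passed to CloudFlow.map.

```fsharp
// Hypothetical stand-in for Sieve.getPrimes (assumption: primes <= nmax).
let getPrimes (nmax : int) : int [] =
    if nmax < 2 then [||]
    else
        let isComposite : bool [] = Array.zeroCreate (nmax + 1)
        let primes = ResizeArray<int>()
        for n in 2 .. nmax do
            if not isComposite.[n] then
                primes.Add n
                // Mark multiples of n, starting at n * n; int64 avoids overflow.
                let mutable m = int64 n * int64 n
                while m <= int64 nmax do
                    isComposite.[int m] <- true
                    m <- m + int64 n
        primes.ToArray()

// The same per-message computation as in the flow: sum the primes as int64.
let sumOfPrimes (msg : int) : int64 =
    getPrimes msg |> Array.map int64 |> Array.sum

printfn "%d" (sumOfPrimes 10)   // 2 + 3 + 5 + 7 = 17
```

The conversion to int64 before summing matters because the sum of primes below a few hundred thousand does not fit in a 32-bit integer.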

Next, you start a cloud task to send 100 different requests to the queue:

let requestTask = 
    cloud { for i in [ 1 .. 100 ] do 
                do requestQueue.Enqueue (i * 100000 % 787853) }
     |> cluster.CreateProcess

requestTask.ShowInfo() 

cluster.ShowProcesses()
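
To see which request values the loop above produces, you can evaluate the same expression locally. This sketch uses only the F# core library; the name requests is illustrative.

```fsharp
// The same arithmetic as in requestTask, evaluated locally.
let requests = [ for i in 1 .. 100 -> i * 100000 % 787853 ]

// The first seven values are below the modulus and pass through unchanged;
// the eighth wraps around: 800000 % 787853 = 12147.
printfn "first ten: %A" (List.truncate 10 requests)

// Count how many of the 100 requests are distinct.
printfn "distinct: %d" (requests |> List.distinct |> List.length)
```

Taking the remainder keeps every request below 787853, which bounds the amount of work per message.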

Next, you run a cloud task to collect up to 10 results from the output queue. You can run this multiple times to collect all the results.

let collectedResults = 
    cloud { return outputQueue.DequeueBatch 10 }
     |> cluster.Run
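
Collecting everything by repeated batches can be sketched locally as follows. This is an analogue under stated assumptions, not MBrace code: a ConcurrentQueue stands in for the output queue, dequeueBatch is a hypothetical helper that returns at most maxItems items, and the behaviour of the real DequeueBatch on an empty queue may differ.

```fsharp
open System.Collections.Concurrent

// In-memory stand-in for the output queue, pre-filled with 25 results.
let localOutput = ConcurrentQueue<int64>()
for i in 1L .. 25L do localOutput.Enqueue i

// Hypothetical analogue of DequeueBatch: take at most maxItems items,
// fewer if the queue runs out.
let dequeueBatch (maxItems : int) : int64 [] =
    let batch = ResizeArray<int64>()
    let mutable more = true
    while more && batch.Count < maxItems do
        match localOutput.TryDequeue() with
        | true, item -> batch.Add item
        | false, _ -> more <- false
    batch.ToArray()

// Keep asking for batches of up to 10 until an empty batch comes back.
let rec drain acc =
    match dequeueBatch 10 with
    | [||] -> List.rev acc
    | batch -> drain (batch :: acc)

let batches = drain []
printfn "batch sizes: %A" (batches |> List.map Array.length)   // [10; 10; 5]
```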

Summary

In this tutorial, you've learned how to use queues in cloud storage and how to use them as inputs to data parallel cloud workflows. Continue with further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
