Using Cloud Queues
This tutorial is from the MBrace Starter Kit.
In this tutorial you learn how to create and use cloud queues, which allow you to send messages between cloud workflows. The state of queues is kept in cloud storage.
First, you create a cloud queue:
1:
|
|
Next, you send to the channel by scheduling a cloud process to do the send:
1:
|
|
Next, you receive from the channel by scheduling a cloud process to do the receive:
1:
|
|
Next, you start a cloud task to send 100 messages to the queue:
1: 2: 3: 4: 5: 6: |
|
Next, you start a cloud task to wait for the 100 messages:
1: 2: 3: 4: 5: 6: 7: 8: 9: |
|
Next, you wait for the result of the receiving cloud task:
1:
|
|
Using queues as inputs to reactive data parallel cloud flows
You now learn how to use cloud queues as inputs to a data parallel cloud flow.
1:
|
|
First, you create a request queue and an output queue:
1: 2: |
|
Next, you create a data parallel cloud workflow with 4-way parallelism that reads from the request queue. The requests are integer messages indicating a number of prime nnumbers to compute. The outputs are the sum of the prime numbers.
1: 2: 3: 4: 5: |
|
This task will continue running until it is explicitly cancelled or the queues are deleted. Check on the task using the following:
1:
|
|
Next, you start a cloud task to send 100 different requests to the queue:
1: 2: 3: 4: 5: 6: 7: 8: |
|
Next, you run a cloud task to collect up to 10 results from the output queue. You can run this multiple times to collect all the results.
1: 2: 3: |
|
Summary
In this tutorial, you've learned how to use queues in cloud storage and how to use them as inputs to data parallel cloud workflows. Continue with further samples to learn more about the MBrace programming model.
Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
Full name: 8-using-cloud-queues.cluster
Full name: Config.GetCluster
Gets or creates a new Thespian cluster session.
Full name: 8-using-cloud-queues.queue
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
Full name: 8-using-cloud-queues.msg
Full name: 8-using-cloud-queues.sendTask
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
Full name: 8-using-cloud-queues.receiveTask
Full name: Microsoft.FSharp.Collections.ResizeArray<_>
Full name: 8-using-cloud-queues.requestQueue
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
Full name: 8-using-cloud-queues.outputQueue
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
Full name: 8-using-cloud-queues.processingFlow
module CloudFlow
from MBrace.Flow
--------------------
type CloudFlow =
static member OfArray : source:'T [] -> CloudFlow<'T>
static member OfCloudArrays : cloudArrays:seq<#CloudArray<'T>> -> LocalCloud<PersistedCloudFlow<'T>>
static member OfCloudCollection : collection:ICloudCollection<'T> * ?sizeThresholdPerWorker:(unit -> int64) -> CloudFlow<'T>
static member OfCloudDirectory : dirPath:string * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
static member OfCloudDirectory : dirPath:string * ?deserializer:(Stream -> seq<'T>) * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
static member OfCloudDirectory : dirPath:string * deserializer:(TextReader -> seq<'T>) * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
static member OfCloudDirectoryByLine : dirPath:string * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
static member OfCloudFileByLine : path:string * ?encoding:Encoding -> CloudFlow<string>
static member OfCloudFileByLine : paths:seq<string> * ?encoding:Encoding * ?sizeThresholdPerCore:int64 -> CloudFlow<string>
static member OfCloudFiles : paths:seq<string> * serializer:ISerializer * ?sizeThresholdPerCore:int64 -> CloudFlow<'T>
...
Full name: MBrace.Flow.CloudFlow
--------------------
type CloudFlow<'T> =
interface
abstract member WithEvaluators : collectorFactory:LocalCloud<Collector<'T,'S>> -> projection:('S -> LocalCloud<'R>) -> combiner:('R [] -> LocalCloud<'R>) -> Cloud<'R>
abstract member DegreeOfParallelism : int option
end
Full name: MBrace.Flow.CloudFlow<_>
Full name: MBrace.Flow.CloudFlow.map
Full name: Sieve.getPrimes
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
Full name: Microsoft.FSharp.Collections.Array.map
Full name: Microsoft.FSharp.Collections.Array.sum
Full name: MBrace.Flow.CloudFlow.toCloudQueue
Full name: 8-using-cloud-queues.requestTask
Full name: 8-using-cloud-queues.collectedResults