MBrace.Core and MBrace.Azure


Creating an Incremental Stock Analysis

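This example shows how to create a simple stock trading simulator that runs on an MBrace cluster. One cloud process generates simulated market data into a queue, and a second process analyzes that data and writes its results to another queue.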

First, set up the CSV type provider to read a list of stocks in a strongly typed way:

open System
open FSharp.Data

[<Literal>]
let stockDataPath = __SOURCE_DIRECTORY__ + "/../../data/stock-data.csv"
type Stocks = CsvProvider<stockDataPath>

Load the list of stocks. This is a relatively small dataset, so we can read it locally.

let data = Stocks.Load(stockDataPath)

Next, define a record type that holds the basic trading data for one stock:

type StockInfo = {
    Symbol: string
    Price: double
    Volume: double
}

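Next, extract the symbol, price and volume of each stock into an array of StockInfo records, and define a record type for a single package of market data received from the trading API: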

let stockInfo =
    [| for row in data.Rows ->
        { Symbol = row.Symbol; Price = double row.Price; Volume = double row.Volume } |]

// Record for a single data package from the trading API.
type MarketDataPackage = {    
    Symbol : string
    Price : double
    Volume: double
    Asks: double[]
    Bids: double[]
}
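To make clear what the type provider does for you, here is a minimal hand-written equivalent that turns a tiny CSV sample into StockInfo records. It is only a sketch: the sample rows are made up, and it assumes a comma-separated file with a header row, the Symbol, Price and Volume columns used above, and no quoted fields.

```fsharp
open System
open System.Globalization

type StockInfo = { Symbol: string; Price: double; Volume: double }

// Hypothetical sample in the assumed Symbol,Price,Volume layout (not real data).
let sampleCsv = "Symbol,Price,Volume\nAAA,10.5,1000\nBBB,20.25,500"

/// Parses a simple CSV text with a header row into StockInfo records.
/// Assumes exactly three unquoted columns per row.
let parseStocks (csv: string) =
    csv.Split([| '\n' |], StringSplitOptions.RemoveEmptyEntries)
    |> Array.skip 1 // skip the header row
    |> Array.map (fun line ->
        let cols = line.Trim().Split(',')
        { Symbol = cols.[0]
          Price = Double.Parse(cols.[1], CultureInfo.InvariantCulture)
          Volume = Double.Parse(cols.[2], CultureInfo.InvariantCulture) })

let parsed = parseStocks sampleCsv
```

The type provider additionally infers the column types (decimal for Price and int for Volume in this dataset), which is why the code above converts them with double.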

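Next, define a function that generates simulated market data for a single timestamp, based on the input list of stocks and their average prices. Each simulated price lies within 10% of the stock's average price, and each package carries one random ask volume and one random bid volume between 500 and 1499: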

let SimulateMarketSlice (stockInfo : StockInfo[]) =
    let r = Random()
    [| for s in stockInfo ->
         { Symbol = s.Symbol
           // Random factor in [0.9, 1.1): the price stays within 10% of the average
           Price = s.Price * (1.0 + r.NextDouble() / 5.0 - 0.1)
           Volume = s.Volume
           Asks = [| float (r.Next(500, 1500)) |]
           Bids = [| float (r.Next(500, 1500)) |] } |]
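Since Random.NextDouble returns a value in [0, 1), the factor 1.0 + r.NextDouble()/5.0 - 0.1 lies in [0.9, 1.1), and r.Next(500, 1500) returns an integer from 500 to 1499 because its upper bound is exclusive. The self-contained sketch below checks these bounds locally. It re-creates the slice logic with a seeded Random passed in by the caller; the seed is an addition for reproducibility, not part of the original function, and the two stocks are made up.

```fsharp
open System

type StockInfo = { Symbol: string; Price: double; Volume: double }
type MarketDataPackage =
    { Symbol: string; Price: double; Volume: double; Asks: double[]; Bids: double[] }

// Same logic as SimulateMarketSlice, but the caller supplies the Random instance.
let simulateSlice (r: Random) (stocks: StockInfo[]) : MarketDataPackage[] =
    [| for s in stocks ->
         { Symbol = s.Symbol
           Price = s.Price * (1.0 + r.NextDouble() / 5.0 - 0.1)
           Volume = s.Volume
           Asks = [| float (r.Next(500, 1500)) |]
           Bids = [| float (r.Next(500, 1500)) |] } |]

// Hypothetical input data; field names are qualified because both records share them.
let stocks =
    [| { StockInfo.Symbol = "AAA"; Price = 100.0; Volume = 1000.0 }
       { StockInfo.Symbol = "BBB"; Price = 50.0; Volume = 200.0 } |]

let slice = simulateSlice (Random(42)) stocks
```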

Next, define the queue that stores incoming market trading data.

We group many data packages together, write each group to cloud storage, and put it into the queue as a single element. Grouping packages reduces the number of cloud I/O operations, which are restricted by quotas on most fabrics. Additionally, the size of the elements that can be written to a queue is limited, so each group is stored as a cloud value and the queue holds only a reference to that cloud value. In this example, a group is one slice of market data: one package per stock at a single timestamp.
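The saving from grouping is easy to quantify: with n packages and groups of up to k packages, the number of writes drops from n to ceil(n/k). The local sketch below counts writes with Array.chunkBySize from the F# core library; the workload of 250 integers is a placeholder for real packages, and the group sizes are arbitrary choices, not MBrace settings.

```fsharp
// One write per package when nothing is grouped.
let writesWithoutGrouping (packages: 'T[]) = packages.Length

// One write per group of up to groupSize packages.
let writesWithGrouping (groupSize: int) (packages: 'T[]) =
    packages |> Array.chunkBySize groupSize |> Array.length

// Hypothetical workload: 250 packages (for example 25 stocks x 10 slices).
let packages = Array.init 250 id
let before = writesWithoutGrouping packages  // 250
let after = writesWithGrouping 25 packages   // 10
```

When n is not a multiple of k, the last group is simply smaller: with groups of 30, the same 250 packages need 9 writes.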

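open MBrace.Core

type MarketDataGroup = CloudValue<MarketDataPackage[]>

// Assumes a cluster handle created as in ThespianCluster.fsx (let cluster = Config.GetCluster())
let tradingDataQueue = CloudQueue.New<MarketDataGroup>() |> cluster.Run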
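Storing the payload separately and enqueueing only a reference to it is commonly known as the claim-check pattern. The local sketch below imitates it with an in-memory ConcurrentDictionary standing in for cloud storage and a ConcurrentQueue standing in for the cloud queue. The names blobStore and storeValue and the GUID keys are illustrative assumptions, not MBrace APIs.

```fsharp
open System
open System.Collections.Concurrent

// In-memory stand-ins for cloud storage and a cloud queue (illustrative only).
let blobStore = ConcurrentDictionary<Guid, int[]>()
let queue = ConcurrentQueue<Guid>()

/// Stores a payload and returns a small key that refers to it.
let storeValue (payload: int[]) =
    let key = Guid.NewGuid()
    blobStore.[key] <- payload
    key

// Producer: write the (possibly large) payload once and enqueue only the key.
queue.Enqueue(storeValue [| 1 .. 10000 |])

// Consumer: dequeue the key, then fetch the payload it points to.
let received =
    match queue.TryDequeue() with
    | true, key -> Some blobStore.[key]
    | _ -> None
```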

Next, define the queue to store analysis results:

let resultQueue = CloudQueue.New<MarketDataGroup>() |> cluster.Run

Next, define a cloud workflow that repeatedly generates a slice of simulated market data and writes it into the trading data queue, waiting 3 seconds between two slices:

let SimulateMarket stockInfo =
    cloud {
        // Runs until the cloud process is cancelled.
        while true do
            let md = SimulateMarketSlice stockInfo
            // Store the slice as a cloud value and enqueue only the reference to it.
            let! mdc = CloudValue.New md
            tradingDataQueue.Enqueue mdc
            do! Cloud.Sleep 3000
    }
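Outside the cluster, the same producer loop can be sketched with a .NET ConcurrentQueue. This bounded version is only an illustration: produce, its count and delayMs parameters, and the integer payloads are hypothetical, and unlike the cloud workflow it stops after a fixed number of slices instead of running until cancelled.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Produces count slices, enqueuing each one as a single group and
/// pausing delayMs milliseconds between slices (3000 in the cloud version).
let produce (queue: ConcurrentQueue<'T[]>) (makeSlice: unit -> 'T[]) (count: int) (delayMs: int) =
    for _ in 1 .. count do
        queue.Enqueue(makeSlice ())
        Thread.Sleep delayMs

// Hypothetical slice generator: each slice is a one-element array with a counter.
let counter = ref 0
let nextSlice () =
    counter.Value <- counter.Value + 1
    [| counter.Value |]

let q = ConcurrentQueue<int[]>()
produce q nextSlice 5 0
```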
    

Next, start the simulation as a cloud process that generates market data into the trading data queue:

let simulationTask = SimulateMarket stockInfo |> cluster.CreateProcess

Next, define a function that determines whether a market data package has a large ask or bid volume, that is, a volume above 1000:

let LargeBidVolume = 1000.0
let LargeAskVolume = 1000.0

/// True if any ask exceeds LargeAskVolume or any bid exceeds LargeBidVolume.
let HasLargeAskOrBid (md : MarketDataPackage) =
    md.Asks |> Array.exists (fun v -> v > LargeAskVolume)
    || md.Bids |> Array.exists (fun v -> v > LargeBidVolume)
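The rule can be checked locally on a few hand-made packages. The sketch below repeats the predicate so that it is self-contained; the sample packages are made up. Note that the comparison is strict, so a volume of exactly 1000 does not count as large.

```fsharp
type MarketDataPackage =
    { Symbol: string; Price: double; Volume: double; Asks: double[]; Bids: double[] }

let LargeAskVolume = 1000.0
let LargeBidVolume = 1000.0

// Same rule as HasLargeAskOrBid: any ask or bid volume above its threshold.
let hasLargeAskOrBid (md: MarketDataPackage) =
    md.Asks |> Array.exists (fun v -> v > LargeAskVolume)
    || md.Bids |> Array.exists (fun v -> v > LargeBidVolume)

// Hypothetical package builder for testing.
let pkg asks bids = { Symbol = "AAA"; Price = 10.0; Volume = 100.0; Asks = asks; Bids = bids }

let largeAsk = hasLargeAskOrBid (pkg [| 1200.0 |] [| 600.0 |])  // true
let largeBid = hasLargeAskOrBid (pkg [| 500.0 |] [| 1499.0 |])  // true
let notLarge = hasLargeAskOrBid (pkg [| 700.0 |] [| 1000.0 |])  // false
```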

Next, define the cloud process that analyzes the simulated trading data and generates a signal, by writing to the result queue, whenever a stock with a large ask or bid volume is detected:

open MBrace.Flow

let AnalyzeMarketData =
    cloud {
        while true do
            // Take up to 10 groups that are currently available in the queue.
            let dataGroups = tradingDataQueue.DequeueBatch(10)
            if dataGroups.Length > 0 then
                // The analysis is simple for now: keep the market data with large asks or bids.
                let! stocksWithLargeAskOrBid =
                    dataGroups
                    |> CloudFlow.OfArray
                    |> CloudFlow.collect (fun p -> p.Value)
                    |> CloudFlow.filter HasLargeAskOrBid
                    |> CloudFlow.toArray

                let! analysisResult = CloudValue.New stocksWithLargeAskOrBid
                resultQueue.Enqueue analysisResult
            else
                // Nothing to analyze yet; wait before polling the queue again.
                do! Cloud.Sleep 1000
    }
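Stripped of the cloud plumbing, each analysis round flattens the dequeued groups and keeps the packages with a large ask or bid. The sequential sketch below reproduces that logic with plain F# arrays, using Array.collect and Array.filter in place of CloudFlow.collect and CloudFlow.filter and nested arrays in place of CloudValue, so it can be tested without a cluster. The sample packages are made up.

```fsharp
type MarketDataPackage =
    { Symbol: string; Price: double; Volume: double; Asks: double[]; Bids: double[] }

let hasLargeAskOrBid (md: MarketDataPackage) =
    md.Asks |> Array.exists (fun v -> v > 1000.0)
    || md.Bids |> Array.exists (fun v -> v > 1000.0)

/// Flattens a batch of groups and keeps only packages with a large ask or bid,
/// mirroring OfArray -> collect -> filter -> toArray in the cloud version.
let analyzeBatch (groups: MarketDataPackage[][]) =
    groups
    |> Array.collect id
    |> Array.filter hasLargeAskOrBid

// Hypothetical package builder and batch of two groups.
let pkg sym ask bid = { Symbol = sym; Price = 1.0; Volume = 1.0; Asks = [| ask |]; Bids = [| bid |] }

let batch =
    [| [| pkg "AAA" 1200.0 600.0; pkg "BBB" 800.0 900.0 |]
       [| pkg "CCC" 700.0 1400.0 |] |]

let flagged = analyzeBatch batch |> Array.map (fun p -> p.Symbol)  // [| "AAA"; "CCC" |]
```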

Now start the analysis task:

let analysisTask = AnalyzeMarketData |> cluster.CreateProcess

Next, fetch a batch of up to 10 results from the result queue. Each result is a cloud value holding the packages flagged in one analysis round:

resultQueue.DequeueBatch(10)
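To make the "up to n items, depending on what is currently available" behaviour concrete, here is a local, non-blocking analogue over a .NET ConcurrentQueue. The dequeueBatch helper is hypothetical; whether the MBrace call waits or returns an empty array when the queue is empty may depend on the cluster backend.

```fsharp
open System.Collections.Concurrent

/// Dequeues up to maxItems items that are currently available, without blocking.
let dequeueBatch (queue: ConcurrentQueue<'T>) (maxItems: int) : 'T[] =
    let rec loop acc n =
        if n = 0 then List.rev acc
        else
            match queue.TryDequeue() with
            | true, item -> loop (item :: acc) (n - 1)
            | _ -> List.rev acc // queue is empty: return what we have
    loop [] maxItems |> List.toArray

let results = ConcurrentQueue<string>([ "r1"; "r2"; "r3" ])
let firstBatch = dequeueBatch results 10   // [| "r1"; "r2"; "r3" |]
let secondBatch = dequeueBatch results 10  // [||]
```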

Finally, cancel both running processes, the simulation and the analysis:

simulationTask.Cancel()
analysisTask.Cancel()
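Both cloud workflows loop forever, so they only stop when their processes are cancelled. The local analogy below uses .NET's CancellationTokenSource and Task to show a background loop stopping once cancellation is requested. It is an illustration only and makes no claim about how MBrace implements CloudProcess.Cancel internally.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

let cts = new CancellationTokenSource()

// A background loop that runs until cancellation is requested.
let worker =
    Task.Run(Action(fun () ->
        while not cts.Token.IsCancellationRequested do
            Thread.Sleep 10))

Thread.Sleep 50
cts.Cancel()   // request cancellation, comparable to simulationTask.Cancel()
worker.Wait()  // the loop observes the request and exits normally
```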

Then inspect the processes and workers on the cluster to confirm that both processes have stopped:

cluster.ShowProcesses()
cluster.ShowWorkers()

Summary

In this example, you learned how to create a simulation that runs in the cloud. The components of the simulation take base data and write their outputs to cloud queues. Continue with the further samples to learn more about the MBrace programming model.

Note that you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.
