Creating an Incremental Stock Analysis
This example shows how to create a stock trading simulator.
First, set up the CSV type provider to read a list of stocks in a strongly typed way:
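A minimal sketch of the setup, assuming the starter-kit cluster scripts and a local stock-data.csv sample file (the path is an assumption; adjust it to your data):

#load "ThespianCluster.fsx"
// #load "AzureCluster.fsx"

open System
open FSharp.Data
open MBrace.Core
open MBrace.Flow

// Initialize the client object for the MBrace cluster (see ThespianCluster.fsx / AzureCluster.fsx).
let cluster = Config.GetCluster()

// The sample path must be a literal so it can be passed to the type provider.
[<Literal>]
let stockDataPath = "stock-data.csv"

type Stocks = CsvProvider<stockDataPath>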
Load the list of stocks. The data is relatively small, so we can read it locally.
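For example, loading from the same path used for inference:

let data = Stocks.Load(stockDataPath)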
Next, define a type that represents stock trading data:
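A record carrying the symbol, price, and volume of a stock:

type StockInfo =
    { Symbol : string
      Price  : double
      Volume : double }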
Next, extract the essential information from the list of stocks:
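A minimal sketch of the projection. The column names (Symbol, Price, Volume) are assumptions about the sample CSV; adjust them to the columns the type provider infers from your data:

let stockInfo =
    [| for row in data.Rows ->
         // Hypothetical column names; convert to the field types of StockInfo.
         { Symbol = string row.Symbol
           Price  = double row.Price
           Volume = double row.Volume } |]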
Next, define a function to generate simulated market data at a single timestamp, based on the input list of stocks and their average prices:
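A sketch of one simulated slice, assuming a MarketDataPackage record that carries simulated ask and bid volumes alongside the basic stock fields; the price jitter and volume ranges are illustrative values, not the original ones:

type MarketDataPackage =
    { Symbol : string
      Price  : double
      Volume : double
      Asks   : double []
      Bids   : double [] }

let SimulateMarketSlice (stocks : StockInfo []) =
    let rnd = Random()
    [| for s in stocks ->
         { Symbol = s.Symbol
           // Apply a small random move around the average price.
           Price  = s.Price * (1.0 + 0.02 * (rnd.NextDouble() - 0.5))
           Volume = s.Volume
           Asks   = [| for _ in 1 .. rnd.Next(1, 10) -> float (rnd.Next(100, 2000)) |]
           Bids   = [| for _ in 1 .. rnd.Next(1, 10) -> float (rnd.Next(100, 2000)) |] } |]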
Next, define the queue that stores incoming market trading data.
We group many data packages together, write them to storage, and put them into the queue as one element. Grouping many packages together reduces the number of cloud I/O operations, which is restricted by quota on most fabrics. The size of elements we can write to the queue is also restricted, so we write a cloud value and have the queue hold a reference to it.
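A sketch, assuming a CloudValue reference to the persisted batch and the CloudQueue API from MBrace.Core:

// Each queue element holds only a reference to a persisted batch of packages.
type MarketDataGroup = { Data : CloudValue<MarketDataPackage []> }

let tradingDataQueue = CloudQueue.New<MarketDataGroup>() |> cluster.Run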
Next, define the queue to store analysis results:
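For example, holding arrays of flagged packages (the element type is an assumption):

let resultQueue = CloudQueue.New<MarketDataPackage []>() |> cluster.Run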
Next, define a function to generate simulated market data and write it into the request queue, waiting 3 seconds between two slices:
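A sketch of the producer loop, assuming the synchronous Enqueue member of CloudQueue and CloudValue.New for persisting each batch:

let SimulateMarket (stocks : StockInfo []) =
    cloud {
        while true do
            // Generate one slice, persist it, and enqueue a reference to it.
            let md = SimulateMarketSlice stocks
            let! mdc = CloudValue.New md
            let group = { Data = mdc }
            tradingDataQueue.Enqueue group
            // Wait 3 seconds before producing the next slice.
            do! Cloud.Sleep 3000
    }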
Next, start the simulation task to generate market data into the request queue:
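For example:

let simulationTask = SimulateMarket stockInfo |> cluster.CreateProcess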
Next, define a function to determine whether market data has a large ask or bid volume:
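A sketch; the threshold values are illustrative, not the original ones:

let LargeBidVolume = 1000.0
let LargeAskVolume = 1000.0

// A package is interesting if any individual ask or bid exceeds the threshold.
let HasLargeAskOrBid (md : MarketDataPackage) =
    let largeAsks = md.Asks |> Array.filter (fun v -> v > LargeAskVolume) |> Seq.length
    let largeBids = md.Bids |> Array.filter (fun v -> v > LargeBidVolume) |> Seq.length
    largeAsks + largeBids > 0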
Next, define the task to process simulated stock trading data and generate signals when a stock with large ask or bid volume is detected.
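A sketch of the consumer loop using CloudFlow, assuming the DequeueBatch member of CloudQueue, the Value property of CloudValue, and an illustrative polling interval:

let AnalyzeMarketData =
    cloud {
        while true do
            // Take a batch of queue elements; each one references a persisted group of packages.
            let groups = tradingDataQueue.DequeueBatch(10)
            if groups.Length > 0 then
                // Flatten the groups into individual packages and keep the interesting ones.
                let! signals =
                    groups
                    |> CloudFlow.OfArray
                    |> CloudFlow.collect (fun g -> g.Data.Value |> Seq.ofArray)
                    |> CloudFlow.filter HasLargeAskOrBid
                    |> CloudFlow.toArray
                if signals.Length > 0 then
                    resultQueue.Enqueue signals
            do! Cloud.Sleep 2000
    }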
You now start the analysis task:
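For example:

let analysisTask = AnalyzeMarketData |> cluster.CreateProcess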
Next, get batches of results from the result queue:
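For example, assuming the DequeueBatch member and flattening the batches for display:

let results = resultQueue.DequeueBatch(100)
results |> Array.concat |> Array.iter (fun md -> printfn "Signal: %s at %f" md.Symbol md.Price)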
Finally, cancel the running simulation tasks:
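For example, assuming the Cancel member of the returned cloud processes:

simulationTask.Cancel()
analysisTask.Cancel()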
And check that all tasks have completed on the cluster:
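For example:

// List the processes on the cluster and inspect their status.
cluster.ShowProcesses()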
Summary
In this example, you learned how to create a simulation running in the cloud. The components of the simulation take base data and write their outputs to cloud queues. Continue with further samples to learn more about the MBrace programming model.
Note, you can use the above techniques from both scripts and compiled projects. To see the components referenced by this script, see ThespianCluster.fsx or AzureCluster.fsx.