From ed9967766c2a4706fd94977686d6af3b59711785 Mon Sep 17 00:00:00 2001 From: Konstantin Ivaschenko Date: Wed, 10 Jul 2024 14:13:42 +0300 Subject: [PATCH] [MDAPI-67] [.NET] Integrate IndexedEventTxModel --- .editorconfig | 4 +- StyleCop.xml | 1 + dxfeed-graal-net-api.sln | 14 + dxfeed-graal-net-api.sln.DotSettings | 1 + samples/CandleChartSample/App.axaml | 10 + samples/CandleChartSample/App.axaml.cs | 23 + .../CandleChartSample.csproj | 26 + samples/CandleChartSample/CandleExtension.cs | 44 ++ samples/CandleChartSample/CandleList.cs | 111 +++ samples/CandleChartSample/MainWindow.axaml | 52 ++ samples/CandleChartSample/MainWindow.axaml.cs | 135 ++++ samples/CandleChartSample/Program.cs | 21 + samples/CandleChartSample/app.manifest | 18 + samples/MarketDepthModelSample/App.axaml | 12 + samples/MarketDepthModelSample/App.axaml.cs | 23 + .../MarketDepthModelSample/MainWindow.axaml | 96 +++ .../MainWindow.axaml.cs | 134 ++++ .../MarketDepthModelSample.csproj | 35 + samples/MarketDepthModelSample/Orders.cs | 56 ++ samples/MarketDepthModelSample/Program.cs | 22 + .../RangedObservableCollection.cs | 31 + samples/MarketDepthModelSample/app.manifest | 18 + src/DxFeed.Graal.Net/Api/DXEndpoint.cs | 8 + .../Models/AbstractTxModel.cs | 282 ++++++++ src/DxFeed.Graal.Net/Models/IndexedTxModel.cs | 184 +++++ .../Models/MarketDepthListener.cs | 22 + .../Models/MarketDepthModel.cs | 643 ++++++++++++++++++ .../Models/TimeSeriesTxModel.cs | 142 ++++ .../Models/TxEventProcessor.cs | 192 ++++++ .../Models/TxModelListener.cs | 33 + .../Native/Endpoint/DXEndpointHandle.cs | 14 + .../Native/Endpoint/DXEndpointWrapper.cs | 8 + .../Native/Executors/InPlaceExecutor.cs | 43 ++ .../Models/AbstractTxModelTest.cs | 436 ++++++++++++ .../Models/IndexedTxModelTest.cs | 172 +++++ .../Models/MarketDepthModelTest.cs | 434 ++++++++++++ 36 files changed, 3499 insertions(+), 1 deletion(-) create mode 100644 samples/CandleChartSample/App.axaml create mode 100644 samples/CandleChartSample/App.axaml.cs create mode 100644 samples/CandleChartSample/CandleChartSample.csproj create mode 100644 samples/CandleChartSample/CandleExtension.cs create mode 100644 samples/CandleChartSample/CandleList.cs create mode 100644 samples/CandleChartSample/MainWindow.axaml create mode 100644 samples/CandleChartSample/MainWindow.axaml.cs create mode 100644 samples/CandleChartSample/Program.cs create mode 100644 samples/CandleChartSample/app.manifest create mode 100644 samples/MarketDepthModelSample/App.axaml create mode 100644 samples/MarketDepthModelSample/App.axaml.cs create mode 100644 samples/MarketDepthModelSample/MainWindow.axaml create mode 100644 samples/MarketDepthModelSample/MainWindow.axaml.cs create mode 100644 samples/MarketDepthModelSample/MarketDepthModelSample.csproj create mode 100644 samples/MarketDepthModelSample/Orders.cs create mode 100644 samples/MarketDepthModelSample/Program.cs create mode 100644 samples/MarketDepthModelSample/RangedObservableCollection.cs create mode 100644 samples/MarketDepthModelSample/app.manifest create mode 100644 src/DxFeed.Graal.Net/Models/AbstractTxModel.cs create mode 100644 src/DxFeed.Graal.Net/Models/IndexedTxModel.cs create mode 100644 src/DxFeed.Graal.Net/Models/MarketDepthListener.cs create mode 100644 src/DxFeed.Graal.Net/Models/MarketDepthModel.cs create mode 100644 src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs create mode 100644 src/DxFeed.Graal.Net/Models/TxEventProcessor.cs create mode 100644 src/DxFeed.Graal.Net/Models/TxModelListener.cs create mode 100644 src/DxFeed.Graal.Net/Native/Executors/InPlaceExecutor.cs create mode 100644 tests/DxFeed.Graal.Net.Tests/Models/AbstractTxModelTest.cs create mode 100644 tests/DxFeed.Graal.Net.Tests/Models/IndexedTxModelTest.cs create mode 100644 tests/DxFeed.Graal.Net.Tests/Models/MarketDepthModelTest.cs diff --git a/.editorconfig b/.editorconfig index 9263be76..c519751b 100644 --- a/.editorconfig +++ b/.editorconfig @@ -201,6 +201,8 @@ dotnet_diagnostic.CA1861.severity = none dotnet_diagnostic.CA1846.severity = none # CA1845 : Use span-based 'string.Concat' and 'AsSpan' instead of 'Substring' dotnet_diagnostic.CA1845.severity = none +dotnet_diagnostic.cs0693.severity=none +dotnet_diagnostic.CA1000.severity=none ########################################## # Formatting Rules # https://docs.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/formatting-rules @@ -361,7 +363,7 @@ dotnet_naming_symbols.stylecop_fields_must_be_private_group.applicable_accessibi dotnet_naming_symbols.stylecop_fields_must_be_private_group.applicable_kinds = field dotnet_naming_rule.stylecop_instance_fields_must_be_private_rule.symbols = stylecop_fields_must_be_private_group dotnet_naming_rule.stylecop_instance_fields_must_be_private_rule.style = disallowed_style -dotnet_naming_rule.stylecop_instance_fields_must_be_private_rule.severity = error +dotnet_naming_rule.stylecop_instance_fields_must_be_private_rule.severity = none # Private fields must be camelCase # https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1306.md diff --git a/StyleCop.xml b/StyleCop.xml index cd0b0130..354d234b 100644 --- a/StyleCop.xml +++ b/StyleCop.xml @@ -11,5 +11,6 @@ + diff --git a/dxfeed-graal-net-api.sln b/dxfeed-graal-net-api.sln index 81acbc49..e9c85a53 100644 --- a/dxfeed-graal-net-api.sln +++ b/dxfeed-graal-net-api.sln @@ -38,6 +38,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleAuthSample", "samples EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CandleDataResponseReader", "samples\CandleDataResponseReader\CandleDataResponseReader.csproj", "{2567935E-FEFB-470A-BF17-7A883735C4BF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MarketDepthModelSample", "samples\MarketDepthModelSample\MarketDepthModelSample.csproj", "{930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CandleChartSample", "samples\CandleChartSample\CandleChartSample.csproj", "{B74E8A86-1AB7-4B36-AED3-292CDD95BF90}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -112,6 +116,14 @@ Global {2567935E-FEFB-470A-BF17-7A883735C4BF}.Debug|Any CPU.Build.0 = Debug|Any CPU {2567935E-FEFB-470A-BF17-7A883735C4BF}.Release|Any CPU.ActiveCfg = Release|Any CPU {2567935E-FEFB-470A-BF17-7A883735C4BF}.Release|Any CPU.Build.0 = Release|Any CPU + {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Debug|Any CPU.Build.0 = Debug|Any CPU + {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Release|Any CPU.ActiveCfg = Release|Any CPU + {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Release|Any CPU.Build.0 = Release|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {73597E04-D8A8-4991-A759-7F886CBE2A8F} = {C4490D74-2970-4A1B-8178-A724A06B140A} @@ -129,5 +141,7 @@ Global {C457D53F-A033-465C-B250-A6CC60D02F98} = {C4490D74-2970-4A1B-8178-A724A06B140A} {B9088D14-10F6-4D88-876D-062B9F6494AB} = {C4490D74-2970-4A1B-8178-A724A06B140A} {2567935E-FEFB-470A-BF17-7A883735C4BF} = {C4490D74-2970-4A1B-8178-A724A06B140A} + {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84} = {C4490D74-2970-4A1B-8178-A724A06B140A} + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90} = {C4490D74-2970-4A1B-8178-A724A06B140A} EndGlobalSection EndGlobal diff --git a/dxfeed-graal-net-api.sln.DotSettings b/dxfeed-graal-net-api.sln.DotSettings index 9b7649e6..f003618c 100644 --- a/dxfeed-graal-net-api.sln.DotSettings +++ b/dxfeed-graal-net-api.sln.DotSettings @@ -30,6 +30,7 @@ True True True + True True True True diff --git a/samples/CandleChartSample/App.axaml b/samples/CandleChartSample/App.axaml new file mode 100644 index 00000000..34486f97 --- /dev/null +++ b/samples/CandleChartSample/App.axaml @@ -0,0 +1,10 @@ + + + + + + + diff --git a/samples/CandleChartSample/App.axaml.cs b/samples/CandleChartSample/App.axaml.cs new file mode 100644 index 00000000..ed8ad143 --- /dev/null +++ b/samples/CandleChartSample/App.axaml.cs @@ -0,0 +1,23 @@ +using Avalonia; +using Avalonia.Controls.ApplicationLifetimes; +using Avalonia.Markup.Xaml; + +namespace CandleChartSample; + +public partial class App : Application +{ + public override void Initialize() + { + AvaloniaXamlLoader.Load(this); + } + + public override void OnFrameworkInitializationCompleted() + { + if (ApplicationLifetime is IClassicDesktopStyleApplicationLifetime desktop) + { + desktop.MainWindow = new MainWindow(); + } + + base.OnFrameworkInitializationCompleted(); + } +} diff --git a/samples/CandleChartSample/CandleChartSample.csproj b/samples/CandleChartSample/CandleChartSample.csproj new file mode 100644 index 00000000..b4991457 --- /dev/null +++ b/samples/CandleChartSample/CandleChartSample.csproj @@ -0,0 +1,26 @@ + + + + WinExe + net6.0 + enable + true + app.manifest + true + + + + + + + + + + + + + + + + + diff --git a/samples/CandleChartSample/CandleExtension.cs b/samples/CandleChartSample/CandleExtension.cs new file mode 100644 index 00000000..a9e46e1f --- /dev/null +++ b/samples/CandleChartSample/CandleExtension.cs @@ -0,0 +1,44 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System; +using DxFeed.Graal.Net.Events.Candles; +using ScottPlot; + +namespace CandleChartSample; + +public static class CandleExtension +{ + public static OHLC ToOHLC(this Candle candle) + { + var open = GetValueWithPriority(candle.Open, candle.High, candle.Low, candle.Close); + var high = GetValueWithPriority(candle.High, candle.Open, candle.Low, candle.Close); + var low = GetValueWithPriority(candle.Low, candle.Close, candle.Open, candle.High); + var close = GetValueWithPriority(candle.Close, candle.Low, candle.Open, candle.High); + + return new OHLC + { + Open = open, + High = high, + Low = low, + Close = close, + DateTime = DateTimeOffset.FromUnixTimeMilliseconds(candle.Time).DateTime.ToLocalTime(), + TimeSpan = TimeSpan.FromMilliseconds(candle.CandleSymbol!.Period!.PeriodIntervalMillis) + }; + } + + private static double GetValueWithPriority(params double[] values) + { + foreach (var value in values) + { + if (!double.IsNaN(value)) + { + return value; + } + } + return double.NaN; + } +} diff --git a/samples/CandleChartSample/CandleList.cs b/samples/CandleChartSample/CandleList.cs new file mode 100644 index 00000000..e742782d --- /dev/null +++ b/samples/CandleChartSample/CandleList.cs @@ -0,0 +1,111 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System; +using System.Collections.Generic; +using System.Linq; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Candles; +using ScottPlot; + +namespace CandleChartSample; + +public class CandleList : List +{ + public void Update(IEnumerable candles, bool isSnapshot) + { + var sortedCandles = candles.OrderBy(c => c.Index); + + if (isSnapshot) + { + UpdateSnapshot(sortedCandles); + } + else + { + UpdateIncremental(sortedCandles); + } + } + + private void UpdateSnapshot(IEnumerable candles) + { + Clear(); + foreach (var candle in candles) + { + if (!ShouldRemove(candle)) + { + Add(candle.ToOHLC()); + } + } + } + + private void UpdateIncremental(IEnumerable candles) + { + foreach (var candle in candles) + { + if (ShouldRemove(candle)) + { + RemoveCandle(candle); + continue; + } + + var ohlc = candle.ToOHLC(); + var lastOhlc = LastOrDefault(); + switch (DateTime.Compare(ohlc.DateTime, lastOhlc.DateTime)) + { + case < 0: + InsertOrUpdate(ohlc); + break; + case 0: + AddOrUpdateLast(ohlc); + break; + case > 0: + Add(ohlc); + break; + } + } + } + + private void AddOrUpdateLast(OHLC ohlc) + { + if (IsEmpty()) + { + Add(ohlc); + } + else + { + this[Count - 1] = ohlc; + } + } + + private void InsertOrUpdate(OHLC ohlc) + { + var index = FindIndex(o => DateTime.Compare(ohlc.DateTime, o.DateTime) <= 0); + if (index >= 0 && this[index].DateTime.Equals(ohlc.DateTime)) + { + this[index] = ohlc; + } + else + { + Insert(index >= 0 ? index : 0, ohlc); + } + } + + private void RemoveCandle(Candle candle) => + RemoveAll(ohlc => candle.ToOHLC().DateTime.Equals(ohlc.DateTime)); + + private bool IsEmpty() => + Count == 0; + + private OHLC LastOrDefault() => + IsEmpty() ? new OHLC() : this[Count - 1]; + + private static bool ShouldRemove(Candle candle) => + EventFlags.IsRemove(candle) || + (double.IsNaN(candle.Open) && + double.IsNaN(candle.High) && + double.IsNaN(candle.Low) && + double.IsNaN(candle.Close)); +} diff --git a/samples/CandleChartSample/MainWindow.axaml b/samples/CandleChartSample/MainWindow.axaml new file mode 100644 index 00000000..dd4a7832 --- /dev/null +++ b/samples/CandleChartSample/MainWindow.axaml @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/CandleChartSample/MainWindow.axaml.cs b/samples/CandleChartSample/MainWindow.axaml.cs new file mode 100644 index 00000000..25010772 --- /dev/null +++ b/samples/CandleChartSample/MainWindow.axaml.cs @@ -0,0 +1,135 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using Avalonia.Controls; +using Avalonia.Input; +using Avalonia.Interactivity; +using Avalonia.Threading; +using DxFeed.Graal.Net; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Candles; +using DxFeed.Graal.Net.Models; +using DxFeed.Graal.Net.Utils; + +namespace CandleChartSample; + +public partial class MainWindow : Window +{ + private readonly CandleList _candles = new(); + private readonly TimeSeriesTxModel.Builder _modelBuilder = new(); + private TimeSeriesTxModel? _model; + private string _symbol = string.Empty; + + public MainWindow() + { + SystemProperty.SetProperty("dxfeed.address", "demo.dxfeed.com:7300"); + InitializeComponent(); + InitializePlot(); + InitializeModelBuilder(); + HandleTextChanged(SymbolTextBox); + } + + private void InitializePlot() + { + AvaPlot.Plot.Axes.DateTimeTicksBottom(); + var candlePlot = AvaPlot.Plot.Add.Candlestick(_candles); + candlePlot.Axes.YAxis = AvaPlot.Plot.Axes.Right; + candlePlot.Axes.YAxis.Label.Text = "Price"; + FromTimeTextBox.Text = DateTimeOffset.Now.AddMonths(-6).ToString("yyyyMMdd", CultureInfo.InvariantCulture); + } + + private void InitializeModelBuilder() => + _modelBuilder + .WithFeed(DXFeed.GetInstance()) + .WithSnapshotProcessing(true) + .WithListener(OnCandleEventsReceived); + + protected override void OnClosed(EventArgs e) => + _model?.Dispose(); + + private void OnCandleEventsReceived(IndexedEventSource source, IEnumerable events, bool isSnapshot) + { + _candles.Update(events, isSnapshot); + Dispatcher.UIThread.Invoke(() => + { + if (isSnapshot) + { + AvaPlot.Plot.Axes.AutoScale(); + } + + AvaPlot.Refresh(); + }); + } + + private void OnKeyDown(object? sender, KeyEventArgs e) + { + if (e.Key == Key.Enter) + { + HandleTextChanged(sender as TextBox); + } + } + + private void OnLostFocus(object? sender, RoutedEventArgs e) => + HandleTextChanged(sender as TextBox); + + private void HandleTextChanged(TextBox? textBox) + { + if (textBox == null) + { + return; + } + + switch (textBox.Name) + { + case nameof(SymbolTextBox): + OnSymbolTextChanged(); + break; + case nameof(FromTimeTextBox): + OnFromTimeTextChanged(); + break; + } + } + + private void OnSymbolTextChanged() + { + if (_symbol.Equals(GetSymbol(), StringComparison.Ordinal)) + { + return; + } + + _symbol = GetSymbol(); + _model?.Dispose(); + _model = _modelBuilder + .WithSymbol(CandleSymbol.ValueOf(_symbol)) + .WithFromTime(GetFromTime()) + .Build(); + } + + private void OnFromTimeTextChanged() + { + if (_model == null) + { + OnSymbolTextChanged(); + } + + _model?.SetFromTime(GetFromTime()); + } + + private string GetSymbol() => + SymbolTextBox.Text ?? string.Empty; + + private long GetFromTime() + { + try + { + return string.IsNullOrWhiteSpace(FromTimeTextBox.Text) + ? 0 + : TimeFormat.Default.Parse(FromTimeTextBox.Text).ToUnixTimeMilliseconds(); + } + catch (Exception) + { + return 0; + } + } +} diff --git a/samples/CandleChartSample/Program.cs b/samples/CandleChartSample/Program.cs new file mode 100644 index 00000000..8b85caf8 --- /dev/null +++ b/samples/CandleChartSample/Program.cs @@ -0,0 +1,21 @@ +using Avalonia; +using System; + +namespace CandleChartSample; + +internal abstract class Program +{ + // Initialization code. Don't use any Avalonia, third-party APIs or any + // SynchronizationContext-reliant code before AppMain is called: things aren't initialized + // yet and stuff might break. + [STAThread] + public static void Main(string[] args) => BuildAvaloniaApp() + .StartWithClassicDesktopLifetime(args); + + // Avalonia configuration, don't remove; also used by visual designer. + private static AppBuilder BuildAvaloniaApp() + => AppBuilder.Configure() + .UsePlatformDetect() + .WithInterFont() + .LogToTrace(); +} diff --git a/samples/CandleChartSample/app.manifest b/samples/CandleChartSample/app.manifest new file mode 100644 index 00000000..403194d8 --- /dev/null +++ b/samples/CandleChartSample/app.manifest @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + diff --git a/samples/MarketDepthModelSample/App.axaml b/samples/MarketDepthModelSample/App.axaml new file mode 100644 index 00000000..81228b0e --- /dev/null +++ b/samples/MarketDepthModelSample/App.axaml @@ -0,0 +1,12 @@ + + + + + + + + + diff --git a/samples/MarketDepthModelSample/App.axaml.cs b/samples/MarketDepthModelSample/App.axaml.cs new file mode 100644 index 00000000..bbdbfa79 --- /dev/null +++ b/samples/MarketDepthModelSample/App.axaml.cs @@ -0,0 +1,23 @@ +using Avalonia; +using Avalonia.Controls.ApplicationLifetimes; +using Avalonia.Markup.Xaml; + +namespace MarketDepthModelSample; + +public partial class App : Application +{ + public override void Initialize() + { + AvaloniaXamlLoader.Load(this); + } + + public override void OnFrameworkInitializationCompleted() + { + if (ApplicationLifetime is IClassicDesktopStyleApplicationLifetime desktop) + { + desktop.MainWindow = new MainWindow(); + } + + base.OnFrameworkInitializationCompleted(); + } +} diff --git a/samples/MarketDepthModelSample/MainWindow.axaml b/samples/MarketDepthModelSample/MainWindow.axaml new file mode 100644 index 00000000..78d6c930 --- /dev/null +++ b/samples/MarketDepthModelSample/MainWindow.axaml @@ -0,0 +1,96 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/MarketDepthModelSample/MainWindow.axaml.cs b/samples/MarketDepthModelSample/MainWindow.axaml.cs new file mode 100644 index 00000000..96332750 --- /dev/null +++ b/samples/MarketDepthModelSample/MainWindow.axaml.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using Avalonia.Controls; +using Avalonia.Input; +using Avalonia.Interactivity; +using Avalonia.Threading; +using DxFeed.Graal.Net; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events.Market; +using DxFeed.Graal.Net.Models; + +namespace MarketDepthModelSample; + +public partial class MainWindow : Window +{ + private readonly Orders orders = new(); + private readonly MarketDepthModel.Builder builder; + private string symbol = string.Empty; + private string sources = string.Empty; + private MarketDepthModel? model; + + public MainWindow() + { + SystemProperty.SetProperty("dxfeed.address", "demo.dxfeed.com:7300"); + + InitializeComponent(); + BuyTable.Source = orders.BuyOrders; + SellTable.Source = orders.SellOrders; + + builder = MarketDepthModel.NewBuilder() + .WithFeed(DXFeed.GetInstance()) + .WithListener((buy, sell) => + { + Dispatcher.UIThread.Post(() => + { + orders.UpdateBuy(buy); + orders.UpdateSell(sell); + }, DispatcherPriority.Normal); + }); + + HandleTextChanged(SymbolTextBox); + } + + protected override void OnClosed(EventArgs e) => + model?.Dispose(); + + private void OnKeyDown(object sender, KeyEventArgs e) + { + if (e.Key == Key.Enter) + { + HandleTextChanged(sender as TextBox); + } + } + + private void OnLostFocus(object sender, RoutedEventArgs e) => + HandleTextChanged(sender as TextBox); + + private void HandleTextChanged(TextBox? textBox) + { + if (textBox == null) + { + return; + } + + switch (textBox.Name) + { + case nameof(SymbolTextBox): + case nameof(SourcesTextBox): + OnSymbolTextChanged(); + break; + case nameof(DepthLimitTextBox): + OnDepthLimitTextChanged(); + break; + case nameof(PeriodTextBox): + OnAggregationPeriodTextChanged(); + break; + } + } + + private void OnSymbolTextChanged() + { + if (symbol.Equals(SymbolTextBox.Text, StringComparison.Ordinal) && + sources.Equals(SourcesTextBox.Text, StringComparison.Ordinal)) + { + return; + } + + symbol = SymbolTextBox.Text ?? string.Empty; + sources = SourcesTextBox.Text ?? string.Empty; + + model?.Dispose(); + model = builder + .WithSymbol(GetSymbol()) + .WithSources(GetSources()) + .WithDepthLimit(GetDepthLimit()) + .WithAggregationPeriod(GetAggregationPeriod()) + .Build(); + } + + private void OnDepthLimitTextChanged() => + model?.SetDepthLimit(GetDepthLimit()); + + private void OnAggregationPeriodTextChanged() => + model?.SetAggregationPeriod(GetAggregationPeriod()); + + private string GetSymbol() => + SymbolTextBox.Text ?? string.Empty; + + private List GetSources() + { + var sourceList = new List(); + if (!string.IsNullOrWhiteSpace(SourcesTextBox.Text)) + { + foreach (var source in SourcesTextBox.Text.Split(",")) + { + sourceList.Add(OrderSource.ValueOf(source)); + } + } + + return sourceList; + } + + private int GetDepthLimit() => + string.IsNullOrWhiteSpace(DepthLimitTextBox.Text) + ? 0 + : int.Parse(DepthLimitTextBox.Text, CultureInfo.InvariantCulture); + + private int GetAggregationPeriod() => + string.IsNullOrWhiteSpace(PeriodTextBox.Text) + ? 0 + : int.Parse(PeriodTextBox.Text, CultureInfo.InvariantCulture); +} diff --git a/samples/MarketDepthModelSample/MarketDepthModelSample.csproj b/samples/MarketDepthModelSample/MarketDepthModelSample.csproj new file mode 100644 index 00000000..09cddfef --- /dev/null +++ b/samples/MarketDepthModelSample/MarketDepthModelSample.csproj @@ -0,0 +1,35 @@ + + + + Exe + net6.0 + enable + true + app.manifest + true + + + + ../../artifacts/Debug/Samples/ + + + + ../../artifacts/Release/Samples/ + + + + + + + + + + + + + + + + + + diff --git a/samples/MarketDepthModelSample/Orders.cs b/samples/MarketDepthModelSample/Orders.cs new file mode 100644 index 00000000..bd69f5f1 --- /dev/null +++ b/samples/MarketDepthModelSample/Orders.cs @@ -0,0 +1,56 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using System.Collections.ObjectModel; +using Avalonia.Controls; +using Avalonia.Controls.Models.TreeDataGrid; +using DxFeed.Graal.Net.Events.Market; +using DxFeed.Graal.Net.Utils; + +namespace MarketDepthModelSample; + +public class Orders +{ + private readonly RangedObservableCollection buyOrders = new(); + private readonly RangedObservableCollection sellOrders = new(); + + public Orders() + { + BuyOrders = CreateDataGridSource(buyOrders); + SellOrders = CreateDataGridSource(sellOrders); + } + + public ITreeDataGridSource BuyOrders { get; } + + public ITreeDataGridSource SellOrders { get; } + + public void UpdateBuy(IEnumerable orders) => + buyOrders.ReplaceRange(orders); + + public void UpdateSell(IEnumerable orders) => + sellOrders.ReplaceRange(orders); + + private static FlatTreeDataGridSource CreateDataGridSource(ObservableCollection collection) => + new(collection) + { + Columns = + { + new TextColumn("№", o => GetRowNumber(collection, o)), + new TextColumn("Symbol", o => o.EventSymbol), + new TextColumn("DateTime", o => TimeFormat.Default.WithMillis().Format(o.Time)), + new TextColumn("EX", o => StringUtil.EncodeChar(o.ExchangeCode)), + new TextColumn("Source", o => o.EventSource), + new TextColumn("Scope", o => o.Scope), + new TextColumn("MM", o => o.MarketMaker), + new TextColumn("Price", o => o.Price), + new TextColumn("Size", o => o.Size), + } + }; + + private static int GetRowNumber(ObservableCollection collection, Order order) => + collection.IndexOf(order) + 1; +} diff --git a/samples/MarketDepthModelSample/Program.cs b/samples/MarketDepthModelSample/Program.cs new file mode 100644 index 00000000..e78bc3ab --- /dev/null +++ b/samples/MarketDepthModelSample/Program.cs @@ -0,0 +1,22 @@ +using System; +using Avalonia; + +namespace MarketDepthModelSample; + +internal abstract class Program +{ + // Initialization code. Don't use any Avalonia, third-party APIs or any + // SynchronizationContext-reliant code before AppMain is called: things aren't initialized + // yet and stuff might break. + [STAThread] + public static void Main(string[] args) => + BuildAvaloniaApp() + .StartWithClassicDesktopLifetime(args); + + // Avalonia configuration, don't remove; also used by visual designer. + private static AppBuilder BuildAvaloniaApp() + => AppBuilder.Configure() + .UsePlatformDetect() + .WithInterFont() + .LogToTrace(); +} diff --git a/samples/MarketDepthModelSample/RangedObservableCollection.cs b/samples/MarketDepthModelSample/RangedObservableCollection.cs new file mode 100644 index 00000000..bf6f2005 --- /dev/null +++ b/samples/MarketDepthModelSample/RangedObservableCollection.cs @@ -0,0 +1,31 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +namespace MarketDepthModelSample; + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Collections.Specialized; + +public class RangedObservableCollection : ObservableCollection +{ + public void ReplaceRange(IEnumerable collection) + { + if (collection == null) + { + throw new ArgumentNullException(nameof(collection)); + } + + Items.Clear(); + foreach (var i in collection) + { + Items.Add(i); + } + + OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset)); + } +} diff --git a/samples/MarketDepthModelSample/app.manifest b/samples/MarketDepthModelSample/app.manifest new file mode 100644 index 00000000..795dd244 --- /dev/null +++ b/samples/MarketDepthModelSample/app.manifest @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + diff --git a/src/DxFeed.Graal.Net/Api/DXEndpoint.cs b/src/DxFeed.Graal.Net/Api/DXEndpoint.cs index b61d25ff..7df18b6a 100644 --- a/src/DxFeed.Graal.Net/Api/DXEndpoint.cs +++ b/src/DxFeed.Graal.Net/Api/DXEndpoint.cs @@ -14,6 +14,7 @@ using DxFeed.Graal.Net.Events; using DxFeed.Graal.Net.Native.Endpoint; using DxFeed.Graal.Net.Native.ErrorHandling; +using DxFeed.Graal.Net.Native.Executors; using DxFeed.Graal.Net.Utils; using static DxFeed.Graal.Net.Native.Endpoint.DXEndpointWrapper; @@ -659,6 +660,13 @@ public DXPublisher GetPublisher() => public void Dispose() => Close(); + /// + /// For testing only. + /// + /// The executor. + internal void Executor(InPlaceExecutor executor) => + _endpointNative.Executor(executor); + /// /// Closes all associated resources with this . /// diff --git a/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs b/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs new file mode 100644 index 00000000..876c362c --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs @@ -0,0 +1,282 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; + +namespace DxFeed.Graal.Net.Models; + +/// +/// Abstract base class for models that handle transactions of . +/// This model manages all snapshot and transaction logic, subscription handling, and listener notifications. +/// +///

This model is designed to handle incremental transactions. Users of this model only see the list +/// of events in a consistent state. This model delays incoming events that are part of an incomplete snapshot +/// or ongoing transaction until the snapshot is complete or the transaction has ended.

+/// +///

Configuration

+/// +///

This model must be configured using the . Specific implementations can add additional +/// configuration options as needed. This model requires a call to the method +/// (all inheritors must call this method) for subscription.

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

+/// +///

Resource management and closed models

+/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way +/// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the +/// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

+/// +///

The convenient way to detach model from the feed is to call its +/// method. +/// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

+/// +///

Threads and locks

+/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent +///
+/// The type of indexed events processed by this model. +public abstract class AbstractTxModel : IDisposable + where TE : class, IIndexedEvent +{ + private readonly ConcurrentDictionary> _processorsBySource = new(); + private readonly HashSet> _readyProcessors = new(); + private readonly DXFeedSubscription _subscription; + private readonly object _symbol; + private readonly TxModelListener? _listener; + + private protected AbstractTxModel(Builder builder) + { + _symbol = builder.Symbol ?? throw new InvalidOperationException("The 'symbol' must not be null."); + var feed = builder.Feed ?? throw new InvalidOperationException("The 'feed' must not be null."); + IsBatchProcessing = builder.IsBatchProcessing; + IsSnapshotProcessing = builder.IsSnapshotProcessing; + _listener = builder.Listener; + _subscription = feed.CreateSubscription(typeof(TE)); + _subscription.AddEventListener(ProcessEvents); + } + + /// + /// Gets a value indicating whether if batch processing is enabled. + /// See . + /// + /// + /// true if batch processing is enabled; otherwise, false. + /// + public bool IsBatchProcessing { get; } + + /// + /// Gets a value indicating whether if snapshot processing is enabled. + /// See . + /// + /// + /// true if snapshot processing is enabled; otherwise, false. + /// + public bool IsSnapshotProcessing { get; } + + /// + /// Closes this model and makes it permanently detached. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Processes a list of events, updating the relevant processors and handling batch processing. + /// + /// The list of events to process. + internal void ProcessEvents(IEnumerable events) + { + try + { + IndexedEventSource? source = null; + TxEventProcessor? currentProcessor = null; + foreach (var e in events.OfType()) + { + if (source == null || source.Id != e.EventSource.Id) + { + source = e.EventSource; + currentProcessor = GetOrCreateProcessor(source); + } + + if (currentProcessor!.ProcessEvent(e)) + { + _readyProcessors.Add(currentProcessor); + } + } + + foreach (var processor in _readyProcessors) + { + processor.ReceiveAllEventsInBatch(); + } + } + catch (Exception e) + { + // ToDo Add log entry. + Console.Error.WriteLine($"Exception in {GetType().Name} event listener: {e}"); + } + finally + { + _readyProcessors.Clear(); + } + } + + /// + /// Gets the undecorated symbol associated with the model. + /// + /// The undecorated symbol associated with the model. + protected object GetUndecoratedSymbol() => + _symbol; + + /// + /// Sets the subscription symbols for the model. + /// + /// The set of symbols to subscribe to. + protected void SetSymbols(HashSet symbols) => + _subscription.SetSymbols(symbols); + + /// + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _subscription.Dispose(); + } + } + + private TxEventProcessor GetOrCreateProcessor(IndexedEventSource source) => + _processorsBySource.GetOrAdd(source, src => new( + IsBatchProcessing, + IsSnapshotProcessing, + (transaction, isSnapshot) => + { + _listener?.Invoke(src, new List(transaction), isSnapshot); + })); + + /// + /// Non-generic version, for erasing a generic type. + /// + public abstract class Builder + { + internal bool IsBatchProcessing { get; set; } = true; + + internal bool IsSnapshotProcessing { get; set; } + + internal DXFeed? Feed { get; set; } + + internal object? Symbol { get; set; } + + internal TxModelListener? Listener { get; set; } + } + + /// + /// Abstract builder for building models inherited from . + /// Specific implementations can add additional configuration options to this builder. + /// + ///

Inheritors of this class must override the abstract method to build a specific model.

+ ///
+ /// The type of the builder subclass. + /// The type of the model subclass. + public abstract class Builder : Builder + where TB : Builder + where TM : AbstractTxModel + { + /// + /// Enables or disables batch processing. + /// This is enabled by default. + /// + ///

If batch processing is disabled, the model will notify the listener + /// separately for each transaction (even if it is represented by a single event); + /// otherwise, transactions can be combined in a single listener call.

+ /// + ///

A transaction may represent either a snapshot or update events that are received after a snapshot. + /// Whether this flag is set or not, the model will always notify listeners that a snapshot has been received + /// and will not combine multiple snapshots or a snapshot with another transaction + /// into a single listener notification.

+ ///
+ /// true to enable batch processing; false otherwise. + /// The builder instance. + public TB WithBatchProcessing(bool isBatchProcessing) + { + IsBatchProcessing = isBatchProcessing; + return (TB)this; + } + + /// + /// Enables or disables snapshot processing. + /// This is disabled by default. + /// + ///

If snapshot processing is enabled, transactions representing a snapshot will be processed as follows: + /// events that are marked for removal will be removed, repeated indexes will be merged, and + /// event flags of events are set to zero; otherwise, the user will see the snapshot in raw form, + /// with possible repeated indexes, events marked for removal, and event flags unchanged.

+ /// + ///

Whether this flag is set or not, in transactions that are not a snapshot, events that are marked + /// for removal will not be removed, repeated indexes will not be merged, and + /// event flags of events will not be changed. + /// This flag only affects the processing of transactions that are a snapshot.

+ ///
+ /// true to enable snapshot processing; false otherwise. + /// The builder instance. + public TB WithSnapshotProcessing(bool isSnapshotProcessing) + { + IsSnapshotProcessing = isSnapshotProcessing; + return (TB)this; + } + + /// + /// Sets the for the model being created. + /// The feed cannot be attached after the model has been built. + /// + /// The . + /// The builder instance. + public TB WithFeed(DXFeed feed) + { + Feed = feed; + return (TB)this; + } + + /// + /// Sets the subscription symbol for the model being created. + /// The symbol cannot be added or changed after the model has been built. + /// + /// The subscription symbol. + /// The builder instance. + public TB WithSymbol(object symbol) + { + Symbol = symbol; + return (TB)this; + } + + /// + /// Sets the listener for transaction notifications. + /// The listener cannot be changed or added once the model has been built. + /// + /// The transaction listener. + /// The builder instance. + public TB WithListener(TxModelListener listener) + { + Listener = listener; + return (TB)this; + } + + /// + /// Builds an instance of the model based on the provided parameters. + /// + /// The created model. + public abstract TM Build(); + } +} diff --git a/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs b/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs new file mode 100644 index 00000000..43a61008 --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs @@ -0,0 +1,184 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Api.Osub; +using DxFeed.Graal.Net.Events; + +namespace DxFeed.Graal.Net.Models; + +/// +/// An incremental model for indexed events. +/// This model manages all snapshot and transaction logic, subscription handling, and listener notifications. +/// +///

This model is designed to handle incremental transactions. Users of this model only see the list +/// of events in a consistent state. This model delays incoming events that are part of an incomplete snapshot +/// or ongoing transaction until the snapshot is complete or the transaction has ended. This model notifies +/// the user of received transactions through an installed .

+/// +///

Configuration

+/// +///

This model must be configured using the class, as most configuration +/// settings cannot be changed once the model is built. This model requires configuration +/// and +/// for subscription, +/// and it must be attached to a +/// instance to begin operation. +/// For ease of use, some of these configurations can be changed after the model is built, +/// see .

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

+/// +///

Resource management and closed models

+/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way +/// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the +/// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

+/// +///

The convenient way to detach model from the feed is to call its +/// method. +/// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

+/// +///

Threads and locks

+/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent. +///
+/// The type of indexed events processed by this model. +public sealed class IndexedTxModel : AbstractTxModel + where TE : class, IIndexedEvent +{ + private readonly object _syncRoot = new(); + private HashSet _sources; + + private IndexedTxModel(Builder builder) + : base(builder) + { + _sources = new HashSet(builder.Sources); + UpdateSubscription(GetUndecoratedSymbol(), _sources); + } + + /// + /// Factory method to create a new builder for this model. + /// + /// A new instance. + public static Builder NewBuilder() => + new(); + + /// + /// Gets the current set of sources. + /// If no sources have been set, an empty set is returned, + /// indicating that all possible sources have been subscribed to. + /// + /// The set of current sources. + public HashSet GetSources() + { + lock (_syncRoot) + { + return new HashSet(_sources); + } + } + + /// + /// Sets the sources from which to subscribe for indexed events. + /// If an empty list is provided, subscriptions will default to all available sources. + /// If these sources have already been set, nothing happens. + /// + /// The specified sources. + public void SetSources(params IndexedEventSource[] sources) => + SetSources(new HashSet(sources)); + + /// + /// Sets the sources from which to subscribe for indexed events. + /// If an empty set is provided, subscriptions will default to all available sources. + /// If these sources have already been set, nothing happens. + /// + /// The specified sources. + public void SetSources(ICollection sources) + { + lock (_syncRoot) + { + if (_sources.SetEquals(sources)) + { + return; + } + + _sources = new HashSet(sources); + UpdateSubscription(GetUndecoratedSymbol(), _sources); + } + } + + private static HashSet DecorateSymbol(object symbol, HashSet sources) + { + if (sources.Count == 0) + { + return new HashSet { symbol }; + } + + var symbols = new HashSet(); + foreach (var source in sources) + { + symbols.Add(new IndexedEventSubscriptionSymbol(symbol, source)); + } + + return symbols; + } + + private void UpdateSubscription(object symbol, HashSet sources) => + SetSymbols(DecorateSymbol(symbol, sources)); + + /// + /// A builder class for creating an instance of . + /// + public new class Builder : Builder> + { + internal HashSet Sources { get; private set; } = new(); + + /// + /// Sets the sources from which to subscribe for indexed events. + /// If no sources have been set, subscriptions will default to all possible sources. + /// + ///

The default value for this source is an empty set, + /// which means that this model subscribes to all available sources. + /// These sources can be changed later, after the model has been created, + /// by calling .

+ ///
+ /// The specified sources. + /// this builder. + public Builder WithSources(params IndexedEventSource[] sources) + { + Sources = new HashSet(sources); + return this; + } + + /// + /// Sets the sources from which to subscribe for indexed events. + /// If no sources have been set, subscriptions will default to all possible sources. + /// + ///

The default value for this source is an empty set, + /// which means that this model subscribes to all available sources. + /// These sources can be changed later, after the model has been created, + /// by calling .

+ ///
+ /// The specified sources. + /// this builder. + public Builder WithSources(ICollection sources) + { + Sources = new HashSet(sources); + return this; + } + + /// + /// Builds an instance of based on the provided parameters. + /// + /// The created . + public override IndexedTxModel Build() => + new(this); + } +} diff --git a/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs b/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs new file mode 100644 index 00000000..df9ba21b --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs @@ -0,0 +1,22 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using DxFeed.Graal.Net.Events.Market; + +namespace DxFeed.Graal.Net.Models; + +/// +/// Invoked when the order book is changed. +///

The delegate is used to handle notifications +/// of changes to the market depth, including updates to the buy and sell orders. +/// Implement this delegate to process or react to changes in the market order book.

+///
+/// The collection of buy orders. +/// The collection of sell orders. +/// The type of order derived from . +public delegate void MarketDepthListener(IEnumerable buy, IEnumerable sell) + where TE : OrderBase; diff --git a/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs b/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs new file mode 100644 index 00000000..c0ca2105 --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs @@ -0,0 +1,643 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Market; + +namespace DxFeed.Graal.Net.Models; + +/// +/// Represents a model for market depth, tracking buy and sell orders and notifies listener +/// of changes in the order book. +/// +///

This model can set depth limit and aggregation period. This model notifies +/// the user of received transactions through an installed .

+/// +///

The depth limit specifies the maximum number of buy or sell orders to maintain in the order book. +/// For example, if the depth limit is set to 10, the model will only keep track of the top 10 buy orders +/// and the top 10 sell orders. This helps in managing the size of the order book.

+/// +///

The aggregation period, specified in milliseconds, determines the frequency at which the model aggregates +/// and notifies changes in the order book to the listeners. For instance, if the aggregation period is +/// set to 1000 milliseconds the model will aggregate changes and notify listeners every second. +/// A value of 0 means that changes are notified immediately.

+/// +///

Configuration

+/// +///

This model must be configured using the class, as most configuration +/// settings cannot be changed once the model is built. This model requires configuration +/// and it must be attached to a +/// instance to begin operation.

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

+/// +///

Resource management and closed models

+/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way +/// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the +/// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

+/// +///

The convenient way to detach model from the feed is to call its +/// method. +/// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

+/// +///

Threads and locks

+/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent. +///
+/// The type of order derived from . +public sealed class MarketDepthModel : IDisposable + where TE : OrderBase +{ + private static readonly Comparer OrderComparator = Comparer.Create((o1, o2) => + { + var ind1 = o1.Scope == Scope.Order; + var ind2 = o2.Scope == Scope.Order; + if (ind1 && ind2) + { + // Both orders are individual orders. + var c = o1.TimeSequence.CompareTo(o2.TimeSequence); // asc + if (c != 0) + { + return c; + } + + c = o1.Index.CompareTo(o2.Index); // asc + return c; + } + else + { + if (ind1) + { + // First order is individual, second is not. + return 1; + } + + if (ind2) + { + // Second order is individual, first is not. + return -1; + } + + // Both orders are non-individual orders. + var c = o2.Size.CompareTo(o1.Size); // desc + if (c != 0) + { + return c; + } + + c = o1.TimeSequence.CompareTo(o2.TimeSequence); // asc + if (c != 0) + { + return c; + } + + c = o1.Scope.CompareTo(o2.Scope); // asc + if (c != 0) + { + return c; + } + + c = o1.ExchangeCode.CompareTo(o2.ExchangeCode); // asc + if (c != 0) + { + return c; + } + + if (o1 is Order order1 && o2 is Order order2) + { + c = string.Compare(order1.MarketMaker, order2.MarketMaker, StringComparison.Ordinal); // asc + if (c != 0) + { + return c; + } + } + + c = o1.Index.CompareTo(o2.Index); // asc + return c; + } + }); + + private static readonly Comparer BuyComparator = Comparer.Create((o1, o2) => + { + if (o1.Price < o2.Price) + { + return 1; // desc + } + + if (o1.Price > o2.Price) + { + return -1; + } + + return OrderComparator.Compare(o1, o2); + }); + + private static readonly Comparer SellComparator = Comparer.Create((o1, o2) => + { + if (o1.Price < o2.Price) + { + return -1; // asc + } + + if (o1.Price > o2.Price) + { + return 1; + } + + return OrderComparator.Compare(o1, o2); + }); + + private readonly object _syncRoot = new(); + private readonly Dictionary _ordersByIndex = new(); + private readonly SortedOrderSet _buyOrders = new(BuyComparator); + private readonly SortedOrderSet _sellOrders = new(SellComparator); + private readonly IndexedTxModel _txModel; + private readonly MarketDepthListener? _listener; + private CancellationTokenSource? _cts; + private volatile bool _taskScheduled; + private Task? _task; + private long _aggregationPeriodMillis; + private int _depthLimit; + + private MarketDepthModel(Builder builder) + { + _depthLimit = builder.DepthLimit; + _buyOrders.SetDepthLimit(_depthLimit); + _sellOrders.SetDepthLimit(_depthLimit); + _listener = builder.Listener; + _aggregationPeriodMillis = builder.AggregationPeriodMillis; + _txModel = builder.TxModelBuilder.WithListener(EventReceived).Build(); + } + + /// + /// Creates a new builder instance for constructing a MarketDepthModel. + /// + /// A new instance of the builder. + public static Builder NewBuilder() => + new(); + + /// + /// Gets the depth limit of the order book. + /// + /// The current depth limit. + public int GetDepthLimit() + { + lock (_syncRoot) + { + return _depthLimit; + } + } + + /// + /// Sets the depth limit of the order book. + /// + /// The new depth limit value. + public void SetDepthLimit(int value) + { + lock (_syncRoot) + { + if (value < 0) + { + value = 0; + } + + if (value == _depthLimit) + { + return; + } + + _depthLimit = value; + _buyOrders.SetDepthLimit(value); + _sellOrders.SetDepthLimit(value); + TryCancelTask(); + NotifyListeners(); + } + } + + /// + /// Gets the aggregation period in milliseconds. + /// + /// The current aggregation period. + public long GetAggregationPeriod() + { + lock (_syncRoot) + { + return _aggregationPeriodMillis; + } + } + + /// + /// Sets the aggregation period in milliseconds. + /// + /// The new aggregation period value. + public void SetAggregationPeriod(long value) + { + lock (_syncRoot) + { + if (value < 0) + { + value = 0; + } + + if (value == _aggregationPeriodMillis) + { + return; + } + + _aggregationPeriodMillis = value; + RescheduleTaskIfNeeded(_aggregationPeriodMillis); + } + } + + /// + /// Closes this model and makes it permanently detached. + /// + public void Dispose() => + _txModel.Dispose(); + + private static bool ShallAdd(TE order) => + order.HasSize && (order.EventFlags & EventFlags.RemoveEvent) == 0; + + private void EventReceived(IndexedEventSource source, IEnumerable events, bool isSnapshot) + { + lock (_syncRoot) + { + if (!Update(source, events, isSnapshot)) + { + return; + } + + if (isSnapshot || _aggregationPeriodMillis == 0) + { + TryCancelTask(); + NotifyListeners(); + } + else + { + ScheduleTaskIfNeeded(_aggregationPeriodMillis); + } + } + } + + private void NotifyListeners() + { + lock (_syncRoot) + { + try + { + _listener?.Invoke(GetBuyOrders(), GetSellOrders()); + } + finally + { + _taskScheduled = false; + } + } + } + + private void ScheduleTaskIfNeeded(long delay) + { + lock (_syncRoot) + { + if (!_taskScheduled) + { + _taskScheduled = true; + _cts = new CancellationTokenSource(); + _task = Task.Delay(TimeSpan.FromMilliseconds(delay), _cts.Token) + .ContinueWith(_ => NotifyListeners(), TaskScheduler.Default); + } + } + } + + private void RescheduleTaskIfNeeded(long delay) + { + lock (_syncRoot) + { + if (TryCancelTask() && delay != 0) + { + ScheduleTaskIfNeeded(delay); + } + } + } + + private bool TryCancelTask() + { + lock (_syncRoot) + { + if (_taskScheduled && _task is { IsCompleted: false }) + { + _cts?.Cancel(); + _cts?.Dispose(); + _taskScheduled = false; + return true; + } + + return false; + } + } + + private bool Update(IndexedEventSource source, IEnumerable events, bool isSnapshot) + { + if (isSnapshot) + { + ClearBySource(source); + } + + foreach (var order in events) + { + if (_ordersByIndex.TryGetValue(order.Index, out var removed)) + { + _ordersByIndex.Remove(order.Index); + GetOrderSetForOrder(removed).Remove(removed); + } + + if (ShallAdd(order)) + { + _ordersByIndex.Add(order.Index, order); + GetOrderSetForOrder(order).Add(order); + } + } + + return _buyOrders.IsChanged || _sellOrders.IsChanged; + } + + private List GetBuyOrders() => + _buyOrders.ToList(); + + private List GetSellOrders() => + _sellOrders.ToList(); + + private void ClearBySource(IndexedEventSource source) + { + _ordersByIndex + .Where(p => p.Value.EventSource.Equals(source)) + .ToList() + .ForEach(p => _ordersByIndex.Remove(p.Key)); + _buyOrders.ClearBySource(source); + _sellOrders.ClearBySource(source); + } + + private SortedOrderSet GetOrderSetForOrder(TE order) => + order.OrderSide == Side.Buy ? _buyOrders : _sellOrders; + + /// + /// Builder class for constructing instances of MarketDepthModel. + /// + public class Builder + { + internal IndexedTxModel.Builder TxModelBuilder { get; } = IndexedTxModel.NewBuilder(); + + internal MarketDepthListener? Listener { get; private set; } + + internal long AggregationPeriodMillis { get; private set; } + + internal int DepthLimit { get; private set; } + + /// + /// Sets the for the model being created. + /// The feed cannot be attached after the model has been built. + /// + /// The . + /// The builder instance. + public Builder WithFeed(DXFeed feed) + { + TxModelBuilder.WithFeed(feed); + return this; + } + + /// + /// Sets the subscription symbol for the model being created. + /// The symbol cannot be added or changed after the model has been built. + /// + /// The subscription symbol. + /// The builder instance. + public Builder WithSymbol(string symbol) + { + TxModelBuilder.WithSymbol(symbol); + return this; + } + + /// + /// Sets the listener for transaction notifications. + /// The listener cannot be changed or added once the model has been built. + /// + /// The transaction listener. + /// The builder instance. + public Builder WithListener(MarketDepthListener listener) + { + Listener = listener; + return this; + } + + /// + /// Sets the sources from which to subscribe for indexed events. + /// If no sources have been set, subscriptions will default to all possible sources. + /// + /// + /// The default value for this source is an empty set, + /// which means that this model subscribes to all available sources. + /// + /// The specified sources. + /// this builder. + public Builder WithSources(params OrderSource[] sources) + { + TxModelBuilder.WithSources(new List(sources)); + return this; + } + + /// + /// Sets the sources from which to subscribe for indexed events. + /// If no sources have been set, subscriptions will default to all possible sources. + /// + /// + /// The default value for this source is an empty set, + /// which means that this model subscribes to all available sources. + /// + /// The specified sources. + /// this builder. + public Builder WithSources(ICollection sources) + { + TxModelBuilder.WithSources(new List(sources)); + return this; + } + + /// + /// Sets the aggregation period. + /// + /// The aggregation period in milliseconds. + /// this builder. + public Builder WithAggregationPeriod(long aggregationPeriodMillis) + { + if (aggregationPeriodMillis < 0) + { + aggregationPeriodMillis = 0; + } + + AggregationPeriodMillis = aggregationPeriodMillis; + return this; + } + + /// + /// Sets the depth limit. + /// + /// The depth limit. + /// this builder. + public Builder WithDepthLimit(int depthLimit) + { + if (depthLimit < 0) + { + depthLimit = 0; + } + + DepthLimit = depthLimit; + return this; + } + + /// + /// Builds an instance of based on the provided parameters. + /// + /// The created . + public MarketDepthModel Build() => + new(this); + } + + /// + /// Represents a set of orders, sorted by a comparator. + /// + private sealed class SortedOrderSet + { + private readonly List _snapshot = new(); + private readonly IComparer _comparator; + private readonly SortedSet _orders; + private int _depthLimit; + + /// + /// Initializes a new instance of the class with specified comparator. + /// + /// The comparator to use for sorting orders. + public SortedOrderSet(IComparer comparator) + { + _comparator = comparator; + _orders = new SortedSet(comparator); + } + + /// + /// Gets a value indicating whether this set has changed. + /// + public bool IsChanged { get; private set; } + + /// + /// Sets the depth limit. + /// + /// The new depth limit. + public void SetDepthLimit(int depthLimit) + { + if (_depthLimit == depthLimit) + { + return; + } + + _depthLimit = depthLimit; + IsChanged = true; + } + + /// + /// Adds an order to the set. + /// + /// The order to add. + public void Add(TE order) + { + if (_orders.Add(order)) + { + MarkAsChangedIfNeeded(order); + } + } + + /// + /// Removes an order from the set. + /// + /// The order to remove. + public void Remove(TE order) + { + if (_orders.Remove(order)) + { + MarkAsChangedIfNeeded(order); + } + } + + /// + /// Clears orders from the set by source. + /// + /// The source to clear orders by. + public void ClearBySource(IndexedEventSource source) => + IsChanged = _orders.RemoveWhere(order => order.EventSource.Equals(source)) > 0; + + /// + /// Converts the set to a list. + /// + /// The list of orders. + public List ToList() + { + if (IsChanged) + { + UpdateSnapshot(); + } + + return new List(_snapshot); + } + + private void UpdateSnapshot() + { + IsChanged = false; + _snapshot.Clear(); + var limit = IsDepthLimitUnbounded() ? int.MaxValue : _depthLimit; + var it = _orders.GetEnumerator(); + for (var i = 0; i < limit && it.MoveNext(); ++i) + { + _snapshot.Add(it.Current); + } + } + + private void MarkAsChangedIfNeeded(TE order) + { + if (IsChanged) + { + return; + } + + if (IsDepthLimitUnbounded() || IsOrderCountWithinDepthLimit() || IsOrderWithinDepthLimit(order)) + { + IsChanged = true; + } + } + + private bool IsDepthLimitUnbounded() => + _depthLimit <= 0 || _depthLimit == int.MaxValue; + + private bool IsOrderCountWithinDepthLimit() => + _orders.Count <= _depthLimit; + + private bool IsOrderWithinDepthLimit(TE order) + { + if (_snapshot.Count == 0) + { + return true; + } + + var last = _snapshot[_snapshot.Count - 1]; + return _comparator.Compare(last, order) >= 0; + } + } +} diff --git a/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs b/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs new file mode 100644 index 00000000..98270fee --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs @@ -0,0 +1,142 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Api.Osub; +using DxFeed.Graal.Net.Events; + +namespace DxFeed.Graal.Net.Models; + +/// +/// An incremental model for time-series events. +/// This model manages all snapshot and transaction logic, subscription handling, and listener notifications. +/// +///

This model is designed to handle incremental transactions. Users of this model only see the list +/// of events in a consistent state. This model delays incoming events that are part of an incomplete snapshot +/// or ongoing transaction until the snapshot is complete or the transaction has ended. This model notifies +/// the user of received transactions through an installed .

+/// +///

Configuration

+/// +///

This model must be configured using the class, as most configuration +/// settings cannot be changed once the model is built. This model requires configuration +/// and +/// for subscription, and it must be attached +/// to a instance to begin operation. +/// For ease of use, some of these configurations can be changed after the model is built, +/// see .

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

+/// +///

Resource management and closed models

+/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way +/// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the +/// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

+/// +///

The convenient way to detach model from the feed is to call its +/// method. +/// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

+/// +///

Threads and locks

+/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent. +///
+/// The type of time series events processed by this model. +public sealed class TimeSeriesTxModel : AbstractTxModel + where TE : class, ITimeSeriesEvent +{ + private readonly object _syncRoot = new(); + private long _fromTime; + + private TimeSeriesTxModel(Builder builder) + : base(builder) + { + _fromTime = builder.FromTime; + UpdateSubscription(GetUndecoratedSymbol(), _fromTime); + } + + /// + /// Factory method to create a new builder for this model. + /// + /// A new instance. + public static Builder NewBuilder() => + new(); + + /// + /// Gets the time from which to subscribe for time-series, + /// or if this model is not subscribed (this is the default value). + /// + /// The time in milliseconds since Unix epoch of January 1, 1970. + public long GetFromTime() + { + lock (_syncRoot) + { + return _fromTime; + } + } + + /// + /// Sets the time from which to subscribe for time-series. + /// If this time has already been set, nothing happens. + /// + /// The time in milliseconds since Unix epoch of January 1, 1970. + public void SetFromTime(long fromTime) + { + lock (_syncRoot) + { + if (_fromTime == fromTime) + { + return; + } + + _fromTime = fromTime; + UpdateSubscription(GetUndecoratedSymbol(), _fromTime); + } + } + + private static HashSet DecorateSymbol(object symbol, long fromTime) => + fromTime == long.MaxValue + ? new HashSet() + : new HashSet { new TimeSeriesSubscriptionSymbol(symbol, fromTime) }; + + private void UpdateSubscription(object symbol, long fromTime) => + SetSymbols(DecorateSymbol(symbol, fromTime)); + + /// + /// A builder class for creating an instance of . + /// + public new class Builder : Builder> + { + internal long FromTime { get; private set; } = long.MaxValue; + + /// + /// Sets the time from which to subscribe for time-series. + /// + ///

This time defaults to , which means that this model is not subscribed. + /// This time can be changed later, after the model has been created, + /// by calling .

+ ///
+ /// The time in milliseconds since Unix epoch of January 1, 1970. + /// this builder. + public Builder WithFromTime(long fromTime) + { + FromTime = fromTime; + return this; + } + + /// + /// Builds an instance of based on the provided parameters. + /// + /// The created . + public override TimeSeriesTxModel Build() => + new(this); + } +} diff --git a/src/DxFeed.Graal.Net/Models/TxEventProcessor.cs b/src/DxFeed.Graal.Net/Models/TxEventProcessor.cs new file mode 100644 index 00000000..dec720c4 --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/TxEventProcessor.cs @@ -0,0 +1,192 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using System.Collections.Specialized; +using System.Linq; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Market; + +namespace DxFeed.Graal.Net.Models; + +internal class TxEventProcessor + where TE : class, IIndexedEvent +{ + private readonly List _pendingEvents = new(); + private readonly ITransactionProcessor _transactionProcessor; + private readonly ISnapshotProcessor _snapshotProcessor; + private bool _isSnapshotBeginSeen; + private bool _isPartialSnapshot; + private bool _isCompleteSnapshot; + + public TxEventProcessor(bool isBatchProcessing, bool isSnapshotProcessing, Listener listener) + { + _transactionProcessor = isBatchProcessing + ? new BatchTransactionProcessor(listener) + : new NotifyTransactionProcessor(listener); + _snapshotProcessor = isSnapshotProcessing + ? new ProcessingSnapshotProcessor(listener) + : new NotifySnapshotProcessor(listener); + } + + public delegate void Listener(IEnumerable events, bool isSnapshot); + + private interface ITransactionProcessor + { + void ProcessTransaction(List events); + + void ProcessingBatch(); + } + + private interface ISnapshotProcessor + { + void ProcessSnapshot(List events); + } + + public bool ProcessEvent(TE e) + { + if (EventFlags.IsSnapshotBegin(e)) + { + _isSnapshotBeginSeen = true; + _isPartialSnapshot = true; + _isCompleteSnapshot = false; + + // Remove any unprocessed leftovers on new snapshot. + _pendingEvents.Clear(); + } + else if (!_isSnapshotBeginSeen) + { + // Ignore all events until at least one SNAPSHOT_BEGIN is received. + return false; + } + + if (_isPartialSnapshot && EventFlags.IsSnapshotEndOrSnip(e)) + { + _isPartialSnapshot = false; + _isCompleteSnapshot = true; + } + + // Defer processing of this event while snapshot in progress or tx pending. + _pendingEvents.Add(e); + if (EventFlags.IsPending(e) || _isPartialSnapshot) + { + // Waiting for the end of snapshot or transaction. + return false; + } + + // We have completed transaction or snapshot. + if (_isCompleteSnapshot) + { + // Completed snapshot. + _snapshotProcessor.ProcessSnapshot(_pendingEvents); + _isCompleteSnapshot = false; + } + else + { + // Completed transaction. + _transactionProcessor.ProcessTransaction(_pendingEvents); + } + + _pendingEvents.Clear(); + return true; + } + + public void ReceiveAllEventsInBatch() => + _transactionProcessor.ProcessingBatch(); + + private sealed class BatchTransactionProcessor : ITransactionProcessor + { + private readonly Listener _listener; + private readonly List _transactions = new(); + + public BatchTransactionProcessor(Listener listener) => + _listener = listener; + + public void ProcessTransaction(List events) => + _transactions.AddRange(events); + + public void ProcessingBatch() + { + if (_transactions.Count == 0) + { + return; + } + + _listener.Invoke(_transactions, false); + _transactions.Clear(); + } + } + + private sealed class NotifyTransactionProcessor : ITransactionProcessor + { + private readonly Listener _listener; + + public NotifyTransactionProcessor(Listener listener) => + _listener = listener; + + public void ProcessTransaction(List events) => + _listener.Invoke(events, false); + + public void ProcessingBatch() + { + // nothing to do + } + } + + private sealed class NotifySnapshotProcessor : ISnapshotProcessor + { + private readonly Listener _listener; + + public NotifySnapshotProcessor(Listener listener) => + _listener = listener; + + public void ProcessSnapshot(List events) => + _listener.Invoke(events, true); + } + + private sealed class ProcessingSnapshotProcessor : ISnapshotProcessor + { + private readonly Listener _listener; + private readonly OrderedDictionary _snapshot = new(); + + public ProcessingSnapshotProcessor(Listener listener) => + _listener = listener; + + public void ProcessSnapshot(List events) + { + foreach (var e in events) + { + if (IsRemove(e)) + { + _snapshot.Remove(e.Index); + } + else + { + e.EventFlags = 0; + _snapshot[e.Index] = e; + } + } + + _listener.Invoke(_snapshot.Values.OfType(), true); + _snapshot.Clear(); + } + + private static bool IsRemove(TE e) + { + if ((e.EventFlags & EventFlags.RemoveEvent) != 0) + { + return true; + } + + if (e is OrderBase order) + { + return !order.HasSize; + } + + return false; + } + } +} diff --git a/src/DxFeed.Graal.Net/Models/TxModelListener.cs b/src/DxFeed.Graal.Net/Models/TxModelListener.cs new file mode 100644 index 00000000..e99e1220 --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/TxModelListener.cs @@ -0,0 +1,33 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using DxFeed.Graal.Net.Events; + +namespace DxFeed.Graal.Net.Models; + +/// +/// Invoked when a complete transaction (one or more) is received. This behavior can be changed when building +/// the model; see . +/// +///

Only events that have the same and +/// can be in the same listener call and cannot be mixed within a single call. If there are multiple sources, +/// listener notifications will happen separately for each source.

+/// +///

A transaction can also be a snapshot. In such cases, the flag is set to true, +/// indicating that all state based on previously received events for the corresponding +/// should be cleared. A snapshot can also be post-processed or raw; +/// see . +/// If is true, +/// the transaction containing the snapshot can be empty (events.Count == 0), +/// meaning that an empty snapshot was received.

+///
+/// The source of the indexed events. +/// The list of received events representing one or more transactions. +/// Indicates if the events form a snapshot. +/// The type of indexed events passed to this listener. +public delegate void TxModelListener(IndexedEventSource source, IEnumerable events, bool isSnapshot) + where TE : IIndexedEvent; diff --git a/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointHandle.cs b/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointHandle.cs index b4eaca46..d62ceeab 100644 --- a/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointHandle.cs +++ b/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointHandle.cs @@ -6,6 +6,7 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.InteropServices; +using DxFeed.Graal.Net.Native.Executors; using DxFeed.Graal.Net.Native.Feed; using DxFeed.Graal.Net.Native.Interop; using DxFeed.Graal.Net.Native.Publisher; @@ -61,6 +62,9 @@ public void RemoveStateChangeListener(StateChangeListenerHandle listener) => public DXPublisherHandle GetPublisher() => SafeCall(Import.GetPublisher(CurrentThread, this)); + public void Executor(InPlaceExecutor executor) => + SafeCall(Import.Executor(CurrentThread, this, executor)); + private static class Import { [DllImport( @@ -211,5 +215,15 @@ public static extern int RemoveStateChangeListener( public static extern DXPublisherHandle GetPublisher( nint thread, DXEndpointHandle endpoint); + + [DllImport( + ImportInfo.DllName, + CallingConvention = CallingConvention.Cdecl, + CharSet = CharSet.Ansi, + EntryPoint = "dxfg_DXEndpoint_executor")] + public static extern int Executor( + nint thread, + DXEndpointHandle endpoint, + InPlaceExecutor executor); } } diff --git a/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointWrapper.cs b/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointWrapper.cs index 4acabd01..194bc1c5 100644 --- a/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointWrapper.cs +++ b/src/DxFeed.Graal.Net/Native/Endpoint/DXEndpointWrapper.cs @@ -5,6 +5,7 @@ // using System; +using DxFeed.Graal.Net.Native.Executors; using DxFeed.Graal.Net.Native.Feed; using DxFeed.Graal.Net.Native.Interop; using DxFeed.Graal.Net.Native.Publisher; @@ -18,6 +19,7 @@ internal sealed unsafe class DXEndpointWrapper : IDisposable private readonly Lazy feed; private readonly Lazy publisher; private readonly HandleMap listeners; + private InPlaceExecutor? _executor; private DXEndpointWrapper(DXEndpointHandle endpoint) { @@ -80,6 +82,12 @@ public FeedNative GetFeed() => public DXPublisherHandle GetPublisher() => publisher.Value; + public void Executor(InPlaceExecutor executor) + { + _executor = executor; + endpoint.Executor(executor); + } + public void Dispose() => Close(); diff --git a/src/DxFeed.Graal.Net/Native/Executors/InPlaceExecutor.cs b/src/DxFeed.Graal.Net/Native/Executors/InPlaceExecutor.cs new file mode 100644 index 00000000..e075d819 --- /dev/null +++ b/src/DxFeed.Graal.Net/Native/Executors/InPlaceExecutor.cs @@ -0,0 +1,43 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Runtime.InteropServices; +using DxFeed.Graal.Net.Native.ErrorHandling; +using DxFeed.Graal.Net.Native.Interop; + +namespace DxFeed.Graal.Net.Native.Executors; + +internal class InPlaceExecutor : JavaHandle +{ + public static InPlaceExecutor Create() => + ErrorCheck.SafeCall(Import.Create(CurrentThread)); + + public void ProcessAllPendingTasks() => + ErrorCheck.SafeCall(Import.ProcessAllPendingTasks(CurrentThread, this)); + + private static class Import + { + [DllImport( + ImportInfo.DllName, + CallingConvention = CallingConvention.Cdecl, + CharSet = CharSet.Ansi, + ExactSpelling = true, + BestFitMapping = false, + ThrowOnUnmappableChar = true, + EntryPoint = "dxfg_ExecutorBaseOnConcurrentLinkedQueue_new")] + public static extern InPlaceExecutor Create(nint thread); + + [DllImport( + ImportInfo.DllName, + CallingConvention = CallingConvention.Cdecl, + CharSet = CharSet.Ansi, + ExactSpelling = true, + BestFitMapping = false, + ThrowOnUnmappableChar = true, + EntryPoint = "dxfg_ExecutorBaseOnConcurrentLinkedQueue_processAllPendingTasks")] + public static extern InPlaceExecutor ProcessAllPendingTasks(nint thread, InPlaceExecutor executor); + } +} diff --git a/tests/DxFeed.Graal.Net.Tests/Models/AbstractTxModelTest.cs b/tests/DxFeed.Graal.Net.Tests/Models/AbstractTxModelTest.cs new file mode 100644 index 00000000..e7b54600 --- /dev/null +++ b/tests/DxFeed.Graal.Net.Tests/Models/AbstractTxModelTest.cs @@ -0,0 +1,436 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Models; +using DxFeed.Graal.Net.Native.Executors; +using static DxFeed.Graal.Net.Events.EventFlags; + +namespace DxFeed.Graal.Net.Tests.Models; + +public abstract class AbstractTxModelTest + where TE : class, IIndexedEvent + where TM : AbstractTxModel + where TB : AbstractTxModel.Builder +{ + protected const string TestSymbol = "TEST-SYMBOL"; + + internal InPlaceExecutor Executor { get; } = InPlaceExecutor.Create(); + + private readonly List PublishedEvents = new(); + + private DXEndpoint Endpoint; + + protected int ListenerNotificationCounter { get; set; } + protected int SnapshotNotificationCounter { get; set; } + + protected static IEnumerable Params() + { + for (var i = 0; i < 4; i++) + { + var isBatchProcessing = (i & 1) != 0; + var isSnapshotProcessing = (i & 2) != 0; + yield return new TestCaseData(isBatchProcessing, isSnapshotProcessing) + .SetName( + $"TestCase {i} - isBatchProcessing:{isBatchProcessing}; isSnapshotProcessing:{isSnapshotProcessing}"); + } + } + + [SetUp] + public void SetUp() + { + Endpoint = DXEndpoint.Create(DXEndpoint.Role.LocalHub); + Endpoint.Executor(Executor); + Feed = Endpoint.GetFeed(); + Publisher = Endpoint.GetPublisher(); + } + + [TearDown] + public void TearDown() + { + SnapshotNotificationCounter = 0; + ListenerNotificationCounter = 0; + ReceivedEvents.Clear(); + PublishedEvents.Clear(); + Model?.Dispose(); + Endpoint.Dispose(); + } + + [OneTimeTearDown] + public void OneTimeTearDown() => + Executor.Dispose(); + + [Test, TestCaseSource(nameof(Params))] + public void TestSnapshotAndUpdate(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(1, 1, SnapshotBegin); + AddToPublish(0, 2, SnapshotEnd); + AddToPublish(1, 3); + AddToPublish(3, 4); + AddToPublish(2, 5); + PublishDeferred(false); + AssertSnapshotNotification(1); // only one snapshot + AssertListenerNotification(Model.IsBatchProcessing ? 2 : 4); + AssertReceivedEventCount(5); + AssertEvent(1, 1, Model.IsSnapshotProcessing ? 0 : SnapshotBegin); + AssertEvent(0, 2, Model.IsSnapshotProcessing ? 0 : SnapshotEnd); + AssertEvent(1, 3, 0); // event with index 1 is not merged + AssertEvent(3, 4, 0); + AssertEvent(2, 5, 0); + } + + [Test, TestCaseSource(nameof(Params))] + public void TestEmptySnapshot(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(0, 1, SnapshotBegin | SnapshotEnd | RemoveEvent); + PublishDeferred(false); + AssertSnapshotNotification(1); // only one snapshot + if (Model.IsSnapshotProcessing) + { + AssertReceivedEventCount(0); // event with RemoveEvent flag was removed inside the snapshot + } + else + { + AssertReceivedEventCount(1); // event with RemoveEvent flag is saved + AssertEvent(0, 1, SnapshotBegin | SnapshotEnd | RemoveEvent); + } + } + + [Test, TestCaseSource(nameof(Params))] + public void TestSnapshotWithPending(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(1, 1, SnapshotBegin); + PublishDeferred(false); + AssertIsChanged(false); // not processed yet + AssertReceivedEventCount(0); + + AddToPublish(0, 2, SnapshotEnd | TxPending); + PublishDeferred(false); + AssertIsChanged(false); // not processed yet, because the pending flag is set + + AddToPublish(1, 3); // event without pending + PublishDeferred(false); + AssertListenerNotification(1); // since the transaction ended here + AssertSnapshotNotification(1); // and it's all one snapshot + if (Model.IsSnapshotProcessing) + { + AssertReceivedEventCount(2); // the same indices within a snapshot were merged + AssertEvent(1, 3, 0); + AssertEvent(0, 2, 0); + } + else + { + AssertReceivedEventCount(3); + AssertEvent(1, 1, SnapshotBegin); + AssertEvent(0, 2, SnapshotEnd | TxPending); + AssertEvent(1, 3, 0); + } + } + + [Test, TestCaseSource(nameof(Params))] + public void TestMultipleSnapshot(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(1, 1, SnapshotBegin); + PublishDeferred(false); + AssertIsChanged(false); // not processed yet + AssertReceivedEventCount(0); + + AddToPublish(0, 2, SnapshotEnd); + AddToPublish(2, 3, SnapshotBegin); + PublishDeferred(false); + AssertListenerNotification(1); + AssertSnapshotNotification(1); // only one snapshot so far, beginning of the second one is in the buffer + AssertReceivedEventCount(2); + AssertEvent(1, 1, Model.IsSnapshotProcessing ? 0 : SnapshotBegin); + AssertEvent(0, 2, Model.IsSnapshotProcessing ? 0 : SnapshotEnd); + + AddToPublish(0, 4, SnapshotEnd); // end of second snapshot + AddToPublish(3, 5); // update after second snapshot + PublishDeferred(false); + AssertSnapshotNotification(1); + AssertListenerNotification(2); + AssertReceivedEventCount(3); + AssertEvent(2, 3, Model.IsSnapshotProcessing ? 0 : SnapshotBegin); + AssertEvent(0, 4, Model.IsSnapshotProcessing ? 0 : SnapshotEnd); + AssertEvent(3, 5, 0); + } + + [Test, TestCaseSource(nameof(Params))] + public void TestMultipleSnapshotInOneBatch(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(0, 1, SnapshotBegin | SnapshotEnd); + AddToPublish(0, 2, SnapshotBegin | SnapshotSnip); + AddToPublish(0, 3, SnapshotBegin | RemoveEvent | SnapshotSnip | SnapshotEnd); + PublishDeferred(false); + AssertListenerNotification(3); + AssertSnapshotNotification(3); + if (Model.IsSnapshotProcessing) + { + AssertReceivedEventCount(2); // no event with RemoveEvent flag + AssertEvent(0, 1, 0); + AssertEvent(0, 2, 0); + } + else + { + AssertReceivedEventCount(3); + AssertEvent(0, 1, SnapshotBegin | SnapshotEnd); + AssertEvent(0, 2, SnapshotBegin | SnapshotSnip); + AssertEvent(0, 3, SnapshotBegin | RemoveEvent | SnapshotSnip | SnapshotEnd); + } + } + + [Test, TestCaseSource(nameof(Params))] + public void TestIncompleteSnapshot(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(1, 1, SnapshotBegin); + PublishDeferred(false); + AssertIsChanged(false); // not processed yet + AssertReceivedEventCount(0); + + AddToPublish(2, 2, SnapshotBegin); // yet another snapshot begins + AddToPublish(3, 3); // event part of a snapshot + PublishDeferred(false); + AssertIsChanged(false); // not processed yet + AssertReceivedEventCount(0); + + AddToPublish(4, 4, SnapshotBegin); // start new snapshot + PublishDeferred(false); + AssertIsChanged(false); + AssertReceivedEventCount(0); + + AddToPublish(0, 5, SnapshotEnd); // full snapshot + AddToPublish(5, 6); // update event after the snapshot end in the same batch + AddToPublish(6, 7); // yet another update event after the snapshot end in the same batch + PublishDeferred(false); + AssertListenerNotification(Model.IsBatchProcessing ? 2 : 3); + AssertSnapshotNotification(1); // of which one snapshot + AssertReceivedEventCount(4); // chunks of previous snapshots have been deleted + AssertEvent(4, 4, Model.IsSnapshotProcessing ? 0 : SnapshotBegin); + AssertEvent(0, 5, Model.IsSnapshotProcessing ? 0 : SnapshotEnd); + AssertEvent(5, 6, 0); + AssertEvent(6, 7, 0); + + AddToPublish(7, 4, SnapshotBegin); // the snapshot hasn't ended yet + PublishDeferred(false); + AssertIsChanged(false); // not processed yet + } + + [Test, TestCaseSource(nameof(Params))] + public void TestPending(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(0, 1, SnapshotBegin | SnapshotEnd); + PublishDeferred(false); + AssertIsChanged(true); + AssertIsSnapshot(true); + AssertReceivedEventCount(1); + + AddToPublish(1, 2, TxPending); // publish pending event + AddToPublish(2, 3, TxPending); // publish pending event, same index as the previous one + PublishDeferred(false); + AssertIsChanged(false); // not processed yet + + AddToPublish(3, 4, 0); // publish without pending + AddToPublish(4, 5, 0); // publish without pending + PublishDeferred(false); + AssertListenerNotification(Model.IsBatchProcessing ? 1 : 2); + AssertIsSnapshot(false); + AssertReceivedEventCount(5); // all published events, without merge + AssertEvent(0, 1, Model.IsSnapshotProcessing ? 0 : SnapshotBegin | SnapshotEnd); + AssertEvent(1, 2, TxPending); + AssertEvent(2, 3, TxPending); + AssertEvent(3, 4, 0); + AssertEvent(4, 5, 0); + } + + [Test, TestCaseSource(nameof(Params))] + public void TestEventsWithoutSnapshot(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(2, 1); + AddToPublish(3, 2); + AddToPublish(1, 3); + AddToPublish(1, 4); // same index as the previous one + AddToPublish(0, 5); + PublishDeferred(false); + AssertIsChanged(false); // events prior to the snapshot are ignored + + AddToPublish(0, 1, SnapshotBegin | SnapshotEnd); + AddToPublish(1, 2); + PublishDeferred(false); + AssertIsChanged(true); + AssertReceivedEventCount(2); // after receiving a snapshot, all events are received + AssertEvent(0, 1, Model.IsSnapshotProcessing ? 0 : SnapshotBegin | SnapshotEnd); + AssertEvent(1, 2, 0); + } + + [Test, TestCaseSource(nameof(Params))] + public void TestSnapshotWithRemoveAndPending(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(7, 1, SnapshotBegin); + AddToPublish(6, 2); + AddToPublish(5, 3, RemoveEvent); + AddToPublish(4, 4); + AddToPublish(3, 5); + AddToPublish(2, 6, TxPending); + AddToPublish(2, 7); + AddToPublish(1, 8); + AddToPublish(0, double.NaN, SnapshotEnd | TxPending | RemoveEvent); + AddToPublish(1, 9); + PublishDeferred(false); + if (Model.IsSnapshotProcessing) + { + AssertReceivedEventCount(6); + AssertEvent(7, 1, 0); + AssertEvent(6, 2, 0); + AssertEvent(4, 4, 0); + AssertEvent(3, 5, 0); + AssertEvent(2, 7, 0); + AssertEvent(1, 9, 0); + } + else + { + AssertReceivedEventCount(10); + AssertEvent(7, 1, SnapshotBegin); + AssertEvent(6, 2, 0); + AssertEvent(5, 3, RemoveEvent); + AssertEvent(4, 4, 0); + AssertEvent(3, 5, 0); + AssertEvent(2, 6, TxPending); + AssertEvent(2, 7, 0); + AssertEvent(1, 8, 0); + AssertEvent(0, double.NaN, SnapshotEnd | TxPending | RemoveEvent); + AssertEvent(1, 9, 0); + } + } + + [Test, TestCaseSource(nameof(Params))] + public void TestCloseAbruptly(bool isBatchProcessing, bool isSnapshotProcessing) + { + Model = Builder() + .WithBatchProcessing(isBatchProcessing) + .WithSnapshotProcessing(isSnapshotProcessing) + .Build(); + + AddToPublish(0, 12.34, SnapshotBegin | SnapshotEnd); + PublishDeferred(true); + AssertIsChanged(true); + AssertIsSnapshot(true); + + Model.Dispose(); + + AddToPublish(2, 56.78, 0); // emulate stale events processing + PublishDeferred(true); + AssertIsChanged(false); // no change after close + } + + + protected DXFeed Feed { get; private set; } + protected DXPublisher Publisher { get; private set; } + protected TM? Model { get; set; } + protected Queue ReceivedEvents { get; } = new(); + + protected void AssertIsChanged(bool isChanged) + { + Assert.That(isChanged ? ListenerNotificationCounter > 0 : ListenerNotificationCounter == 0); + ListenerNotificationCounter = 0; + } + + protected void AssertSnapshotNotification(int count) + { + Assert.That(count, Is.EqualTo(SnapshotNotificationCounter)); + SnapshotNotificationCounter = 0; + } + + protected void AssertReceivedEventCount(int count) => + Assert.That(count, Is.EqualTo(ReceivedEvents.Count)); + + protected abstract void AssertEvent(int index, double size, int eventFlags); + + protected abstract TE CreateEvent(int index, double size, int eventFlags); + + protected abstract TB Builder(); + + private void AssertIsSnapshot(bool isSnapshot) + { + Assert.That(isSnapshot ? SnapshotNotificationCounter > 0 : SnapshotNotificationCounter == 0); + SnapshotNotificationCounter = 0; + } + + private void AssertListenerNotification(int count) + { + Assert.That(count, Is.EqualTo(ListenerNotificationCounter)); + ListenerNotificationCounter = 0; + } + + private void AddToPublish(int index, double size) => + AddToPublish(index, size, 0); + + private void AddToPublish(int index, double size, int eventFlags) + { + var e = CreateEvent(index, size, eventFlags); + PublishedEvents.Add(e); + } + + private void PublishDeferred(bool withPublisher) + { + if (withPublisher) + { + Publisher.PublishEvents(PublishedEvents); + Executor.ProcessAllPendingTasks(); + } + else + { + Model?.ProcessEvents(PublishedEvents); + } + + PublishedEvents.Clear(); + } +} diff --git a/tests/DxFeed.Graal.Net.Tests/Models/IndexedTxModelTest.cs b/tests/DxFeed.Graal.Net.Tests/Models/IndexedTxModelTest.cs new file mode 100644 index 00000000..fb384db1 --- /dev/null +++ b/tests/DxFeed.Graal.Net.Tests/Models/IndexedTxModelTest.cs @@ -0,0 +1,172 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Market; +using DxFeed.Graal.Net.Models; + +namespace DxFeed.Graal.Net.Tests.Models; + +[TestFixture] +public class IndexedTxModelTest : AbstractTxModelTest, IndexedTxModel.Builder> +{ + [Test] + public void TestInitialState() + { + var builder = IndexedTxModel.NewBuilder(); + Assert.Throws(() => builder.Build()); // if the symbol and feed is not set + + Model = builder.WithFeed(DXFeed.GetInstance()).WithSymbol(TestSymbol).Build(); + Assert.Multiple(() => + { + Assert.That(Model.IsBatchProcessing, Is.True); + Assert.That(Model.IsSnapshotProcessing, Is.False); + Assert.That(Model.GetSources(), Is.Empty); + }); + } + + [Test] + public void TestChangeSource() + { + var dexOrder = CreateOrder(OrderSource.DEX, 1); + var ntvOrder = CreateOrder(OrderSource.ntv, 2); + var ntvOrderUpperCase = CreateOrder(OrderSource.NTV, 3); + + Model = Builder() + .WithSources(OrderSource.AGGREGATE_ASK, OrderSource.AGGREGATE_BID) // add two sources + .WithSources(OrderSource.ntv, OrderSource.NTV) // override previous sources + .Build(); + + var sources = Model.GetSources(); + Assert.Multiple(() => + { + Assert.That(sources, Has.Count.EqualTo(2)); + Assert.That(!sources.Except(new HashSet { OrderSource.ntv, OrderSource.NTV }).Any()); + }); + + Publish(dexOrder); // publish an unsubscribed source + AssertIsChanged(false); + + Publish(ntvOrder, ntvOrderUpperCase, dexOrder); // publish two subscribed and unsubscribed sources + AssertIsChanged(true); + AssertSnapshotNotification(2); + AssertReceivedEventCount(2); + + Model.SetSources(OrderSource.DEX); // change source + sources = Model.GetSources(); + Assert.Multiple(() => + { + Assert.That(sources, Has.Count.EqualTo(1)); + Assert.That(sources, Does.Contain(OrderSource.DEX)); + }); + + Publish(dexOrder); // publish a subscribed source + AssertIsChanged(true); + AssertSnapshotNotification(1); + AssertReceivedEventCount(3); + } + + [Test] + public void TestEmptySource() + { + var dexOrder = IndexedTxModelTest.CreateOrder(OrderSource.DEX, 1); + var ntvOrder = IndexedTxModelTest.CreateOrder(OrderSource.ntv, 2); + var ntvOrderUpperCase = IndexedTxModelTest.CreateOrder(OrderSource.NTV, 3); + + Model = Builder().Build(); + var sources = Model.GetSources(); + Assert.That(sources, Is.Empty); + + Publish(ntvOrder, ntvOrderUpperCase, dexOrder); // empty sources means subscribing to all available sources + AssertIsChanged(true); + AssertSnapshotNotification(3); + AssertReceivedEventCount(3); + } + + /// + /// Asserts the properties of an event. + /// + /// Index of the event. + /// Size of the event. + /// Flags associated with the event. + protected override void AssertEvent(int index, double size, int eventFlags) + { + var order = ReceivedEvents.Dequeue(); + Assert.Multiple(() => + { + Assert.That(order.EventSymbol, Is.EqualTo(TestSymbol)); + Assert.That(index, Is.EqualTo(order.Index)); + Assert.That(size, Is.EqualTo(order.Size)); + Assert.That(eventFlags, Is.EqualTo(order.EventFlags)); + }); + } + + /// + /// Creates an event. + /// + /// Index of the event. + /// Size of the event. + /// Flags associated with the event. + /// A new instance of . + protected override Order CreateEvent(int index, double size, int eventFlags) => + new(TestSymbol) + { + Index = index, + EventSource = OrderSource.DEFAULT, + Size = size, + EventFlags = eventFlags, + OrderSide = Side.Buy + }; + + /// + /// Creates a builder for the . + /// + /// A new instance of . + protected override IndexedTxModel.Builder Builder() => + IndexedTxModel.NewBuilder() + .WithFeed(Feed) + .WithSymbol(TestSymbol) + .WithListener((_, events, isSnapshot) => + { + ++ListenerNotificationCounter; + if (isSnapshot) + { + ++SnapshotNotificationCounter; + } + + foreach (var e in events) + { + ReceivedEvents.Enqueue(e); + } + }); + + /// + /// Creates an order with the specified source and size. + /// + /// Source of the order. + /// Size of the order. + /// A new instance of . + private static Order CreateOrder(OrderSource source, double size) => + new(TestSymbol) + { + Index = 0, + EventSource = source, + Size = size, + EventFlags = EventFlags.SnapshotBegin | EventFlags.SnapshotEnd, + OrderSide = Side.Buy + }; + + /// + /// Publishes the specified orders. + /// + /// Orders to publish. + private void Publish(params IEventType[] orders) + { + Publisher.PublishEvents(orders); + Executor.ProcessAllPendingTasks(); + } +} diff --git a/tests/DxFeed.Graal.Net.Tests/Models/MarketDepthModelTest.cs b/tests/DxFeed.Graal.Net.Tests/Models/MarketDepthModelTest.cs new file mode 100644 index 00000000..7b0e33b7 --- /dev/null +++ b/tests/DxFeed.Graal.Net.Tests/Models/MarketDepthModelTest.cs @@ -0,0 +1,434 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Market; +using DxFeed.Graal.Net.Models; +using DxFeed.Graal.Net.Native.Executors; + +// ReSharper disable PossibleMultipleEnumeration + +namespace DxFeed.Graal.Net.Tests.Models; + +[TestFixture] +public class MarketDepthModelTest +{ + private const string Symbol = "INDEX-TEST"; + + private readonly InPlaceExecutor _executor = InPlaceExecutor.Create(); + private readonly List _publishedEvents = new(); + private readonly List _buyOrders = new(); + private readonly List _sellOrders = new(); + private DXEndpoint _endpoint; + private DXFeed _feed; + private DXPublisher _publisher; + private MarketDepthModel _model; + + private int _listenerCalls; + private int _changesBuy; + private int _changesSell; + + [SetUp] + public void SetUp() + { + _endpoint = DXEndpoint.Create(DXEndpoint.Role.LocalHub); + _endpoint.Executor(_executor); + _feed = _endpoint.GetFeed(); + _publisher = _endpoint.GetPublisher(); + _model = CreateBuilder().Build(); + } + + [TearDown] + public void TearDown() + { + _listenerCalls = 0; + _buyOrders.Clear(); + _publishedEvents.Clear(); + _sellOrders.Clear(); + _model.Dispose(); + _endpoint.Dispose(); + } + + + [OneTimeTearDown] + public void OneTimeTearDown() => + _executor.Dispose(); + + [Test] + public void TestRemoveNonExistent() => + PublishAndProcess(false, CreateOrder(0, Side.Buy, 1, 1, EventFlags.RemoveEvent)); + + [Test] + public void TestRemoveBySizeAndByFlags() + { + var o1 = CreateOrder(2, Side.Buy, 3, 1, 0); + var o2 = CreateOrder(1, Side.Buy, 2, 1, 0); + var o3 = CreateOrder(0, Side.Buy, 1, 1, EventFlags.SnapshotBegin | EventFlags.SnapshotEnd); + + PublishAndProcess(true, o3, o2, o1); + CheckBuySize(3); + CheckSellSize(0); + CheckOrder(Side.Buy, o1, 0); + CheckOrder(Side.Buy, o2, 1); + CheckOrder(Side.Buy, o3, 2); + + PublishAndProcess(true, CreateOrder(2, Side.Buy, 1, double.NaN, 0)); + CheckBuySize(2); + CheckSellSize(0); + CheckOrder(Side.Buy, o2, 0); + + PublishAndProcess(true, CreateOrder(1, Side.Buy, 1, 1, EventFlags.RemoveEvent)); + CheckBuySize(1); + CheckSellSize(0); + CheckOrder(Side.Buy, o3, 0); + + PublishAndProcess(true, CreateOrder(0, Side.Buy, 1, 1, EventFlags.SnapshotEnd | EventFlags.RemoveEvent)); + CheckBookSize(0); + CheckBuySize(0); + CheckSellSize(0); + } + + [Test] + public void TestOrderChangeSide() + { + var buy = CreateOrder(0, Side.Buy, 1, 1, EventFlags.SnapshotBegin | EventFlags.SnapshotEnd); + PublishAndProcess(true, buy); + CheckBookSize(1); + CheckBuySize(1); + CheckOrder(Side.Buy, buy, 0); + + var sell = CreateOrder(0, Side.Sell, 1, 1, 0); + PublishAndProcess(true, sell); + CheckBookSize(1); + CheckSellSize(1); + CheckOrder(Side.Sell, sell, 0); + } + + [Test] + public void TestOrderPriorityAfterUpdate() + { + var b1 = CreateOrder(0, Side.Buy, 100, 1, EventFlags.SnapshotBegin | EventFlags.SnapshotEnd); + var b2 = CreateOrder(1, Side.Buy, 150, 1, 0); + var s1 = CreateOrder(3, Side.Sell, 150, 1, 0); + var s2 = CreateOrder(2, Side.Sell, 100, 1, 0); + + PublishAndProcess(b1); + CheckOrder(Side.Buy, b1, 0); + PublishAndProcess(b2); + CheckOrder(Side.Buy, b2, 0); + CheckOrder(Side.Buy, b1, 1); + + PublishAndProcess(s1); + CheckOrder(Side.Sell, s1, 0); + PublishAndProcess(s2); + CheckOrder(Side.Sell, s2, 0); + CheckOrder(Side.Sell, s1, 1); + } + + [Test] + public void TestMultipleUpdatesWithMixedSides() + { + var buyLowPrice = CreateOrder(0, Side.Buy, 100, 1, EventFlags.SnapshotBegin | EventFlags.SnapshotEnd); + var buyHighPrice = CreateOrder(1, Side.Buy, 200, 1, 0); + var sellLowPrice = CreateOrder(2, Side.Sell, 150, 1, 0); + var sellHighPrice = CreateOrder(3, Side.Sell, 250, 1, 0); + + PublishAndProcess(buyLowPrice, sellHighPrice, buyHighPrice, sellLowPrice); + CheckBookSize(4); + + CheckOrder(Side.Buy, buyHighPrice, 0); + CheckOrder(Side.Sell, sellLowPrice, 0); + } + + [Test] + public void TestDuplicateOrderIndexUpdatesExistingOrder() + { + var originalIndexOrder = CreateOrder(0, Side.Buy, 100, 1, EventFlags.SnapshotBegin | EventFlags.SnapshotEnd); + var duplicateIndexOrder = CreateOrder(0, Side.Buy, 150, 1, 0); + + PublishAndProcess(originalIndexOrder, duplicateIndexOrder); + CheckBookSize(1); + CheckOrder(Side.Buy, duplicateIndexOrder, 0); + } + + [Test] + public void TestEnforceEntryLimit() + { + _model.Dispose(); + _model = CreateBuilder().WithDepthLimit(3).Build(); + PublishAndProcess(true, CreateOrder(0, Side.Buy, 5, 1, EventFlags.SnapshotBegin | EventFlags.SnapshotEnd)); + PublishAndProcess(true, CreateOrder(1, Side.Buy, 4, 1, 0)); + PublishAndProcess(true, CreateOrder(2, Side.Buy, 3, 1, 0)); + + PublishAndProcess(false, CreateOrder(3, Side.Buy, 2, 1, 0)); // outside limit + PublishAndProcess(false, CreateOrder(4, Side.Buy, 1, 1, 0)); // outside limit + PublishAndProcess(false, CreateOrder(4, Side.Buy, 1, 2, 0)); // modify outside limit + PublishAndProcess(false, CreateOrder(3, Side.Buy, 1, double.NaN, 0)); // remove outside limit + PublishAndProcess(true, CreateOrder(2, Side.Buy, 3, 2, 0)); // update in limit + PublishAndProcess(true, CreateOrder(1, Side.Buy, 3, double.NaN, 0)); // remove in limit + + PublishAndProcess(true, CreateOrder(4, Side.Sell, 1, 1, 0)); + PublishAndProcess(true, CreateOrder(5, Side.Sell, 2, 1, 0)); + PublishAndProcess(true, CreateOrder(6, Side.Sell, 3, 1, 0)); + + PublishAndProcess(false, CreateOrder(7, Side.Sell, 4, 1, 0)); // outside limit + CheckChanged(false); + PublishAndProcess(false, CreateOrder(8, Side.Sell, 5, 1, 0)); // outside limit + CheckChanged(false); + PublishAndProcess(false, CreateOrder(8, Side.Sell, 5, 2, 0)); // modify outside limit + CheckChanged(false); + PublishAndProcess(false, CreateOrder(8, Side.Sell, 5, double.NaN, 0)); // remove outside limit + CheckChanged(false); + PublishAndProcess(true, CreateOrder(6, Side.Sell, 4, 2, 0)); // update in limit + PublishAndProcess(true, CreateOrder(5, Side.Sell, 2, double.NaN, 0)); // remove in limit + + _model.SetDepthLimit(0); // disable limit + PublishAndProcess(true, CreateOrder(4, Side.Buy, 1, 3, 0)); + PublishAndProcess(true, CreateOrder(8, Side.Sell, 1, 3, 0)); + + _model.SetDepthLimit(1); + PublishAndProcess(true, CreateOrder(0, Side.Buy, 2, 1, 0)); + PublishAndProcess(false, CreateOrder(1, Side.Buy, 2, 1, 0)); + } + + [Test] + public void TestStressBuySellOrders() + { + PublishAndProcess(false, + CreateOrder(0, Side.Buy, double.NaN, double.NaN, + EventFlags.SnapshotBegin | EventFlags.SnapshotEnd | EventFlags.RemoveEvent)); + _listenerCalls = 0; + _buyOrders.Clear(); + _publishedEvents.Clear(); + _sellOrders.Clear(); + _listenerCalls = 0; + _changesBuy = 0; + _changesSell = 0; + + var rnd = new Random(1); + const int bookSize = 100; + var book = new Order[bookSize]; + var expectedBuy = 0; + var expectedSell = 0; + for (var i = 0; i < 10000; i++) + { + var index = rnd.Next(bookSize); + var order = CreateOrder(Scope.Order, rnd.Next(2) != 0 ? Side.Buy : Side.Sell, index, rnd.Next(10), '\0', + null); + var old = book[index]; + book[index] = order; + var deltaBuy = OneIfBuy(order) - OneIfBuy(old); + var deltaSell = OneIfSell(order) - OneIfSell(old); + expectedBuy += deltaBuy; + expectedSell += deltaSell; + PublishAndProcess(order); + switch (order.OrderSide) + { + case Side.Buy: + CheckChangesBuy(deltaBuy != 0 || (!Same(order, old) && old.OrderSide == Side.Buy) ? 1 : 0); + CheckChangesSell(OneIfSell(old)); + break; + case Side.Sell: + CheckChangesSell(deltaSell != 0 || (!Same(order, old) && old.OrderSide == Side.Sell) ? 1 : 0); + CheckChangesBuy(OneIfBuy(old)); + break; + default: + Assert.Fail(); + break; + } + + Assert.Multiple(() => + { + Assert.That(_buyOrders, Has.Count.EqualTo(expectedBuy)); + Assert.That(_sellOrders, Has.Count.EqualTo(expectedSell)); + }); + } + + var snapshotOrder = CreateOrder(Scope.Order, Side.Undefined, 0, 0, '\0', null); + snapshotOrder.EventFlags = EventFlags.SnapshotBegin | EventFlags.SnapshotEnd | EventFlags.RemoveEvent; + _publisher.PublishEvents(new List { snapshotOrder }); + _executor.ProcessAllPendingTasks(); + CheckChangesBuy(expectedBuy > 0 ? 1 : 0); + CheckChangesSell(expectedSell > 0 ? 1 : 0); + Assert.Multiple(() => + { + Assert.That(_buyOrders, Is.Empty); + Assert.That(_sellOrders, Is.Empty); + }); + } + + [Test] + public void TestNegativeAggregationPeriod() + { + _model = CreateBuilder().WithAggregationPeriod(-1).Build(); + Assert.That(_model.GetAggregationPeriod(), Is.EqualTo(0)); + _model.SetAggregationPeriod(0); + Assert.That(_model.GetAggregationPeriod(), Is.EqualTo(0)); + _model.SetAggregationPeriod(-1); + Assert.That(_model.GetAggregationPeriod(), Is.EqualTo(0)); + } + + [Test] + public void TestNegativeDepthLimit() + { + _model = CreateBuilder().WithDepthLimit(-1).Build(); + Assert.That(_model.GetDepthLimit(), Is.EqualTo(0)); + _model.SetDepthLimit(0); + Assert.That(_model.GetDepthLimit(), Is.EqualTo(0)); + _model.SetDepthLimit(int.MaxValue); + Assert.That(_model.GetDepthLimit(), Is.EqualTo(int.MaxValue)); + _model.SetDepthLimit(-1); + Assert.That(_model.GetDepthLimit(), Is.EqualTo(0)); + _model.SetDepthLimit(int.MinValue); + Assert.That(_model.GetDepthLimit(), Is.EqualTo(0)); + } + + private void CheckChanged(bool expected) + { + Assert.That(expected ? _listenerCalls > 0 : _listenerCalls == 0); + _listenerCalls = 0; + } + + private void CheckBookSize(int size) => + Assert.That(_buyOrders.Count + _sellOrders.Count, Is.EqualTo(size)); + + private void CheckBuySize(int size) => + Assert.That(_buyOrders, Has.Count.EqualTo(size)); + + private void CheckSellSize(int size) => + Assert.That(_sellOrders, Has.Count.EqualTo(size)); + + private void CheckChangesBuy(int n) + { + Assert.That(_changesBuy, Is.EqualTo(n)); + _changesBuy = 0; + } + + private void CheckChangesSell(int n) + { + Assert.That(_changesSell, Is.EqualTo(n)); + _changesSell = 0; + } + + private void CheckOrder(Side side, Order order, int pos) + { + var orders = side == Side.Buy ? _buyOrders : _sellOrders; + Assert.Multiple(() => + { + Assert.That(orders, Has.Count.GreaterThan(pos)); + Assert.That(Same(order, orders[pos])); + }); + } + + private void Publish(Order order) => + _publishedEvents.Add(order); + + private void Publish(params Order[] orders) => + _publishedEvents.AddRange(orders); + + private void Process() + { + _publisher.PublishEvents(_publishedEvents); + _executor.ProcessAllPendingTasks(); + _publishedEvents.Clear(); + } + + private void PublishAndProcess(Order order) + { + Publish(order); + Process(); + } + + private void PublishAndProcess(bool expected, Order order) + { + Publish(order); + Process(); + CheckChanged(expected); + } + + private void PublishAndProcess(params Order[] orders) + { + Publish(orders); + Process(); + } + + private void PublishAndProcess(bool expected, params Order[] orders) + { + Publish(orders); + Process(); + CheckChanged(expected); + } + + private static int OneIfBuy(Order? order) => + order is { OrderSide: Side.Buy } and not { Size: 0 } ? 1 : 0; + + private static int OneIfSell(Order? order) => + order is { OrderSide: Side.Sell } and not { Size: 0 } ? 1 : 0; + + private static bool Same(Order order, Order old) + { + if (order is { Size: 0 }) + { + return true; // order with zero size is the same as null (missing) + } + + // Check just relevant attributes + return order.Scope == old.Scope && + order.OrderSide == old.OrderSide && + order.Index == old.Index && + order.Size.Equals(old.Size) && + Equals(order.EventSource, old.EventSource); + } + + private static Order CreateOrder(int index, Side side, double price, double size, int eventFlags) => + new(Symbol) + { + Index = index, + Price = price, + Size = size, + EventFlags = eventFlags, + OrderSide = side + }; + + private static Order CreateOrder(Scope scope, Side side, long index, int value, char exchange, string? mmid) + { + var order = new Order(Symbol); + order.Scope = scope; + order.Index = index; + order.OrderSide = side; + order.Price = value * 10; + order.Size = value; + order.ExchangeCode = exchange; + order.MarketMaker = mmid; + return order; + } + + private MarketDepthModel.Builder CreateBuilder() => + MarketDepthModel.NewBuilder() + .WithFeed(_feed) + .WithSymbol(Symbol) + .WithSources(OrderSource.DEFAULT) + .WithListener((buyOrders, sellOrders) => + { + _listenerCalls++; + if (!_buyOrders.SequenceEqual(buyOrders)) + { + _changesBuy++; + } + + _buyOrders.Clear(); + _buyOrders.AddRange(buyOrders); + + if (!_sellOrders.SequenceEqual(sellOrders)) + { + _changesSell++; + } + + _sellOrders.Clear(); + _sellOrders.AddRange(sellOrders); + }); +}