diff --git a/.editorconfig b/.editorconfig index 876b864..c519751 100644 --- a/.editorconfig +++ b/.editorconfig @@ -201,6 +201,8 @@ dotnet_diagnostic.CA1861.severity = none dotnet_diagnostic.CA1846.severity = none # CA1845 : Use span-based 'string.Concat' and 'AsSpan' instead of 'Substring' dotnet_diagnostic.CA1845.severity = none +dotnet_diagnostic.cs0693.severity=none +dotnet_diagnostic.CA1000.severity=none ########################################## # Formatting Rules # https://docs.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/formatting-rules diff --git a/dxfeed-graal-net-api.sln b/dxfeed-graal-net-api.sln index 31e278d..e9c85a5 100644 --- a/dxfeed-graal-net-api.sln +++ b/dxfeed-graal-net-api.sln @@ -40,6 +40,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CandleDataResponseReader", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MarketDepthModelSample", "samples\MarketDepthModelSample\MarketDepthModelSample.csproj", "{930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CandleChartSample", "samples\CandleChartSample\CandleChartSample.csproj", "{B74E8A86-1AB7-4B36-AED3-292CDD95BF90}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -118,6 +120,10 @@ Global {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Debug|Any CPU.Build.0 = Debug|Any CPU {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Release|Any CPU.ActiveCfg = Release|Any CPU {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84}.Release|Any CPU.Build.0 = Release|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {73597E04-D8A8-4991-A759-7F886CBE2A8F} = {C4490D74-2970-4A1B-8178-A724A06B140A} @@ -136,5 +142,6 @@ Global {B9088D14-10F6-4D88-876D-062B9F6494AB} = {C4490D74-2970-4A1B-8178-A724A06B140A} {2567935E-FEFB-470A-BF17-7A883735C4BF} = {C4490D74-2970-4A1B-8178-A724A06B140A} {930B1039-B76C-42C5-AD0F-9FA1A1FC9D84} = {C4490D74-2970-4A1B-8178-A724A06B140A} + {B74E8A86-1AB7-4B36-AED3-292CDD95BF90} = {C4490D74-2970-4A1B-8178-A724A06B140A} EndGlobalSection EndGlobal diff --git a/dxfeed-graal-net-api.sln.DotSettings b/dxfeed-graal-net-api.sln.DotSettings index 9b7649e..f003618 100644 --- a/dxfeed-graal-net-api.sln.DotSettings +++ b/dxfeed-graal-net-api.sln.DotSettings @@ -30,6 +30,7 @@ True True True + True True True True diff --git a/samples/CandleChartSample/App.axaml b/samples/CandleChartSample/App.axaml new file mode 100644 index 0000000..f3ebcd9 --- /dev/null +++ b/samples/CandleChartSample/App.axaml @@ -0,0 +1,10 @@ + + + + + + + \ No newline at end of file diff --git a/samples/CandleChartSample/App.axaml.cs b/samples/CandleChartSample/App.axaml.cs new file mode 100644 index 0000000..ed8ad14 --- /dev/null +++ b/samples/CandleChartSample/App.axaml.cs @@ -0,0 +1,23 @@ +using Avalonia; +using Avalonia.Controls.ApplicationLifetimes; +using Avalonia.Markup.Xaml; + +namespace CandleChartSample; + +public partial class App : Application +{ + public override void Initialize() + { + AvaloniaXamlLoader.Load(this); + } + + public override void OnFrameworkInitializationCompleted() + { + if (ApplicationLifetime is IClassicDesktopStyleApplicationLifetime desktop) + { + desktop.MainWindow = new MainWindow(); + } + + base.OnFrameworkInitializationCompleted(); + } +} diff --git a/samples/CandleChartSample/CandleChartSample.csproj b/samples/CandleChartSample/CandleChartSample.csproj new file mode 100644 index 0000000..b499145 --- /dev/null +++ b/samples/CandleChartSample/CandleChartSample.csproj @@ -0,0 +1,26 @@ + + + + WinExe + net6.0 + enable + true + app.manifest + true + + + + + + + + + + + + + + + + + diff --git a/samples/CandleChartSample/CandleExtension.cs b/samples/CandleChartSample/CandleExtension.cs new file mode 100644 index 0000000..a9e46e1 --- /dev/null +++ b/samples/CandleChartSample/CandleExtension.cs @@ -0,0 +1,44 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System; +using DxFeed.Graal.Net.Events.Candles; +using ScottPlot; + +namespace CandleChartSample; + +public static class CandleExtension +{ + public static OHLC ToOHLC(this Candle candle) + { + var open = GetValueWithPriority(candle.Open, candle.High, candle.Low, candle.Close); + var high = GetValueWithPriority(candle.High, candle.Open, candle.Low, candle.Close); + var low = GetValueWithPriority(candle.Low, candle.Close, candle.Open, candle.High); + var close = GetValueWithPriority(candle.Close, candle.Low, candle.Open, candle.High); + + return new OHLC + { + Open = open, + High = high, + Low = low, + Close = close, + DateTime = DateTimeOffset.FromUnixTimeMilliseconds(candle.Time).DateTime.ToLocalTime(), + TimeSpan = TimeSpan.FromMilliseconds(candle.CandleSymbol!.Period!.PeriodIntervalMillis) + }; + } + + private static double GetValueWithPriority(params double[] values) + { + foreach (var value in values) + { + if (!double.IsNaN(value)) + { + return value; + } + } + return double.NaN; + } +} diff --git a/samples/CandleChartSample/CandleList.cs b/samples/CandleChartSample/CandleList.cs new file mode 100644 index 0000000..e742782 --- /dev/null +++ b/samples/CandleChartSample/CandleList.cs @@ -0,0 +1,111 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System; +using System.Collections.Generic; +using System.Linq; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Candles; +using ScottPlot; + +namespace CandleChartSample; + +public class CandleList : List +{ + public void Update(IEnumerable candles, bool isSnapshot) + { + var sortedCandles = candles.OrderBy(c => c.Index); + + if (isSnapshot) + { + UpdateSnapshot(sortedCandles); + } + else + { + UpdateIncremental(sortedCandles); + } + } + + private void UpdateSnapshot(IEnumerable candles) + { + Clear(); + foreach (var candle in candles) + { + if (!ShouldRemove(candle)) + { + Add(candle.ToOHLC()); + } + } + } + + private void UpdateIncremental(IEnumerable candles) + { + foreach (var candle in candles) + { + if (ShouldRemove(candle)) + { + RemoveCandle(candle); + continue; + } + + var ohlc = candle.ToOHLC(); + var lastOhlc = LastOrDefault(); + switch (DateTime.Compare(ohlc.DateTime, lastOhlc.DateTime)) + { + case < 0: + InsertOrUpdate(ohlc); + break; + case 0: + AddOrUpdateLast(ohlc); + break; + case > 0: + Add(ohlc); + break; + } + } + } + + private void AddOrUpdateLast(OHLC ohlc) + { + if (IsEmpty()) + { + Add(ohlc); + } + else + { + this[Count - 1] = ohlc; + } + } + + private void InsertOrUpdate(OHLC ohlc) + { + var index = FindIndex(o => DateTime.Compare(ohlc.DateTime, o.DateTime) <= 0); + if (index >= 0 && this[index].DateTime.Equals(ohlc.DateTime)) + { + this[index] = ohlc; + } + else + { + Insert(index >= 0 ? index : 0, ohlc); + } + } + + private void RemoveCandle(Candle candle) => + RemoveAll(ohlc => candle.ToOHLC().DateTime.Equals(ohlc.DateTime)); + + private bool IsEmpty() => + Count == 0; + + private OHLC LastOrDefault() => + IsEmpty() ? new OHLC() : this[Count - 1]; + + private static bool ShouldRemove(Candle candle) => + EventFlags.IsRemove(candle) || + (double.IsNaN(candle.Open) && + double.IsNaN(candle.High) && + double.IsNaN(candle.Low) && + double.IsNaN(candle.Close)); +} diff --git a/samples/CandleChartSample/MainWindow.axaml b/samples/CandleChartSample/MainWindow.axaml new file mode 100644 index 0000000..d7d5003 --- /dev/null +++ b/samples/CandleChartSample/MainWindow.axaml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/CandleChartSample/MainWindow.axaml.cs b/samples/CandleChartSample/MainWindow.axaml.cs new file mode 100644 index 0000000..2501077 --- /dev/null +++ b/samples/CandleChartSample/MainWindow.axaml.cs @@ -0,0 +1,135 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using Avalonia.Controls; +using Avalonia.Input; +using Avalonia.Interactivity; +using Avalonia.Threading; +using DxFeed.Graal.Net; +using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Candles; +using DxFeed.Graal.Net.Models; +using DxFeed.Graal.Net.Utils; + +namespace CandleChartSample; + +public partial class MainWindow : Window +{ + private readonly CandleList _candles = new(); + private readonly TimeSeriesTxModel.Builder _modelBuilder = new(); + private TimeSeriesTxModel? _model; + private string _symbol = string.Empty; + + public MainWindow() + { + SystemProperty.SetProperty("dxfeed.address", "demo.dxfeed.com:7300"); + InitializeComponent(); + InitializePlot(); + InitializeModelBuilder(); + HandleTextChanged(SymbolTextBox); + } + + private void InitializePlot() + { + AvaPlot.Plot.Axes.DateTimeTicksBottom(); + var candlePlot = AvaPlot.Plot.Add.Candlestick(_candles); + candlePlot.Axes.YAxis = AvaPlot.Plot.Axes.Right; + candlePlot.Axes.YAxis.Label.Text = "Price"; + FromTimeTextBox.Text = DateTimeOffset.Now.AddMonths(-6).ToString("yyyyMMdd", CultureInfo.InvariantCulture); + } + + private void InitializeModelBuilder() => + _modelBuilder + .WithFeed(DXFeed.GetInstance()) + .WithSnapshotProcessing(true) + .WithListener(OnCandleEventsReceived); + + protected override void OnClosed(EventArgs e) => + _model?.Dispose(); + + private void OnCandleEventsReceived(IndexedEventSource source, IEnumerable events, bool isSnapshot) + { + _candles.Update(events, isSnapshot); + Dispatcher.UIThread.Invoke(() => + { + if (isSnapshot) + { + AvaPlot.Plot.Axes.AutoScale(); + } + + AvaPlot.Refresh(); + }); + } + + private void OnKeyDown(object? sender, KeyEventArgs e) + { + if (e.Key == Key.Enter) + { + HandleTextChanged(sender as TextBox); + } + } + + private void OnLostFocus(object? sender, RoutedEventArgs e) => + HandleTextChanged(sender as TextBox); + + private void HandleTextChanged(TextBox? textBox) + { + if (textBox == null) + { + return; + } + + switch (textBox.Name) + { + case nameof(SymbolTextBox): + OnSymbolTextChanged(); + break; + case nameof(FromTimeTextBox): + OnFromTimeTextChanged(); + break; + } + } + + private void OnSymbolTextChanged() + { + if (_symbol.Equals(GetSymbol(), StringComparison.Ordinal)) + { + return; + } + + _symbol = GetSymbol(); + _model?.Dispose(); + _model = _modelBuilder + .WithSymbol(CandleSymbol.ValueOf(_symbol)) + .WithFromTime(GetFromTime()) + .Build(); + } + + private void OnFromTimeTextChanged() + { + if (_model == null) + { + OnSymbolTextChanged(); + } + + _model?.SetFromTime(GetFromTime()); + } + + private string GetSymbol() => + SymbolTextBox.Text ?? string.Empty; + + private long GetFromTime() + { + try + { + return string.IsNullOrWhiteSpace(FromTimeTextBox.Text) + ? 0 + : TimeFormat.Default.Parse(FromTimeTextBox.Text).ToUnixTimeMilliseconds(); + } + catch (Exception) + { + return 0; + } + } +} diff --git a/samples/CandleChartSample/Program.cs b/samples/CandleChartSample/Program.cs new file mode 100644 index 0000000..8b85caf --- /dev/null +++ b/samples/CandleChartSample/Program.cs @@ -0,0 +1,21 @@ +using Avalonia; +using System; + +namespace CandleChartSample; + +internal abstract class Program +{ + // Initialization code. Don't use any Avalonia, third-party APIs or any + // SynchronizationContext-reliant code before AppMain is called: things aren't initialized + // yet and stuff might break. + [STAThread] + public static void Main(string[] args) => BuildAvaloniaApp() + .StartWithClassicDesktopLifetime(args); + + // Avalonia configuration, don't remove; also used by visual designer. + private static AppBuilder BuildAvaloniaApp() + => AppBuilder.Configure() + .UsePlatformDetect() + .WithInterFont() + .LogToTrace(); +} diff --git a/samples/CandleChartSample/app.manifest b/samples/CandleChartSample/app.manifest new file mode 100644 index 0000000..403194d --- /dev/null +++ b/samples/CandleChartSample/app.manifest @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + diff --git a/samples/MarketDepthModelSample/MainWindow.axaml.cs b/samples/MarketDepthModelSample/MainWindow.axaml.cs index 332f33f..94b3b89 100644 --- a/samples/MarketDepthModelSample/MainWindow.axaml.cs +++ b/samples/MarketDepthModelSample/MainWindow.axaml.cs @@ -16,10 +16,10 @@ namespace MarketDepthModelSample; public partial class MainWindow : Window { private readonly Orders orders = new(); - private readonly MarketDepthModel.Builder builder; + private readonly MarketDepthModel.Builder builder; private string symbol = string.Empty; private string sources = string.Empty; - private MarketDepthModel? model; + private MarketDepthModel? model; public MainWindow() { @@ -29,14 +29,14 @@ public MainWindow() BuyTable.Source = orders.BuyOrders; SellTable.Source = orders.SellOrders; - builder = MarketDepthModel.NewBuilder(typeof(Order)) + builder = MarketDepthModel.NewBuilder() .WithFeed(DXFeed.GetInstance()) .WithListener((buy, sell) => { Dispatcher.UIThread.Post(() => { - orders.UpdateBuy(buy.OfType()); - orders.UpdateSell(sell.OfType()); + orders.UpdateBuy(buy); + orders.UpdateSell(sell); }, DispatcherPriority.Normal); }); diff --git a/samples/MarketDepthModelSample/MarketDepthModelSample.csproj b/samples/MarketDepthModelSample/MarketDepthModelSample.csproj index 1b93fe4..09cddfe 100644 --- a/samples/MarketDepthModelSample/MarketDepthModelSample.csproj +++ b/samples/MarketDepthModelSample/MarketDepthModelSample.csproj @@ -2,7 +2,7 @@ Exe - net8.0 + net6.0 enable true app.manifest diff --git a/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs b/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs index e18ec3c..cffd73e 100644 --- a/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs +++ b/src/DxFeed.Graal.Net/Models/AbstractTxModel.cs @@ -5,64 +5,85 @@ // using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using DxFeed.Graal.Net.Api; using DxFeed.Graal.Net.Events; -using DxFeed.Graal.Net.Native.Model; namespace DxFeed.Graal.Net.Models; /// /// Abstract base class for models that handle transactions of . -/// This class manages all snapshot and transaction logic, subscription handling, and listener notifications. -/// -/// -/// This model is designed to handle incremental transactions. Users of this model only see the list +/// This model manages all snapshot and transaction logic, subscription handling, and listener notifications. +/// +///

This model is designed to handle incremental transactions. Users of this model only see the list /// of events in a consistent state. This model delays incoming events that are part of an incomplete snapshot -/// or ongoing transaction until the snapshot is complete or the transaction has ended. +/// or ongoing transaction until the snapshot is complete or the transaction has ended.

/// ///

Configuration

-/// This model must be configured using the . Specific implementations can add additional -/// configuration options as needed. +/// +///

This model must be configured using the . Specific implementations can add additional +/// configuration options as needed. This model requires a call to the method +/// (all inheritors must call this method) for subscription.

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

/// ///

Resource management and closed models

-/// Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way +/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way /// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the /// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model -/// requires knowing the pointer to the feed at the place of the call, which is not always convenient. +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

/// -/// The convenient way to detach model from the feed is to call its method. Closed model -/// becomes permanently detached from all feeds, removes all its listeners and is guaranteed to be reclaimable by -/// the garbage collector as soon as all external references to it are cleared. +///

The convenient way to detach model from the feed is to call its +/// method. +/// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

/// ///

Threads and locks

-/// This class is thread-safe and can be used concurrently from multiple threads without external synchronization. -///
-public abstract class AbstractTxModel +/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent +/// +/// The type of indexed events processed by this model. +public abstract class AbstractTxModel : IDisposable + where TE : class, IIndexedEvent { - private protected readonly AbstractTxModelHandle handle; + private readonly ConcurrentDictionary> _processorsBySource = new(); + private readonly HashSet> _readyProcessors = new(); + private readonly DXFeedSubscription _subscription; + private readonly object _symbol; + private readonly TxModelListener? _listener; - private protected AbstractTxModel(AbstractTxModelHandle handle) => - this.handle = handle; + private protected AbstractTxModel(Builder builder) + { + _symbol = builder.Symbol ?? throw new InvalidOperationException("The 'symbol' must not be null."); + var feed = builder.Feed ?? throw new InvalidOperationException("The 'feed' must not be null."); + IsBatchProcessing = builder.IsBatchProcessing; + IsSnapshotProcessing = builder.IsSnapshotProcessing; + _listener = builder.Listener; + _subscription = feed.CreateSubscription(typeof(TE)); + _subscription.AddEventListener(ProcessEvents); + } /// - /// Gets whether batch processing is enabled. + /// Gets a value indicating whether if batch processing is enabled. /// See . /// /// /// true if batch processing is enabled; otherwise, false. /// - public bool IsBatchProcessing() => - handle.IsBatchProcessing(); + public bool IsBatchProcessing { get; } /// - /// Gets whether snapshot processing is enabled. + /// Gets a value indicating whether if snapshot processing is enabled. /// See . /// /// /// true if snapshot processing is enabled; otherwise, false. /// - public bool IsSnapshotProcessing() => - handle.IsSnapshotProcessing(); + public bool IsSnapshotProcessing { get; } /// /// Closes this model and makes it permanently detached. @@ -71,68 +92,152 @@ public bool IsSnapshotProcessing() => /// This method clears installed listeners and ensures that the model /// can be safely garbage-collected when all outside references to it are lost. /// - public void Close() => - handle.Close(); + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Processes a list of events, updating the relevant processors and handling batch processing. + /// + /// The list of events to process. + internal void ProcessEvents(IEnumerable events) + { + try + { + IndexedEventSource? source = null; + TxEventProcessor? currentProcessor = null; + foreach (var e in events.OfType()) + { + if (source == null || source.Id != e.EventSource.Id) + { + source = e.EventSource; + currentProcessor = GetOrCreateProcessor(source); + } + + if (currentProcessor!.ProcessEvent(e)) + { + _readyProcessors.Add(currentProcessor); + } + } + + foreach (var processor in _readyProcessors) + { + processor.ReceiveAllEventsInBatch(); + } + } + catch (Exception e) + { + // ToDo Add log entry. + Console.Error.WriteLine($"Exception in {GetType().Name} event listener: {e}"); + } + finally + { + _readyProcessors.Clear(); + } + } + + /// + /// Gets the undecorated symbol associated with the model. + /// + /// The undecorated symbol associated with the model. + protected object GetUndecoratedSymbol() => + _symbol; + + /// + /// Sets the subscription symbols for the model. + /// + /// The set of symbols to subscribe to. + protected void SetSymbols(HashSet symbols) => + _subscription.SetSymbols(symbols); + + /// + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _subscription.Dispose(); + } + } + + private TxEventProcessor GetOrCreateProcessor(IndexedEventSource source) => + _processorsBySource.GetOrAdd(source, src => new( + IsBatchProcessing, + IsSnapshotProcessing, + (transaction, isSnapshot) => + { + _listener?.Invoke(src, new List(transaction), isSnapshot); + })); /// - /// Abstract builder for building models inherited from . + /// Non-generic version, for erasing a generic type. + /// + public abstract class Builder + { + internal bool IsBatchProcessing { get; set; } + + internal bool IsSnapshotProcessing { get; set; } + + internal DXFeed? Feed { get; set; } + + internal object? Symbol { get; set; } + + internal TxModelListener? Listener { get; set; } + } + + /// + /// Abstract builder for building models inherited from . /// Specific implementations can add additional configuration options to this builder. + /// + ///

Inheritors of this class must override the abstract method to build a specific model.

///
- /// - /// Inheritors of this class must override the abstract method to build a specific model. - /// /// The type of the builder subclass. /// The type of the model subclass. - public abstract class Builder + public abstract class Builder : Builder where TB : Builder - where TM : AbstractTxModel + where TM : AbstractTxModel { /// - /// Initializes a new instance of the class with the specified event type. - /// - /// The type of events processed by the model being created. - protected Builder(Type eventType) - { - } - - /// - /// Enables or disables batch processing. This is enabled by default. - /// - /// - /// If batch processing is disabled, the model will notify the listener + /// Enables or disables batch processing. + /// This is enabled by default. + /// + ///

If batch processing is disabled, the model will notify the listener /// separately for each transaction (even if it is represented by a single event); - /// otherwise, transactions can be combined in a single listener call. + /// otherwise, transactions can be combined in a single listener call.

/// - ///
A transaction may represent either a snapshot or update events that are received after a snapshot. + ///

A transaction may represent either a snapshot or update events that are received after a snapshot. /// Whether this flag is set or not, the model will always notify listeners that a snapshot has been received /// and will not combine multiple snapshots or a snapshot with another transaction - /// into a single listener notification. - /// + /// into a single listener notification.

+ /// /// true to enable batch processing; false otherwise. /// The builder instance. public TB WithBatchProcessing(bool isBatchProcessing) { + IsBatchProcessing = isBatchProcessing; return (TB)this; } /// - /// Enables or disables snapshot processing. This is disabled by default. - /// - /// - /// If snapshot processing is enabled, transactions representing a snapshot will be processed as follows: + /// Enables or disables snapshot processing. + /// This is disabled by default. + /// + ///

If snapshot processing is enabled, transactions representing a snapshot will be processed as follows: /// events that are marked for removal will be removed, repeated indexes will be merged, and /// event flags of events are set to zero; otherwise, the user will see the snapshot in raw form, - /// with possible repeated indexes, events marked for removal, and event flags unchanged. + /// with possible repeated indexes, events marked for removal, and event flags unchanged.

/// - /// Whether this flag is set or not, in transactions that are not a snapshot, events that are marked + ///

Whether this flag is set or not, in transactions that are not a snapshot, events that are marked /// for removal will not be removed, repeated indexes will not be merged, and /// event flags of events will not be changed. - /// This flag only affects the processing of transactions that are a snapshot. - /// + /// This flag only affects the processing of transactions that are a snapshot.

+ /// /// true to enable snapshot processing; false otherwise. /// The builder instance. public TB WithSnapshotProcessing(bool isSnapshotProcessing) { + IsSnapshotProcessing = isSnapshotProcessing; return (TB)this; } @@ -144,6 +249,7 @@ public TB WithSnapshotProcessing(bool isSnapshotProcessing) /// The builder instance. public TB WithFeed(DXFeed feed) { + Feed = feed; return (TB)this; } @@ -155,6 +261,7 @@ public TB WithFeed(DXFeed feed) /// The builder instance. public TB WithSymbol(object symbol) { + Symbol = symbol; return (TB)this; } @@ -164,8 +271,9 @@ public TB WithSymbol(object symbol) /// /// The transaction listener. /// The builder instance. - public TB WithListener(TxModelListener listener) + public TB WithListener(TxModelListener listener) { + Listener = listener; return (TB)this; } diff --git a/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs b/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs index 4010b4d..43a6100 100644 --- a/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs +++ b/src/DxFeed.Graal.Net/Models/IndexedTxModel.cs @@ -4,79 +4,86 @@ // If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. // -using System; using System.Collections.Generic; using DxFeed.Graal.Net.Api; +using DxFeed.Graal.Net.Api.Osub; using DxFeed.Graal.Net.Events; -using DxFeed.Graal.Net.Native.Model; namespace DxFeed.Graal.Net.Models; /// /// An incremental model for indexed events. /// This model manages all snapshot and transaction logic, subscription handling, and listener notifications. -/// -/// -/// -/// This model is designed to handle incremental transactions. Users of this model only see the list +/// +///

This model is designed to handle incremental transactions. Users of this model only see the list /// of events in a consistent state. This model delays incoming events that are part of an incomplete snapshot /// or ongoing transaction until the snapshot is complete or the transaction has ended. This model notifies -/// the user of received transactions through an installed listener. -/// +/// the user of received transactions through an installed .

+/// ///

Configuration

-/// -/// This model must be configured using the class, as most configuration -/// settings cannot be changed once the model is built. This model requires configuration with -/// a symbol and -/// a sources -/// for subscription, and it must be attached to a +/// +///

This model must be configured using the class, as most configuration +/// settings cannot be changed once the model is built. This model requires configuration +/// and +/// for subscription, +/// and it must be attached to a /// instance to begin operation. -/// For ease of use, some of these configurations can be changed after the model is built, see -/// . -/// -/// -/// This model only supports single symbol subscriptions; multiple symbols cannot be configured. -/// +/// For ease of use, some of these configurations can be changed after the model is built, +/// see .

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

+/// ///

Resource management and closed models

-/// -/// Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way +/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way /// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the /// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model -/// requires knowing the pointer to the feed at the place of the call, which is not always convenient. -/// -/// -/// The convenient way to detach model from the feed is to call its method. +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

+/// +///

The convenient way to detach model from the feed is to call its +/// method. /// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed -/// to be reclaimable by the garbage collector as soon as all external references to it are cleared. -/// +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

+/// ///

Threads and locks

-/// -/// This class is thread-safe and can be used concurrently from multiple threads without external synchronization. -/// -///
-public sealed class IndexedTxModel : AbstractTxModel +/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent. +/// +/// The type of indexed events processed by this model. +public sealed class IndexedTxModel : AbstractTxModel + where TE : class, IIndexedEvent { - private IndexedTxModel(AbstractTxModelHandle handle) - : base(handle) + private readonly object _syncRoot = new(); + private HashSet _sources; + + private IndexedTxModel(Builder builder) + : base(builder) { + _sources = new HashSet(builder.Sources); + UpdateSubscription(GetUndecoratedSymbol(), _sources); } /// /// Factory method to create a new builder for this model. /// - /// The class type of the time series event. /// A new instance. - public static Builder NewBuilder(Type eventType) => - new(eventType); + public static Builder NewBuilder() => + new(); /// - /// Returns the current set of sources. + /// Gets the current set of sources. /// If no sources have been set, an empty set is returned, /// indicating that all possible sources have been subscribed to. /// /// The set of current sources. - public HashSet GetSources() => - ((IndexedTxModelHandle)handle).GetSources(); + public HashSet GetSources() + { + lock (_syncRoot) + { + return new HashSet(_sources); + } + } /// /// Sets the sources from which to subscribe for indexed events. @@ -85,7 +92,7 @@ public HashSet GetSources() => /// /// The specified sources. public void SetSources(params IndexedEventSource[] sources) => - ((IndexedTxModelHandle)handle).SetSources(sources); + SetSources(new HashSet(sources)); /// /// Sets the sources from which to subscribe for indexed events. @@ -93,59 +100,85 @@ public void SetSources(params IndexedEventSource[] sources) => /// If these sources have already been set, nothing happens. /// /// The specified sources. - public void SetSources(ICollection sources) => - ((IndexedTxModelHandle)handle).SetSources(sources); + public void SetSources(ICollection sources) + { + lock (_syncRoot) + { + if (_sources.SetEquals(sources)) + { + return; + } - /// - /// A builder class for creating an instance of . - /// - public class Builder : Builder + _sources = new HashSet(sources); + UpdateSubscription(GetUndecoratedSymbol(), _sources); + } + } + + private static HashSet DecorateSymbol(object symbol, HashSet sources) { - /// - /// Initializes a new instance of the class with the specified event type. - /// - /// The type of events processed by the model being created. - public Builder(Type eventType) - : base(eventType) + if (sources.Count == 0) { + return new HashSet { symbol }; } + var symbols = new HashSet(); + foreach (var source in sources) + { + symbols.Add(new IndexedEventSubscriptionSymbol(symbol, source)); + } + + return symbols; + } + + private void UpdateSubscription(object symbol, HashSet sources) => + SetSymbols(DecorateSymbol(symbol, sources)); + + /// + /// A builder class for creating an instance of . + /// + public new class Builder : Builder> + { + internal HashSet Sources { get; private set; } = new(); + /// /// Sets the sources from which to subscribe for indexed events. /// If no sources have been set, subscriptions will default to all possible sources. - /// - /// - /// The default value for this source is an empty set, + /// + ///

The default value for this source is an empty set, /// which means that this model subscribes to all available sources. /// These sources can be changed later, after the model has been created, - /// by calling . - /// + /// by calling .

+ /// /// The specified sources. - /// The builder instance. + /// this builder. public Builder WithSources(params IndexedEventSource[] sources) { + Sources = new HashSet(sources); return this; } /// /// Sets the sources from which to subscribe for indexed events. /// If no sources have been set, subscriptions will default to all possible sources. - /// - /// - /// The default value for this source is an empty set, + /// + ///

The default value for this source is an empty set, /// which means that this model subscribes to all available sources. /// These sources can be changed later, after the model has been created, - /// by calling . - /// + /// by calling .

+ /// /// The specified sources. - /// The builder instance. + /// this builder. public Builder WithSources(ICollection sources) { + Sources = new HashSet(sources); return this; } - /// - public override IndexedTxModel Build() => - throw new NotImplementedException(); + /// + /// Builds an instance of based on the provided parameters. + /// + /// The created . + public override IndexedTxModel Build() => + new(this); } } diff --git a/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs b/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs index c419166..fa42038 100644 --- a/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs +++ b/src/DxFeed.Graal.Net/Models/MarketDepthListener.cs @@ -9,6 +9,5 @@ namespace DxFeed.Graal.Net.Models; -public delegate void MarketDepthListener( - IEnumerable buy, - IEnumerable sell); +public delegate void MarketDepthListener(IEnumerable buy, IEnumerable sell) + where TE : OrderBase; diff --git a/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs b/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs index 90037f6..6c5a1d3 100644 --- a/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs +++ b/src/DxFeed.Graal.Net/Models/MarketDepthModel.cs @@ -10,21 +10,21 @@ using System.Threading; using System.Threading.Tasks; using DxFeed.Graal.Net.Api; -using DxFeed.Graal.Net.Api.Osub; using DxFeed.Graal.Net.Events; using DxFeed.Graal.Net.Events.Market; namespace DxFeed.Graal.Net.Models; -public sealed class MarketDepthModel : IDisposable +public sealed class MarketDepthModel : IDisposable + where TE : OrderBase { - private static readonly IComparer OrderComparator = Comparer.Create((o1, o2) => + private static readonly IComparer OrderComparator = Comparer.Create((o1, o2) => { var ind1 = o1.Scope == Scope.Order; var ind2 = o2.Scope == Scope.Order; if (ind1 && ind2) { - // Both orders are individual orders + // Both orders are individual orders. var c = o1.TimeSequence.CompareTo(o2.TimeSequence); // asc if (c != 0) { @@ -38,17 +38,17 @@ public sealed class MarketDepthModel : IDisposable { if (ind1) { - // First order is individual, second is not + // First order is individual, second is not. return 1; } if (ind2) { - // Second order is individual, first is not + // Second order is individual, first is not. return -1; } - // Both orders are non-individual orders + // Both orders are non-individual orders. var c = o2.Size.CompareTo(o1.Size); // desc if (c != 0) { @@ -87,7 +87,7 @@ public sealed class MarketDepthModel : IDisposable } }); - private static readonly IComparer BuyComparator = Comparer.Create((o1, o2) => + private static readonly IComparer BuyComparator = Comparer.Create((o1, o2) => { if (o1.Price < o2.Price) { @@ -102,7 +102,7 @@ public sealed class MarketDepthModel : IDisposable return OrderComparator.Compare(o1, o2); }); - private static readonly IComparer SellComparator = Comparer.Create((o1, o2) => + private static readonly IComparer SellComparator = Comparer.Create((o1, o2) => { if (o1.Price < o2.Price) { @@ -118,51 +118,25 @@ public sealed class MarketDepthModel : IDisposable }); private readonly object syncRoot = new(); - private readonly Dictionary ordersByIndex = new(); - private readonly OrderSet buyOrders = new(BuyComparator); - private readonly OrderSet sellOrders = new(SellComparator); - private readonly IndexedTxModel txModel; + private readonly Dictionary ordersByIndex = new(); + private readonly OrderSet buyOrders = new(BuyComparator); + private readonly OrderSet sellOrders = new(SellComparator); + private readonly IndexedTxModel txModel; + private readonly MarketDepthListener? listener; private CancellationTokenSource cts = new(); - private readonly MarketDepthListener listener; - private volatile bool taskScheduled = false; + private volatile bool taskScheduled; private Task? task; private long aggregationPeriodMillis; private int depthLimit; - private bool l = true; - private DXFeedSubscription sub; - - private MarketDepthModel( - IndexedTxModel.Builder txModelBuilder, - MarketDepthListener listener, - long aggregationPeriodMillis, - int depthLimit, - DXFeed feed, - object symbol, - DXFeedSubscription subscription, - List sources) + + private MarketDepthModel(Builder builder) { - this.depthLimit = depthLimit; - buyOrders.DepthLimit = depthLimit; - sellOrders.DepthLimit = depthLimit; - this.listener = listener; - this.aggregationPeriodMillis = aggregationPeriodMillis; - //sub = subscription; - sub = feed.CreateSubscription(typeof(Order)); - //txModel = txModelBuilder.WithListener(EventReceived).Build(); - sub.AddEventListener(events => - { - if (events.First() is Order o) - { - EventReceived(o.EventSource, events.OfType(), l); - l = false; - } - }); - if (sources.Count == 0) - sub.AddSymbols(symbol); - else - { - sub.AddSymbols(new IndexedEventSubscriptionSymbol(symbol, sources.First())); - } + this.depthLimit = builder.DepthLimit; + buyOrders.SetDepthLimit(depthLimit); + sellOrders.SetDepthLimit(depthLimit); + this.listener = builder.Listener; + this.aggregationPeriodMillis = builder.AggregationPeriodMillis; + txModel = builder.TxModelBuilder.WithListener(EventReceived).Build(); } private bool IsChanged => @@ -191,8 +165,8 @@ public void SetDepthLimit(int value) } depthLimit = value; - buyOrders.DepthLimit = value; - sellOrders.DepthLimit = value; + buyOrders.SetDepthLimit(value); + sellOrders.SetDepthLimit(value); TryCancelTask(); NotifyListeners(); } @@ -225,31 +199,35 @@ public void SetAggregationPeriod(long value) } } - public static Builder NewBuilder(Type eventType) => - new(eventType); + public void SetSources(params OrderSource[] sources) => + txModel.SetSources(new List(sources)); + public void SetSources(ICollection sources) => + txModel.SetSources(new List(sources)); - public void Dispose() - { - // TODO txModel - sub.Close(); - } + public static Builder NewBuilder() => + new(); + + public void Dispose() => + txModel.Dispose(); - private void EventReceived(IndexedEventSource source, IEnumerable events, bool isSnapshot) + private void EventReceived(IndexedEventSource source, IEnumerable events, bool isSnapshot) { lock (syncRoot) { - if (Update(source, events, isSnapshot)) + if (!Update(source, events, isSnapshot)) { - if (isSnapshot || aggregationPeriodMillis == 0) - { - TryCancelTask(); - NotifyListeners(); - } - else - { - ScheduleTaskIfNeeded(aggregationPeriodMillis); - } + return; + } + + if (isSnapshot || aggregationPeriodMillis == 0) + { + TryCancelTask(); + NotifyListeners(); + } + else + { + ScheduleTaskIfNeeded(aggregationPeriodMillis); } } } @@ -260,7 +238,7 @@ private void NotifyListeners() { try { - listener.Invoke(GetBuyOrders(), GetSellOrders()); + listener?.Invoke(GetBuyOrders(), GetSellOrders()); } finally { @@ -310,14 +288,14 @@ private bool TryCancelTask() } } - private bool Update(IndexedEventSource source, IEnumerable events, bool isSnapshot) + private bool Update(IndexedEventSource source, IEnumerable events, bool isSnapshot) { if (isSnapshot) { ClearBySource(source); } - foreach (var order in events.OfType()) + foreach (var order in events) { if (ordersByIndex.TryGetValue(order.Index, out var removed)) { @@ -335,10 +313,10 @@ private bool Update(IndexedEventSource source, IEnumerable events return IsChanged; } - private List GetBuyOrders() => + private List GetBuyOrders() => buyOrders.ToList(); - private List GetSellOrders() => + private List GetSellOrders() => sellOrders.ToList(); private void ClearBySource(IndexedEventSource source) @@ -351,65 +329,95 @@ private void ClearBySource(IndexedEventSource source) sellOrders.ClearBySource(source); } - private bool ShallAdd(OrderBase order) => + private bool ShallAdd(TE order) => order.HasSize && (order.EventFlags & EventFlags.RemoveEvent) == 0; - private OrderSet GetOrderSetForOrder(OrderBase order) => + private OrderSet GetOrderSetForOrder(TE order) => order.OrderSide == Side.Buy ? buyOrders : sellOrders; public class Builder { - private List sources; - private DXFeed feed; - private Type eventType; - private object symbol; - private DXFeedSubscription subscription; - private IndexedTxModel.Builder txModelBuilder; - private MarketDepthListener listener; - private long aggregationPeriodMillis; - private int depthLimit; + internal IndexedTxModel.Builder TxModelBuilder { get; } = IndexedTxModel.NewBuilder(); - public Builder(Type eventType) - { - this.eventType = eventType; - //txModelBuilder = IndexedTxModel.NewBuilder(eventType); - } + internal MarketDepthListener? Listener { get; private set; } + internal long AggregationPeriodMillis { get; private set; } + + internal int DepthLimit { get; private set; } + + /// + /// Sets the for the model being created. + /// The feed cannot be attached after the model has been built. + /// + /// The . + /// The builder instance. public Builder WithFeed(DXFeed feed) { - this.feed = feed; - //txModelBuilder.WithFeed(feed); - //subscription = feed.CreateSubscription(eventType); + TxModelBuilder.WithFeed(feed); return this; } - public Builder WithSources(params OrderSource[] sources) + /// + /// Sets the subscription symbol for the model being created. + /// The symbol cannot be added or changed after the model has been built. + /// + /// The subscription symbol. + /// The builder instance. + public Builder WithSymbol(string symbol) { - this.sources = new List(sources); - //txModelBuilder.WithSources(new List(sources)); + TxModelBuilder.WithSymbol(symbol); return this; } - public Builder WithSources(ICollection sources) + /// + /// Sets the listener for transaction notifications. + /// The listener cannot be changed or added once the model has been built. + /// + /// The transaction listener. + /// The builder instance. + public Builder WithListener(MarketDepthListener listener) { - this.sources = new List(sources); - //txModelBuilder.WithSources(new List(sources)); + Listener = listener; return this; } - public Builder WithSymbol(string symbol) + /// + /// Sets the sources from which to subscribe for indexed events. + /// If no sources have been set, subscriptions will default to all possible sources. + /// + /// + /// The default value for this source is an empty set, + /// which means that this model subscribes to all available sources. + /// + /// The specified sources. + /// this builder. + public Builder WithSources(params OrderSource[] sources) { - this.symbol = symbol; - //txModelBuilder.WithSymbol(symbol); + TxModelBuilder.WithSources(new List(sources)); return this; } - public Builder WithListener(MarketDepthListener listener) + /// + /// Sets the sources from which to subscribe for indexed events. + /// If no sources have been set, subscriptions will default to all possible sources. + /// + /// + /// The default value for this source is an empty set, + /// which means that this model subscribes to all available sources. + /// + /// The specified sources. + /// this builder. + public Builder WithSources(ICollection sources) { - this.listener = listener; + TxModelBuilder.WithSources(new List(sources)); return this; } + /// + /// Sets the aggregation period. + /// + /// The aggregation period in milliseconds. + /// this builder. public Builder WithAggregationPeriod(long aggregationPeriodMillis) { if (aggregationPeriodMillis < 0) @@ -417,10 +425,15 @@ public Builder WithAggregationPeriod(long aggregationPeriodMillis) aggregationPeriodMillis = 0; } - this.aggregationPeriodMillis = aggregationPeriodMillis; + AggregationPeriodMillis = aggregationPeriodMillis; return this; } + /// + /// Sets the depth limit. + /// + /// The depth limit. + /// this builder. public Builder WithDepthLimit(int depthLimit) { if (depthLimit < 0) @@ -428,114 +441,143 @@ public Builder WithDepthLimit(int depthLimit) depthLimit = 0; } - this.depthLimit = depthLimit; + DepthLimit = depthLimit; return this; } - public MarketDepthModel Build() => - new(txModelBuilder, listener, aggregationPeriodMillis, depthLimit, feed, symbol, subscription, sources); + /// + /// Builds an instance of based on the provided parameters. + /// + /// The created . + public MarketDepthModel Build() => + new(this); } - private sealed class OrderSet - where T : OrderBase + /// + /// Represents a set of orders, sorted by a comparator. + /// + private sealed class OrderSet { - private readonly List snapshot = new(); - private readonly IComparer comparator; - private readonly SortedSet orders; - private int depthLimit; + private readonly List _snapshot = new(); + private readonly IComparer _comparator; + private readonly SortedSet _orders; + private int _depthLimit; - public OrderSet(IComparer comparator) + /// + /// Initializes a new instance of the class with specified comparator. + /// + /// The comparator to use for sorting orders. + public OrderSet(IComparer comparator) { - this.comparator = comparator; - orders = new SortedSet(comparator); + _comparator = comparator; + _orders = new SortedSet(comparator); } - public int DepthLimit + /// + /// Gets a value indicating whether this set has changed. + /// + public bool IsChanged { get; private set; } + + /// + /// Sets the depth limit. + /// + /// The new depth limit. + public void SetDepthLimit(int depthLimit) { - get => depthLimit; - set + if (_depthLimit == depthLimit) { - if (depthLimit == value) - { - return; - } - - depthLimit = value; - IsChanged = true; + return; } - } - public bool IsChanged { get; private set; } + _depthLimit = depthLimit; + IsChanged = true; + } - public void Add(T order) + /// + /// Adds an order to the set. + /// + /// The order to add. + public void Add(TE order) { - if (orders.Add(order)) + if (_orders.Add(order)) { MarkAsChangedIfNeeded(order); } } - public void Remove(T order) + /// + /// Removes an order from the set. + /// + /// The order to remove. + public void Remove(TE order) { - if (orders.Remove(order)) + if (_orders.Remove(order)) { MarkAsChangedIfNeeded(order); } } + /// + /// Clears orders from the set by source. + /// + /// The source to clear orders by. public void ClearBySource(IndexedEventSource source) => - IsChanged = orders.RemoveWhere(order => order.EventSource.Equals(source)) > 0; + IsChanged = _orders.RemoveWhere(order => order.EventSource.Equals(source)) > 0; - public List ToList() + /// + /// Converts the set to a list. + /// + /// The list of orders. + public List ToList() { if (IsChanged) { UpdateSnapshot(); } - return new List(snapshot); + return new List(_snapshot); } private void UpdateSnapshot() { IsChanged = false; - snapshot.Clear(); - var limit = IsDepthLimitUnbounded() ? int.MaxValue : depthLimit; - var it = orders.GetEnumerator(); + _snapshot.Clear(); + var limit = IsDepthLimitUnbounded() ? int.MaxValue : _depthLimit; + var it = _orders.GetEnumerator(); for (var i = 0; i < limit && it.MoveNext(); ++i) { - snapshot.Add(it.Current); + _snapshot.Add(it.Current); } } - private void MarkAsChangedIfNeeded(T order) + private void MarkAsChangedIfNeeded(TE order) { if (IsChanged) { return; } - if (IsDepthLimitUnbounded() || IsSizeWithinDepthLimit() || IsOrderWithinDepthLimit(order)) + if (IsDepthLimitUnbounded() || IsOrderCountWithinDepthLimit() || IsOrderWithinDepthLimit(order)) { IsChanged = true; } } private bool IsDepthLimitUnbounded() => - depthLimit <= 0 || depthLimit == int.MaxValue; + _depthLimit <= 0 || _depthLimit == int.MaxValue; - private bool IsSizeWithinDepthLimit() => - orders.Count <= depthLimit; + private bool IsOrderCountWithinDepthLimit() => + _orders.Count <= _depthLimit; - private bool IsOrderWithinDepthLimit(T order) + private bool IsOrderWithinDepthLimit(TE order) { - if (snapshot.Count == 0) + if (_snapshot.Count == 0) { return true; } - var last = snapshot[snapshot.Count - 1]; - return comparator.Compare(last, order) >= 0; + var last = _snapshot[_snapshot.Count - 1]; + return _comparator.Compare(last, order) >= 0; } } } diff --git a/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs b/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs index 203325b..98270fe 100644 --- a/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs +++ b/src/DxFeed.Graal.Net/Models/TimeSeriesTxModel.cs @@ -4,115 +4,139 @@ // If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. // -using System; +using System.Collections.Generic; using DxFeed.Graal.Net.Api; -using DxFeed.Graal.Net.Native.Model; +using DxFeed.Graal.Net.Api.Osub; +using DxFeed.Graal.Net.Events; namespace DxFeed.Graal.Net.Models; /// /// An incremental model for time-series events. /// This model manages all snapshot and transaction logic, subscription handling, and listener notifications. -/// -/// -/// -/// This model is designed to handle incremental transactions. Users of this model only see the list +/// +///

This model is designed to handle incremental transactions. Users of this model only see the list /// of events in a consistent state. This model delays incoming events that are part of an incomplete snapshot /// or ongoing transaction until the snapshot is complete or the transaction has ended. This model notifies -/// the user of received transactions through an installed listener. -/// +/// the user of received transactions through an installed .

+/// ///

Configuration

-/// -/// This model must be configured using the class, as most configuration -/// settings cannot be changed once the model is built. This model requires configuration with -/// a symbol and a -/// fromTime for subscription, and it must be attached +/// +///

This model must be configured using the class, as most configuration +/// settings cannot be changed once the model is built. This model requires configuration +/// and +/// for subscription, and it must be attached /// to a instance to begin operation. -/// For ease of use, some of these configurations can be changed after the model is built, see -/// . -/// -/// -/// This model only supports single symbol subscriptions; multiple symbols cannot be configured. -/// +/// For ease of use, some of these configurations can be changed after the model is built, +/// see .

+/// +///

This model only supports single symbol subscriptions; multiple symbols cannot be configured.

+/// ///

Resource management and closed models

-/// -/// Attached model is a potential memory leak. If the pointer to the attached model is lost, then there is no way +/// +///

Attached model is a potential memory leak. If the pointer to attached model is lost, then there is no way /// to detach this model from the feed and the model will not be reclaimed by the garbage collector as long as the -/// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching the model -/// requires knowing the pointer to the feed at the place of the call, which is not always convenient. -/// -/// -/// The convenient way to detach the model from the feed is to call its method. +/// corresponding feed is still used. Detached model can be reclaimed by the garbage collector, but detaching model +/// requires knowing the pointer to the feed at the place of the call, which is not always convenient.

+/// +///

The convenient way to detach model from the feed is to call its +/// method. /// Closed model becomes permanently detached from all feeds, removes all its listeners and is guaranteed -/// to be reclaimable by the garbage collector as soon as all external references to it are cleared. -/// +/// to be reclaimable by the garbage collector as soon as all external references to it are cleared.

+/// ///

Threads and locks

-/// -/// This class is thread-safe and can be used concurrently from multiple threads without external synchronization. -/// -///
-public class TimeSeriesTxModel : AbstractTxModel +/// +///

This class is thread-safe and can be used concurrently from multiple threads without external synchronization.

+/// The corresponding to never be concurrent. +/// +/// The type of time series events processed by this model. +public sealed class TimeSeriesTxModel : AbstractTxModel + where TE : class, ITimeSeriesEvent { - private TimeSeriesTxModel(AbstractTxModelHandle handle) - : base(handle) + private readonly object _syncRoot = new(); + private long _fromTime; + + private TimeSeriesTxModel(Builder builder) + : base(builder) { + _fromTime = builder.FromTime; + UpdateSubscription(GetUndecoratedSymbol(), _fromTime); } /// /// Factory method to create a new builder for this model. /// - /// The class type of the time series event. /// A new instance. - public static Builder NewBuilder(Type eventType) => - new(eventType); + public static Builder NewBuilder() => + new(); /// - /// Returns the time from which to subscribe for time-series, + /// Gets the time from which to subscribe for time-series, /// or if this model is not subscribed (this is the default value). /// /// The time in milliseconds since Unix epoch of January 1, 1970. - public long GetFromTime() => - ((TimeSeriesTxModelHandle)handle).GetFromTime(); + public long GetFromTime() + { + lock (_syncRoot) + { + return _fromTime; + } + } /// /// Sets the time from which to subscribe for time-series. /// If this time has already been set, nothing happens. /// /// The time in milliseconds since Unix epoch of January 1, 1970. - public void SetFromTime(long fromTime) => - ((TimeSeriesTxModelHandle)handle).SetFromTime(fromTime); + public void SetFromTime(long fromTime) + { + lock (_syncRoot) + { + if (_fromTime == fromTime) + { + return; + } + + _fromTime = fromTime; + UpdateSubscription(GetUndecoratedSymbol(), _fromTime); + } + } + + private static HashSet DecorateSymbol(object symbol, long fromTime) => + fromTime == long.MaxValue + ? new HashSet() + : new HashSet { new TimeSeriesSubscriptionSymbol(symbol, fromTime) }; + + private void UpdateSubscription(object symbol, long fromTime) => + SetSymbols(DecorateSymbol(symbol, fromTime)); /// - /// A builder class for creating an instance of . + /// A builder class for creating an instance of . /// - public class Builder : Builder + public new class Builder : Builder> { - /// - /// Initializes a new instance of the class with the specified event type. - /// - /// The type of events processed by the model being created. - public Builder(Type eventType) - : base(eventType) - { - } + internal long FromTime { get; private set; } = long.MaxValue; /// /// Sets the time from which to subscribe for time-series. - /// - /// - /// This time defaults to , which means that this model is not subscribed. + /// + ///

This time defaults to , which means that this model is not subscribed. /// This time can be changed later, after the model has been created, - /// by calling . - /// + /// by calling .

+ /// /// The time in milliseconds since Unix epoch of January 1, 1970. - /// The builder instance. + /// this builder. public Builder WithFromTime(long fromTime) { + FromTime = fromTime; return this; } - /// - public override TimeSeriesTxModel Build() => - throw new NotImplementedException(); + /// + /// Builds an instance of based on the provided parameters. + /// + /// The created . + public override TimeSeriesTxModel Build() => + new(this); } } diff --git a/src/DxFeed.Graal.Net/Models/TxEventProcessor.cs b/src/DxFeed.Graal.Net/Models/TxEventProcessor.cs new file mode 100644 index 0000000..dec720c --- /dev/null +++ b/src/DxFeed.Graal.Net/Models/TxEventProcessor.cs @@ -0,0 +1,192 @@ +// +// Copyright © 2024 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +using System.Collections.Generic; +using System.Collections.Specialized; +using System.Linq; +using DxFeed.Graal.Net.Events; +using DxFeed.Graal.Net.Events.Market; + +namespace DxFeed.Graal.Net.Models; + +internal class TxEventProcessor + where TE : class, IIndexedEvent +{ + private readonly List _pendingEvents = new(); + private readonly ITransactionProcessor _transactionProcessor; + private readonly ISnapshotProcessor _snapshotProcessor; + private bool _isSnapshotBeginSeen; + private bool _isPartialSnapshot; + private bool _isCompleteSnapshot; + + public TxEventProcessor(bool isBatchProcessing, bool isSnapshotProcessing, Listener listener) + { + _transactionProcessor = isBatchProcessing + ? new BatchTransactionProcessor(listener) + : new NotifyTransactionProcessor(listener); + _snapshotProcessor = isSnapshotProcessing + ? new ProcessingSnapshotProcessor(listener) + : new NotifySnapshotProcessor(listener); + } + + public delegate void Listener(IEnumerable events, bool isSnapshot); + + private interface ITransactionProcessor + { + void ProcessTransaction(List events); + + void ProcessingBatch(); + } + + private interface ISnapshotProcessor + { + void ProcessSnapshot(List events); + } + + public bool ProcessEvent(TE e) + { + if (EventFlags.IsSnapshotBegin(e)) + { + _isSnapshotBeginSeen = true; + _isPartialSnapshot = true; + _isCompleteSnapshot = false; + + // Remove any unprocessed leftovers on new snapshot. + _pendingEvents.Clear(); + } + else if (!_isSnapshotBeginSeen) + { + // Ignore all events until at least one SNAPSHOT_BEGIN is received. + return false; + } + + if (_isPartialSnapshot && EventFlags.IsSnapshotEndOrSnip(e)) + { + _isPartialSnapshot = false; + _isCompleteSnapshot = true; + } + + // Defer processing of this event while snapshot in progress or tx pending. + _pendingEvents.Add(e); + if (EventFlags.IsPending(e) || _isPartialSnapshot) + { + // Waiting for the end of snapshot or transaction. + return false; + } + + // We have completed transaction or snapshot. + if (_isCompleteSnapshot) + { + // Completed snapshot. + _snapshotProcessor.ProcessSnapshot(_pendingEvents); + _isCompleteSnapshot = false; + } + else + { + // Completed transaction. + _transactionProcessor.ProcessTransaction(_pendingEvents); + } + + _pendingEvents.Clear(); + return true; + } + + public void ReceiveAllEventsInBatch() => + _transactionProcessor.ProcessingBatch(); + + private sealed class BatchTransactionProcessor : ITransactionProcessor + { + private readonly Listener _listener; + private readonly List _transactions = new(); + + public BatchTransactionProcessor(Listener listener) => + _listener = listener; + + public void ProcessTransaction(List events) => + _transactions.AddRange(events); + + public void ProcessingBatch() + { + if (_transactions.Count == 0) + { + return; + } + + _listener.Invoke(_transactions, false); + _transactions.Clear(); + } + } + + private sealed class NotifyTransactionProcessor : ITransactionProcessor + { + private readonly Listener _listener; + + public NotifyTransactionProcessor(Listener listener) => + _listener = listener; + + public void ProcessTransaction(List events) => + _listener.Invoke(events, false); + + public void ProcessingBatch() + { + // nothing to do + } + } + + private sealed class NotifySnapshotProcessor : ISnapshotProcessor + { + private readonly Listener _listener; + + public NotifySnapshotProcessor(Listener listener) => + _listener = listener; + + public void ProcessSnapshot(List events) => + _listener.Invoke(events, true); + } + + private sealed class ProcessingSnapshotProcessor : ISnapshotProcessor + { + private readonly Listener _listener; + private readonly OrderedDictionary _snapshot = new(); + + public ProcessingSnapshotProcessor(Listener listener) => + _listener = listener; + + public void ProcessSnapshot(List events) + { + foreach (var e in events) + { + if (IsRemove(e)) + { + _snapshot.Remove(e.Index); + } + else + { + e.EventFlags = 0; + _snapshot[e.Index] = e; + } + } + + _listener.Invoke(_snapshot.Values.OfType(), true); + _snapshot.Clear(); + } + + private static bool IsRemove(TE e) + { + if ((e.EventFlags & EventFlags.RemoveEvent) != 0) + { + return true; + } + + if (e is OrderBase order) + { + return !order.HasSize; + } + + return false; + } + } +} diff --git a/src/DxFeed.Graal.Net/Models/TxModelListener.cs b/src/DxFeed.Graal.Net/Models/TxModelListener.cs index 12d6be8..e99e122 100644 --- a/src/DxFeed.Graal.Net/Models/TxModelListener.cs +++ b/src/DxFeed.Graal.Net/Models/TxModelListener.cs @@ -11,22 +11,23 @@ namespace DxFeed.Graal.Net.Models; /// /// Invoked when a complete transaction (one or more) is received. This behavior can be changed when building -/// the model; see . -/// -/// -/// Only events that have the same and +/// the model; see . +/// +///

Only events that have the same and /// can be in the same listener call and cannot be mixed within a single call. If there are multiple sources, -/// listener notifications will happen separately for each source. +/// listener notifications will happen separately for each source.

/// -/// A transaction can also be a snapshot. In such cases, the flag is set to true, +///

A transaction can also be a snapshot. In such cases, the flag is set to true, /// indicating that all state based on previously received events for the corresponding /// should be cleared. A snapshot can also be post-processed or raw; -/// see . -/// If is true, +/// see . +/// If is true, /// the transaction containing the snapshot can be empty (events.Count == 0), -/// meaning that an empty snapshot was received. -/// +/// meaning that an empty snapshot was received.

+/// /// The source of the indexed events. /// The list of received events representing one or more transactions. /// Indicates if the events form a snapshot. -public delegate void TxModelListener(IndexedEventSource source, IEnumerable events, bool isSnapshot); +/// The type of indexed events passed to this listener. +public delegate void TxModelListener(IndexedEventSource source, IEnumerable events, bool isSnapshot) + where TE : IIndexedEvent; diff --git a/src/DxFeed.Graal.Net/Native/Model/AbstractTxModelHandle.cs b/src/DxFeed.Graal.Net/Native/Model/AbstractTxModelHandle.cs deleted file mode 100644 index 6355b58..0000000 --- a/src/DxFeed.Graal.Net/Native/Model/AbstractTxModelHandle.cs +++ /dev/null @@ -1,22 +0,0 @@ -// -// Copyright © 2024 Devexperts LLC. All rights reserved. -// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -using DxFeed.Graal.Net.Native.Interop; - -namespace DxFeed.Graal.Net.Native.Model; - -internal class AbstractTxModelHandle : JavaHandle -{ - public bool IsBatchProcessing() => - false; - - public bool IsSnapshotProcessing() => - false; - - public void Close() - { - } -} diff --git a/src/DxFeed.Graal.Net/Native/Model/IndexedTxModelHandle.cs b/src/DxFeed.Graal.Net/Native/Model/IndexedTxModelHandle.cs deleted file mode 100644 index ff80a24..0000000 --- a/src/DxFeed.Graal.Net/Native/Model/IndexedTxModelHandle.cs +++ /dev/null @@ -1,23 +0,0 @@ -// -// Copyright © 2024 Devexperts LLC. All rights reserved. -// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -using System; -using System.Collections.Generic; -using DxFeed.Graal.Net.Events; - -namespace DxFeed.Graal.Net.Native.Model; - -internal class IndexedTxModelHandle : AbstractTxModelHandle -{ - public HashSet GetSources() => - throw new NotImplementedException(); - - public void SetSources(params IndexedEventSource[] sources) => - throw new NotImplementedException(); - - public void SetSources(ICollection sources) => - throw new NotImplementedException(); -} diff --git a/src/DxFeed.Graal.Net/Native/Model/TimeSeriesTxModelHandle.cs b/src/DxFeed.Graal.Net/Native/Model/TimeSeriesTxModelHandle.cs deleted file mode 100644 index 3304392..0000000 --- a/src/DxFeed.Graal.Net/Native/Model/TimeSeriesTxModelHandle.cs +++ /dev/null @@ -1,18 +0,0 @@ -// -// Copyright © 2024 Devexperts LLC. All rights reserved. -// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -using System; - -namespace DxFeed.Graal.Net.Native.Model; - -internal class TimeSeriesTxModelHandle : AbstractTxModelHandle -{ - public long GetFromTime() => - throw new NotImplementedException(); - - public void SetFromTime(long fromTime) => - throw new NotImplementedException(); -} diff --git a/src/DxFeed.Graal.Net/Native/Model/TxEventListenerHandle.cs b/src/DxFeed.Graal.Net/Native/Model/TxEventListenerHandle.cs deleted file mode 100644 index 5e9ba8d..0000000 --- a/src/DxFeed.Graal.Net/Native/Model/TxEventListenerHandle.cs +++ /dev/null @@ -1,63 +0,0 @@ -// -// Copyright © 2024 Devexperts LLC. All rights reserved. -// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. -// - -using System; -using System.Linq; -using System.Runtime.InteropServices; -using DxFeed.Graal.Net.Api; -using DxFeed.Graal.Net.Events; -using DxFeed.Graal.Net.Events.Market; -using DxFeed.Graal.Net.Models; -using DxFeed.Graal.Net.Native.Endpoint; -using DxFeed.Graal.Net.Native.Events; -using DxFeed.Graal.Net.Native.Interop; -using static DxFeed.Graal.Net.Native.ErrorHandling.ErrorCheck; - -namespace DxFeed.Graal.Net.Native.Model; - -internal class TxEventListenerHandle : JavaHandle -{ - private static readonly unsafe Delegate OnEventsReceivedDelegate = new OnEventsReceivedDelegateType(OnEventReceived); - - [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - private unsafe delegate void OnEventsReceivedDelegateType(IntPtr thread, int source, ListNative* events, bool isSnapshot, GCHandle handle); - - public static TxEventListenerHandle Create(TxModelListener listener) => - CreateAndRegisterFinalize(listener, handle => SafeCall(Import.New(CurrentThread, OnEventsReceivedDelegate, handle))); - - private static unsafe void OnEventReceived(IntPtr thread, int source, ListNative* events, bool isSnapshot, GCHandle handle) - { - if (!handle.IsAllocated) - { - return; - } - - var listener = handle.Target as TxModelListener; - try - { - var eventList = EventMapper.FromNative(events); - listener?.Invoke(OrderSource.ValueOf(source), eventList.OfType(), isSnapshot); - } - catch (Exception e) - { - // ToDo Add log entry. - Console.Error.WriteLine($"Exception in user {nameof(TxModelListener)}. {e}"); - } - } - - private static class Import - { - [DllImport( - ImportInfo.DllName, - CallingConvention = CallingConvention.Cdecl, - CharSet = CharSet.Ansi, - EntryPoint = "dxfg_PropertyChangeListener_new")] - public static extern TxEventListenerHandle New( - nint thread, - Delegate listener, - GCHandle handle); - } -}