Skip to content

Commit

Permalink
add observable subscription to Publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
kosyloa committed Jan 2, 2024
1 parent ffda486 commit c79101d
Show file tree
Hide file tree
Showing 15 changed files with 496 additions and 28 deletions.
14 changes: 14 additions & 0 deletions DXFeedFramework.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
641BDD612ACD697B00236B78 /* AbstractEventListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641BDD602ACD697B00236B78 /* AbstractEventListener.swift */; };
641BDD622ACD697B00236B78 /* AbstractEventListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641BDD602ACD697B00236B78 /* AbstractEventListener.swift */; };
641C64AC2B331B160023CFAD /* ObservableSubscriptionChangeListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64AB2B331B160023CFAD /* ObservableSubscriptionChangeListener.swift */; };
641C64B02B34679D0023CFAD /* NativeObservableSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64AF2B34679D0023CFAD /* NativeObservableSubscription.swift */; };
641C64B22B346A2E0023CFAD /* DXObservableSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64B12B346A2E0023CFAD /* DXObservableSubscription.swift */; };
641C64B42B347C430023CFAD /* DXObservableSubscriptionTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64B32B347C430023CFAD /* DXObservableSubscriptionTest.swift */; };
641E45F92B1DE51700649363 /* EventsListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641E45F82B1DE51700649363 /* EventsListener.swift */; };
642528D02A3C534D00A04E41 /* TimeInterval+Ext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6469F8CD2A3B2F9900846831 /* TimeInterval+Ext.swift */; };
642528D12A3C534D00A04E41 /* TimeInterval+Ext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6469F8CD2A3B2F9900846831 /* TimeInterval+Ext.swift */; };
Expand Down Expand Up @@ -588,6 +591,10 @@
641BDD5C2ACD67A000236B78 /* LatencyListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LatencyListener.swift; sourceTree = "<group>"; };
641BDD602ACD697B00236B78 /* AbstractEventListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AbstractEventListener.swift; sourceTree = "<group>"; };
641C64AB2B331B160023CFAD /* ObservableSubscriptionChangeListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableSubscriptionChangeListener.swift; sourceTree = "<group>"; };
641C64AE2B344E770023CFAD /* PublishProfiles.playground */ = {isa = PBXFileReference; lastKnownFileType = file.playground; path = PublishProfiles.playground; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
641C64AF2B34679D0023CFAD /* NativeObservableSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NativeObservableSubscription.swift; sourceTree = "<group>"; };
641C64B12B346A2E0023CFAD /* DXObservableSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DXObservableSubscription.swift; sourceTree = "<group>"; };
641C64B32B347C430023CFAD /* DXObservableSubscriptionTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DXObservableSubscriptionTest.swift; sourceTree = "<group>"; };
641E45F82B1DE51700649363 /* EventsListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventsListener.swift; sourceTree = "<group>"; };
64278C6B2A602CA20074B5AA /* CandleTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CandleTests.swift; sourceTree = "<group>"; };
64278C6D2A602D2B0074B5AA /* Candle.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Candle.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -947,6 +954,8 @@
isa = PBXGroup;
children = (
64104FCB2A2629D800D1FC41 /* NativeSubscription.swift */,
641C64AF2B34679D0023CFAD /* NativeObservableSubscription.swift */,
641C64B12B346A2E0023CFAD /* DXObservableSubscription.swift */,
);
path = Subscription;
sourceTree = "<group>";
Expand Down Expand Up @@ -1014,6 +1023,7 @@
640885C42B1F477A00E6CF88 /* DxFeedIpfConnect.playground */,
640885C52B1F553C00E6CF88 /* DXFeedLiveIpfSample.playground */,
640885C62B1F657300E6CF88 /* ScheduleSample.playground */,
641C64AE2B344E770023CFAD /* PublishProfiles.playground */,
);
path = Playgrounds;
sourceTree = "<group>";
Expand Down Expand Up @@ -1528,6 +1538,7 @@
6406F25A2AD987EB00B58C42 /* PublisherTest.swift */,
648C72462B18ABFC00E2FEF3 /* DXConnectionTest.swift */,
648C72482B19CA5A00E2FEF3 /* DXExceptionTest.swift */,
641C64B32B347C430023CFAD /* DXObservableSubscriptionTest.swift */,
);
path = DXFeedFrameworkTests;
sourceTree = "<group>";
Expand Down Expand Up @@ -2254,6 +2265,7 @@
64BDDB322AD7E5A600694210 /* SpreadOrderMapper.swift in Sources */,
64ACBCE32A289A0700032C53 /* TimeSeriesSubscriptionSymbol.swift in Sources */,
645A34952A937C7200709F29 /* BinaryInteger+Ext.swift in Sources */,
641C64B22B346A2E0023CFAD /* DXObservableSubscription.swift in Sources */,
64A42F502B0BA668001C3ACC /* DXTimeFormat.swift in Sources */,
64BA92652A306E0200BE26A0 /* Trade.swift in Sources */,
6486B9752AD0493F00D8D5FA /* TheoPrice+Ext.swift in Sources */,
Expand Down Expand Up @@ -2292,6 +2304,7 @@
642BE4CC2A2E1DB70052340A /* Mapper.swift in Sources */,
64104FC72A2613BC00D1FC41 /* ConcurrentArray.swift in Sources */,
6498E6B22AB1D41A0093A065 /* DXSchedule.swift in Sources */,
641C64B02B34679D0023CFAD /* NativeObservableSubscription.swift in Sources */,
64BDDB2C2AD7DB9B00694210 /* AnalyticOrder+Ext.swift in Sources */,
646407572AA0C44D006FF769 /* NativeInstrumentProfileCollector.swift in Sources */,
6486B9732AD045C800D8D5FA /* TheoPrice.swift in Sources */,
Expand Down Expand Up @@ -2441,6 +2454,7 @@
64D0DBE82A29FF4B00710605 /* EventsTest.swift in Sources */,
648C72492B19CA5A00E2FEF3 /* DXExceptionTest.swift in Sources */,
649813C42ADD5CB2003CE3B3 /* TestEndpoointStateListener.swift in Sources */,
641C64B42B347C430023CFAD /* DXObservableSubscriptionTest.swift in Sources */,
64ACBCEC2A29FE2300032C53 /* XCTestCase+Utils.swift in Sources */,
64ECD67F2A9CF4CB00B36935 /* IPFTests.swift in Sources */,
64ACBCD52A2789EF00032C53 /* TestListener.swift in Sources */,
Expand Down
21 changes: 10 additions & 11 deletions DXFeedFramework/Api/DXFeedSubcription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,6 @@ public class DXFeedSubcription {
public func removeSymbols(_ symbols: [Symbol]) throws {
try native.removeSymbols(symbols)
}

/// - Throws: GraalException. Rethrows exception from Java.
public func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws {
try native.addChangeListener(listener)
}

/// - Throws: GraalException. Rethrows exception from Java.
public func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws {
try native.removeChangeListener(listener)
}

}

extension DXFeedSubcription: DXEventListener {
Expand Down Expand Up @@ -135,4 +124,14 @@ extension DXFeedSubcription: IObservableSubscription {
public func isContains(_ eventType: EventCode) -> Bool {
return events.contains(eventType)
}

/// - Throws: GraalException. Rethrows exception from Java.
public func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws {
try native.addChangeListener(listener)
}

/// - Throws: GraalException. Rethrows exception from Java.
public func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws {
try native.removeChangeListener(listener)
}
}
33 changes: 32 additions & 1 deletion DXFeedFramework/Api/DXPublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Foundation
public class DXPublisher {
/// Feed native wrapper.
private let native: NativePublisher
private let subscriptionsByClass = ConcurrentDict<EventCode, DXObservableSubscription>()

deinit {
}
Expand All @@ -33,8 +34,38 @@ public class DXPublisher {
/// [Read it first Javadoc](https://docs.dxfeed.com/dxfeed/api/com/dxfeed/api/DXPublisher.html#publishEvents-java.util.Collection-)
/// - Parameters:
/// - events: The collection of events to publish.
/// - Throws: GraalException. Rethrows exception from Java.recore
/// - Throws: GraalException. Rethrows exception from Java.
public func publish(events: [MarketEvent]) throws {
try native.publishEvents(events: events)
}

/// Returns observable set of subscribed symbols for the specified event type.
/// Note, that subscription is represented by object symbols.
///
/// The set of subscribed symbols contains ``WildcardSymbol/all`` if and
/// only if there is a subscription to this wildcard symbol.
/// If ``DXFeedTimeSeriesSubscription`` is used
/// to subscribe to time-service of the events of this type, then instances of
/// ``TimeSeriesSubscriptionSymbol`` class represent the corresponding subscription item.
/// The resulting observable subscription can generate repeated
/// ``ObservableSubscriptionChangeListener/symbolsAdded(symbols:)`` notifications to
/// its listeners for the same symbols without the corresponding
/// ``ObservableSubscriptionChangeListener/symbolsRemoved(symbols:)``
/// notifications in between them. It happens when subscription disappears, cached data is lost, and subscription
/// reappears again. On each ``ObservableSubscriptionChangeListener/symbolsAdded(symbols:)``
/// notification data provider shall ``publish(events:)`` the most recent events for
/// the corresponding symbols.
/// - Parameters:
/// - event: eventType the class of event.
/// - Returns: Observable subscription for the specified event type.
/// - Throws: GraalException. Rethrows exception from Java.
public func getSubscription(_ event: EventCode) throws -> IObservableSubscription {
if let subscription = subscriptionsByClass[event] {
return subscription
} else {
let subscription = try DXObservableSubscription(native: native.createSubscription(event), events: [event])
subscriptionsByClass[event] = subscription
return subscription
}
}
}
4 changes: 4 additions & 0 deletions DXFeedFramework/Api/Osub/IObservableSubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ public protocol IObservableSubscription {
/// - eventType: The event type.
/// - Returns: **true** if this subscription contains the corresponding event type
func isContains(_ eventType: EventCode) -> Bool

func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws

func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@

import Foundation


public protocol ObservableSubscriptionChangeListener: AnyObject {
func symbolsAdded<O>(symbols: Set<O>) where O: Symbol,
O: Hashable
func symbolsAdded(symbols: Set<AnyHashable>)

func symbolsRemoved<O>(symbols: Set<O>) where O: Symbol,
O: Hashable
func symbolsRemoved(symbols: Set<AnyHashable>)

func subscriptionClosed()
}
8 changes: 8 additions & 0 deletions DXFeedFramework/Api/Osub/TimeSeriesSubscriptionSymbol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,11 @@ extension TimeSeriesSubscriptionSymbol: CustomStringConvertible {
return stringValue
}
}

extension TimeSeriesSubscriptionSymbol: Hashable {

public func hash(into hasher: inout Hasher) {
hasher.combine(self.symbol)
hasher.combine(self.fromTime)
}
}
10 changes: 10 additions & 0 deletions DXFeedFramework/Api/Osub/WildcardSymbol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ public class WildcardSymbol: Symbol {
return symbol
}
}

extension WildcardSymbol: Hashable {
public static func == (lhs: WildcardSymbol, rhs: WildcardSymbol) -> Bool {
return lhs.symbol == rhs.symbol
}

public func hash(into hasher: inout Hasher) {
hasher.combine(self.symbol)
}
}
9 changes: 9 additions & 0 deletions DXFeedFramework/Native/Feed/NativePublisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,13 @@ class NativePublisher {
listPointer))
}
}

func createSubscription(_ event: EventCode) throws -> NativeObservableSubscription? {
let thread = currentThread()
let subscription = try ErrorCheck.nativeCall(thread,
dxfg_DXPublisher_getSubscription(thread,
self.publisher,
event.nativeCode()))
return NativeObservableSubscription(subscription: subscription)
}
}
49 changes: 49 additions & 0 deletions DXFeedFramework/Native/Subscription/DXObservableSubscription.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//
//
// Copyright (C) 2023 Devexperts LLC. All rights reserved.
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
//

import Foundation

class DXObservableSubscription {
/// Subscription native wrapper.
private let native: NativeObservableSubscription
/// List of event types associated with this ``DXObservableSubscription``
fileprivate let events: Set<EventCode>

/// - Throws: ``GraalException`` Rethrows exception from Java, ``ArgumentException/argumentNil``
internal init(native: NativeObservableSubscription?, events: [EventCode]) throws {
if let native = native {
self.native = native
} else {
throw ArgumentException.argumentNil
}
self.events = Set(events)
}
}

extension DXObservableSubscription: IObservableSubscription {
public func isClosed() -> Bool {
return native.isClosed()
}

public var eventTypes: Set<EventCode> {
return events
}

public func isContains(_ eventType: EventCode) -> Bool {
return events.contains(eventType)
}

/// - Throws: GraalException. Rethrows exception from Java.
public func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws {
try native.addChangeListener(listener)
}

/// - Throws: GraalException. Rethrows exception from Java.
public func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws {
try native.removeChangeListener(listener)
}
}
Loading

0 comments on commit c79101d

Please sign in to comment.