diff --git a/DXFeedFramework.xcodeproj/project.pbxproj b/DXFeedFramework.xcodeproj/project.pbxproj index 912ece23c..b90959645 100644 --- a/DXFeedFramework.xcodeproj/project.pbxproj +++ b/DXFeedFramework.xcodeproj/project.pbxproj @@ -69,6 +69,9 @@ 641BDD612ACD697B00236B78 /* AbstractEventListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641BDD602ACD697B00236B78 /* AbstractEventListener.swift */; }; 641BDD622ACD697B00236B78 /* AbstractEventListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641BDD602ACD697B00236B78 /* AbstractEventListener.swift */; }; 641C64AC2B331B160023CFAD /* ObservableSubscriptionChangeListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64AB2B331B160023CFAD /* ObservableSubscriptionChangeListener.swift */; }; + 641C64B02B34679D0023CFAD /* NativeObservableSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64AF2B34679D0023CFAD /* NativeObservableSubscription.swift */; }; + 641C64B22B346A2E0023CFAD /* DXObservableSubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64B12B346A2E0023CFAD /* DXObservableSubscription.swift */; }; + 641C64B42B347C430023CFAD /* DXObservableSubscriptionTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641C64B32B347C430023CFAD /* DXObservableSubscriptionTest.swift */; }; 641E45F92B1DE51700649363 /* EventsListener.swift in Sources */ = {isa = PBXBuildFile; fileRef = 641E45F82B1DE51700649363 /* EventsListener.swift */; }; 642528D02A3C534D00A04E41 /* TimeInterval+Ext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6469F8CD2A3B2F9900846831 /* TimeInterval+Ext.swift */; }; 642528D12A3C534D00A04E41 /* TimeInterval+Ext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6469F8CD2A3B2F9900846831 /* TimeInterval+Ext.swift */; }; @@ -588,6 +591,10 @@ 641BDD5C2ACD67A000236B78 /* LatencyListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LatencyListener.swift; sourceTree = ""; }; 641BDD602ACD697B00236B78 /* AbstractEventListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AbstractEventListener.swift; sourceTree = ""; }; 641C64AB2B331B160023CFAD /* ObservableSubscriptionChangeListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableSubscriptionChangeListener.swift; sourceTree = ""; }; + 641C64AE2B344E770023CFAD /* PublishProfiles.playground */ = {isa = PBXFileReference; lastKnownFileType = file.playground; path = PublishProfiles.playground; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; + 641C64AF2B34679D0023CFAD /* NativeObservableSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NativeObservableSubscription.swift; sourceTree = ""; }; + 641C64B12B346A2E0023CFAD /* DXObservableSubscription.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DXObservableSubscription.swift; sourceTree = ""; }; + 641C64B32B347C430023CFAD /* DXObservableSubscriptionTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DXObservableSubscriptionTest.swift; sourceTree = ""; }; 641E45F82B1DE51700649363 /* EventsListener.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventsListener.swift; sourceTree = ""; }; 64278C6B2A602CA20074B5AA /* CandleTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CandleTests.swift; sourceTree = ""; }; 64278C6D2A602D2B0074B5AA /* Candle.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Candle.swift; sourceTree = ""; }; @@ -947,6 +954,8 @@ isa = PBXGroup; children = ( 64104FCB2A2629D800D1FC41 /* NativeSubscription.swift */, + 641C64AF2B34679D0023CFAD /* NativeObservableSubscription.swift */, + 641C64B12B346A2E0023CFAD /* DXObservableSubscription.swift */, ); path = Subscription; sourceTree = ""; @@ -1014,6 +1023,7 @@ 640885C42B1F477A00E6CF88 /* DxFeedIpfConnect.playground */, 640885C52B1F553C00E6CF88 /* DXFeedLiveIpfSample.playground */, 640885C62B1F657300E6CF88 /* ScheduleSample.playground */, + 641C64AE2B344E770023CFAD /* PublishProfiles.playground */, ); path = Playgrounds; sourceTree = ""; @@ -1528,6 +1538,7 @@ 6406F25A2AD987EB00B58C42 /* PublisherTest.swift */, 648C72462B18ABFC00E2FEF3 /* DXConnectionTest.swift */, 648C72482B19CA5A00E2FEF3 /* DXExceptionTest.swift */, + 641C64B32B347C430023CFAD /* DXObservableSubscriptionTest.swift */, ); path = DXFeedFrameworkTests; sourceTree = ""; @@ -2254,6 +2265,7 @@ 64BDDB322AD7E5A600694210 /* SpreadOrderMapper.swift in Sources */, 64ACBCE32A289A0700032C53 /* TimeSeriesSubscriptionSymbol.swift in Sources */, 645A34952A937C7200709F29 /* BinaryInteger+Ext.swift in Sources */, + 641C64B22B346A2E0023CFAD /* DXObservableSubscription.swift in Sources */, 64A42F502B0BA668001C3ACC /* DXTimeFormat.swift in Sources */, 64BA92652A306E0200BE26A0 /* Trade.swift in Sources */, 6486B9752AD0493F00D8D5FA /* TheoPrice+Ext.swift in Sources */, @@ -2292,6 +2304,7 @@ 642BE4CC2A2E1DB70052340A /* Mapper.swift in Sources */, 64104FC72A2613BC00D1FC41 /* ConcurrentArray.swift in Sources */, 6498E6B22AB1D41A0093A065 /* DXSchedule.swift in Sources */, + 641C64B02B34679D0023CFAD /* NativeObservableSubscription.swift in Sources */, 64BDDB2C2AD7DB9B00694210 /* AnalyticOrder+Ext.swift in Sources */, 646407572AA0C44D006FF769 /* NativeInstrumentProfileCollector.swift in Sources */, 6486B9732AD045C800D8D5FA /* TheoPrice.swift in Sources */, @@ -2441,6 +2454,7 @@ 64D0DBE82A29FF4B00710605 /* EventsTest.swift in Sources */, 648C72492B19CA5A00E2FEF3 /* DXExceptionTest.swift in Sources */, 649813C42ADD5CB2003CE3B3 /* TestEndpoointStateListener.swift in Sources */, + 641C64B42B347C430023CFAD /* DXObservableSubscriptionTest.swift in Sources */, 64ACBCEC2A29FE2300032C53 /* XCTestCase+Utils.swift in Sources */, 64ECD67F2A9CF4CB00B36935 /* IPFTests.swift in Sources */, 64ACBCD52A2789EF00032C53 /* TestListener.swift in Sources */, diff --git a/DXFeedFramework/Api/DXFeedSubcription.swift b/DXFeedFramework/Api/DXFeedSubcription.swift index 60205280d..e3ea04bbc 100644 --- a/DXFeedFramework/Api/DXFeedSubcription.swift +++ b/DXFeedFramework/Api/DXFeedSubcription.swift @@ -89,17 +89,6 @@ public class DXFeedSubcription { public func removeSymbols(_ symbols: [Symbol]) throws { try native.removeSymbols(symbols) } - - /// - Throws: GraalException. Rethrows exception from Java. - public func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { - try native.addChangeListener(listener) - } - - /// - Throws: GraalException. Rethrows exception from Java. - public func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { - try native.removeChangeListener(listener) - } - } extension DXFeedSubcription: DXEventListener { @@ -135,4 +124,14 @@ extension DXFeedSubcription: IObservableSubscription { public func isContains(_ eventType: EventCode) -> Bool { return events.contains(eventType) } + + /// - Throws: GraalException. Rethrows exception from Java. + public func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { + try native.addChangeListener(listener) + } + + /// - Throws: GraalException. Rethrows exception from Java. + public func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { + try native.removeChangeListener(listener) + } } diff --git a/DXFeedFramework/Api/DXPublisher.swift b/DXFeedFramework/Api/DXPublisher.swift index 52efcabe4..d9fd1c486 100644 --- a/DXFeedFramework/Api/DXPublisher.swift +++ b/DXFeedFramework/Api/DXPublisher.swift @@ -12,6 +12,7 @@ import Foundation public class DXPublisher { /// Feed native wrapper. private let native: NativePublisher + private let subscriptionsByClass = ConcurrentDict() deinit { } @@ -33,8 +34,38 @@ public class DXPublisher { /// [Read it first Javadoc](https://docs.dxfeed.com/dxfeed/api/com/dxfeed/api/DXPublisher.html#publishEvents-java.util.Collection-) /// - Parameters: /// - events: The collection of events to publish. - /// - Throws: GraalException. Rethrows exception from Java.recore + /// - Throws: GraalException. Rethrows exception from Java. public func publish(events: [MarketEvent]) throws { try native.publishEvents(events: events) } + + /// Returns observable set of subscribed symbols for the specified event type. + /// Note, that subscription is represented by object symbols. + /// + /// The set of subscribed symbols contains ``WildcardSymbol/all`` if and + /// only if there is a subscription to this wildcard symbol. + /// If ``DXFeedTimeSeriesSubscription`` is used + /// to subscribe to time-service of the events of this type, then instances of + /// ``TimeSeriesSubscriptionSymbol`` class represent the corresponding subscription item. + /// The resulting observable subscription can generate repeated + /// ``ObservableSubscriptionChangeListener/symbolsAdded(symbols:)`` notifications to + /// its listeners for the same symbols without the corresponding + /// ``ObservableSubscriptionChangeListener/symbolsRemoved(symbols:)`` + /// notifications in between them. It happens when subscription disappears, cached data is lost, and subscription + /// reappears again. On each ``ObservableSubscriptionChangeListener/symbolsAdded(symbols:)`` + /// notification data provider shall ``publish(events:)`` the most recent events for + /// the corresponding symbols. + /// - Parameters: + /// - event: eventType the class of event. + /// - Returns: Observable subscription for the specified event type. + /// - Throws: GraalException. Rethrows exception from Java. + public func getSubscription(_ event: EventCode) throws -> IObservableSubscription { + if let subscription = subscriptionsByClass[event] { + return subscription + } else { + let subscription = try DXObservableSubscription(native: native.createSubscription(event), events: [event]) + subscriptionsByClass[event] = subscription + return subscription + } + } } diff --git a/DXFeedFramework/Api/Osub/IObservableSubscription.swift b/DXFeedFramework/Api/Osub/IObservableSubscription.swift index 9d2c17648..2999c4fb5 100644 --- a/DXFeedFramework/Api/Osub/IObservableSubscription.swift +++ b/DXFeedFramework/Api/Osub/IObservableSubscription.swift @@ -26,4 +26,8 @@ public protocol IObservableSubscription { /// - eventType: The event type. /// - Returns: **true** if this subscription contains the corresponding event type func isContains(_ eventType: EventCode) -> Bool + + func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws + + func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws } diff --git a/DXFeedFramework/Api/Osub/ObservableSubscriptionChangeListener.swift b/DXFeedFramework/Api/Osub/ObservableSubscriptionChangeListener.swift index 0e2002416..263865f55 100644 --- a/DXFeedFramework/Api/Osub/ObservableSubscriptionChangeListener.swift +++ b/DXFeedFramework/Api/Osub/ObservableSubscriptionChangeListener.swift @@ -7,13 +7,10 @@ import Foundation - public protocol ObservableSubscriptionChangeListener: AnyObject { - func symbolsAdded(symbols: Set) where O: Symbol, - O: Hashable + func symbolsAdded(symbols: Set) - func symbolsRemoved(symbols: Set) where O: Symbol, - O: Hashable + func symbolsRemoved(symbols: Set) func subscriptionClosed() } diff --git a/DXFeedFramework/Api/Osub/TimeSeriesSubscriptionSymbol.swift b/DXFeedFramework/Api/Osub/TimeSeriesSubscriptionSymbol.swift index 5aef01550..c7c844269 100644 --- a/DXFeedFramework/Api/Osub/TimeSeriesSubscriptionSymbol.swift +++ b/DXFeedFramework/Api/Osub/TimeSeriesSubscriptionSymbol.swift @@ -61,3 +61,11 @@ extension TimeSeriesSubscriptionSymbol: CustomStringConvertible { return stringValue } } + +extension TimeSeriesSubscriptionSymbol: Hashable { + + public func hash(into hasher: inout Hasher) { + hasher.combine(self.symbol) + hasher.combine(self.fromTime) + } +} diff --git a/DXFeedFramework/Api/Osub/WildcardSymbol.swift b/DXFeedFramework/Api/Osub/WildcardSymbol.swift index 2000c37c4..e2ca2ffb2 100644 --- a/DXFeedFramework/Api/Osub/WildcardSymbol.swift +++ b/DXFeedFramework/Api/Osub/WildcardSymbol.swift @@ -45,3 +45,13 @@ public class WildcardSymbol: Symbol { return symbol } } + +extension WildcardSymbol: Hashable { + public static func == (lhs: WildcardSymbol, rhs: WildcardSymbol) -> Bool { + return lhs.symbol == rhs.symbol + } + + public func hash(into hasher: inout Hasher) { + hasher.combine(self.symbol) + } +} diff --git a/DXFeedFramework/Native/Feed/NativePublisher.swift b/DXFeedFramework/Native/Feed/NativePublisher.swift index 0417c7e21..77817db8f 100644 --- a/DXFeedFramework/Native/Feed/NativePublisher.swift +++ b/DXFeedFramework/Native/Feed/NativePublisher.swift @@ -44,4 +44,13 @@ class NativePublisher { listPointer)) } } + + func createSubscription(_ event: EventCode) throws -> NativeObservableSubscription? { + let thread = currentThread() + let subscription = try ErrorCheck.nativeCall(thread, + dxfg_DXPublisher_getSubscription(thread, + self.publisher, + event.nativeCode())) + return NativeObservableSubscription(subscription: subscription) + } } diff --git a/DXFeedFramework/Native/Subscription/DXObservableSubscription.swift b/DXFeedFramework/Native/Subscription/DXObservableSubscription.swift new file mode 100644 index 000000000..1a6686faf --- /dev/null +++ b/DXFeedFramework/Native/Subscription/DXObservableSubscription.swift @@ -0,0 +1,49 @@ +// +// +// Copyright (C) 2023 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +import Foundation + +class DXObservableSubscription { + /// Subscription native wrapper. + private let native: NativeObservableSubscription + /// List of event types associated with this ``DXObservableSubscription`` + fileprivate let events: Set + + /// - Throws: ``GraalException`` Rethrows exception from Java, ``ArgumentException/argumentNil`` + internal init(native: NativeObservableSubscription?, events: [EventCode]) throws { + if let native = native { + self.native = native + } else { + throw ArgumentException.argumentNil + } + self.events = Set(events) + } +} + +extension DXObservableSubscription: IObservableSubscription { + public func isClosed() -> Bool { + return native.isClosed() + } + + public var eventTypes: Set { + return events + } + + public func isContains(_ eventType: EventCode) -> Bool { + return events.contains(eventType) + } + + /// - Throws: GraalException. Rethrows exception from Java. + public func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { + try native.addChangeListener(listener) + } + + /// - Throws: GraalException. Rethrows exception from Java. + public func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { + try native.removeChangeListener(listener) + } +} diff --git a/DXFeedFramework/Native/Subscription/NativeObservableSubscription.swift b/DXFeedFramework/Native/Subscription/NativeObservableSubscription.swift new file mode 100644 index 000000000..cc9e0aa0b --- /dev/null +++ b/DXFeedFramework/Native/Subscription/NativeObservableSubscription.swift @@ -0,0 +1,141 @@ +// +// +// Copyright (C) 2023 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +import Foundation +@_implementationOnly import graal_api + +/// Native wrapper over the Java com.dxfeed.api.DxFeedSubscription class. +/// The location of the imported functions is in the header files "dxfg_subscription.h". +class NativeObservableSubscription { + private class WeakSubscriptionChangeListener: WeakBox { } + + let subscription: UnsafeMutablePointer? + + private var subscriptionListener: WeakSubscriptionChangeListener? + var nativeSubscriptionChangeListener: UnsafeMutablePointer? + + weak var subscriptionChangeListener: ObservableSubscriptionChangeListener? + + static let symbolsAddedCallback: + dxfg_ObservableSubscriptionChangeListener_function_symbolsAdded = { _, symbols, context in + if let context = context { + let listener: AnyObject = bridge(ptr: context) + if let listener = listener as? WeakSubscriptionChangeListener { + if let symbols = symbols { + let result = SymbolMapper.newSymbols(symbols: symbols) + listener.value?.subscriptionChangeListener?.symbolsAdded(symbols: Set(result)) + } + } + } + } + + static let symbolsRemovedCallback: dxfg_ObservableSubscriptionChangeListener_function_symbolsRemoved + = { _, symbols, context in + if let context = context { + let listener: AnyObject = bridge(ptr: context) + if let listener = listener as? WeakSubscriptionChangeListener { + if let symbols = symbols { + let result = SymbolMapper.newSymbols(symbols: symbols) + listener.value?.subscriptionChangeListener?.symbolsRemoved(symbols: Set(result)) + } + } + } + } + + static let subscriptionClosedCallback: dxfg_ObservableSubscriptionChangeListener_function_subscriptionClosed + = {_, context in + if let context = context { + let listener: AnyObject = bridge(ptr: context) + if let listener = listener as? WeakSubscriptionChangeListener { + listener.value?.subscriptionChangeListener?.subscriptionClosed() + } + } + } + + deinit { + if let subscription = subscription { + let thread = currentThread() + _ = try? ErrorCheck.nativeCall(thread, + dxfg_JavaObjectHandler_release(thread, + &(subscription.pointee.handler))) + } + + if let nativeSubscriptionChangeListener = nativeSubscriptionChangeListener { + let thread = currentThread() + _ = try? ErrorCheck.nativeCall(thread, + dxfg_ObservableSubscription_removeChangeListener( + thread, + subscription, + nativeSubscriptionChangeListener)) + _ = try? ErrorCheck.nativeCall(thread, + dxfg_JavaObjectHandler_release( + thread, + &(nativeSubscriptionChangeListener.pointee.handler))) + } + } + + init(subscription: UnsafeMutablePointer?) { + self.subscription = subscription + } + + func isClosed() -> Bool { + let thread = currentThread() + let success = try? ErrorCheck.nativeCall(thread, dxfg_ObservableSubscription_isClosed(thread, + self.subscription)) + guard let success = success else { + return true + } + return success != 0 + } + + /// - Throws: GraalException. Rethrows exception from Java. + func addChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { + if subscriptionChangeListener == nil { + let thread = currentThread() + + let weakListener = WeakSubscriptionChangeListener(value: self) + subscriptionListener = weakListener + let voidPtr = bridge(obj: weakListener) + + nativeSubscriptionChangeListener = try ErrorCheck.nativeCall( + thread, + dxfg_ObservableSubscriptionChangeListener_new(thread, + NativeObservableSubscription.symbolsAddedCallback, + NativeObservableSubscription.symbolsRemovedCallback, + NativeObservableSubscription.subscriptionClosedCallback, + voidPtr)) + _ = try ErrorCheck.nativeCall(thread, + dxfg_ObservableSubscription_addChangeListener(thread, + subscription, + nativeSubscriptionChangeListener)) + + } + subscriptionChangeListener = listener + } + + /// - Throws: GraalException. Rethrows exception from Java. + func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { + if listener === subscriptionChangeListener { + defer { + nativeSubscriptionChangeListener = nil + subscriptionChangeListener = nil + } + let thread = currentThread() + _ = try ErrorCheck.nativeCall(thread, + dxfg_ObservableSubscription_removeChangeListener( + thread, + subscription, + nativeSubscriptionChangeListener)) + if let nativeSubscriptionChangeListener = nativeSubscriptionChangeListener { + _ = try ErrorCheck.nativeCall(thread, + dxfg_JavaObjectHandler_release( + thread, + &(nativeSubscriptionChangeListener.pointee.handler))) + } + } + } +} diff --git a/DXFeedFramework/Native/Subscription/NativeSubscription.swift b/DXFeedFramework/Native/Subscription/NativeSubscription.swift index 6eab8c522..e1746d39e 100644 --- a/DXFeedFramework/Native/Subscription/NativeSubscription.swift +++ b/DXFeedFramework/Native/Subscription/NativeSubscription.swift @@ -20,7 +20,6 @@ class NativeSubscription { var nativeSubscriptionChangeListener: UnsafeMutablePointer? private var subscriptionListener: WeakSubscriptionChangeListener? - private let mapper = EventMapper() weak var listener: DXEventListener? weak var subscriptionChangeListener: ObservableSubscriptionChangeListener? @@ -62,17 +61,23 @@ class NativeSubscription { if let context = context { let listener: AnyObject = bridge(ptr: context) if let listener = listener as? WeakSubscriptionChangeListener { - listener.value?.subscriptionChangeListener?.symbolsAdded(symbols: [""]) + if let symbols = symbols { + let result = SymbolMapper.newSymbols(symbols: symbols) + listener.value?.subscriptionChangeListener?.symbolsAdded(symbols: Set(result)) + } } } } static let symbolsRemovedCallback: dxfg_ObservableSubscriptionChangeListener_function_symbolsRemoved - = {_, symbols, context in + = { _, symbols, context in if let context = context { let listener: AnyObject = bridge(ptr: context) if let listener = listener as? WeakSubscriptionChangeListener { - listener.value?.subscriptionChangeListener?.symbolsRemoved(symbols: [""]) + if let symbols = symbols { + let result = SymbolMapper.newSymbols(symbols: symbols) + listener.value?.subscriptionChangeListener?.symbolsRemoved(symbols: Set(result)) + } } } } @@ -82,7 +87,7 @@ class NativeSubscription { if let context = context { let listener: AnyObject = bridge(ptr: context) if let listener = listener as? WeakSubscriptionChangeListener { - listener.value?.subscriptionChangeListener?.subscriptionClosed()o + listener.value?.subscriptionChangeListener?.subscriptionClosed() } } } @@ -104,7 +109,21 @@ class NativeSubscription { dxfg_JavaObjectHandler_release(thread, &(subscription.pointee.handler))) } + + if let nativeSubscriptionChangeListener = nativeSubscriptionChangeListener { + let thread = currentThread() + _ = try? ErrorCheck.nativeCall(thread, + dxfg_DXFeedSubscription_removeChangeListener( + thread, + subscription, + nativeSubscriptionChangeListener)) + _ = try? ErrorCheck.nativeCall(thread, + dxfg_JavaObjectHandler_release( + thread, + &(nativeSubscriptionChangeListener.pointee.handler))) + } } + init(subscription: UnsafeMutablePointer?) { self.subscription = subscription } @@ -223,13 +242,22 @@ class NativeSubscription { /// - Throws: GraalException. Rethrows exception from Java. func removeChangeListener(_ listener: ObservableSubscriptionChangeListener) throws { if listener === subscriptionChangeListener { + defer { + nativeSubscriptionChangeListener = nil + subscriptionChangeListener = nil + } let thread = currentThread() _ = try ErrorCheck.nativeCall(thread, - dxfg_DXFeedSubscription_removeChangeListener(thread, - subscription, - nativeSubscriptionChangeListener)) - nativeSubscriptionChangeListener = nil - subscriptionChangeListener = nil + dxfg_DXFeedSubscription_removeChangeListener( + thread, + subscription, + nativeSubscriptionChangeListener)) + if let nativeSubscriptionChangeListener = nativeSubscriptionChangeListener { + _ = try ErrorCheck.nativeCall(thread, + dxfg_JavaObjectHandler_release( + thread, + &(nativeSubscriptionChangeListener.pointee.handler))) + } } } } diff --git a/DXFeedFramework/Native/SymbolMappers/SymbolMapper.swift b/DXFeedFramework/Native/SymbolMappers/SymbolMapper.swift index 3a7837138..fad5019e4 100644 --- a/DXFeedFramework/Native/SymbolMappers/SymbolMapper.swift +++ b/DXFeedFramework/Native/SymbolMappers/SymbolMapper.swift @@ -62,7 +62,7 @@ class SymbolMapper { } case WILDCARD: break - case INDEXED_EVENT_SUBSCRIPTION: break + case INDEXED_EVENT_SUBSCRIPTION: fatalError("Add case for INDEXED_EVENT_SUBSCRIPTION") case TIME_SERIES_SUBSCRIPTION: symbol.withMemoryRebound(to: dxfg_time_series_subscription_symbol_t.self, capacity: 1) { clearNative(symbol: $0.pointee.symbol) @@ -74,4 +74,48 @@ class SymbolMapper { symbol.deallocate() } } + + static func newSymbols(symbols: UnsafeMutablePointer) -> [AnyHashable] { + var results = [AnyHashable]() + for index in 0..) -> AnyHashable? { + let type = native.pointee.type + switch type { + case STRING: + let result = native.withMemoryRebound(to: dxfg_string_symbol_t.self, capacity: 1) { pointer in + String(pointee: pointer.pointee.symbol) + } + return result + case CANDLE: + let result = native.withMemoryRebound(to: dxfg_candle_symbol_t.self, capacity: 1) { pointer in + String(pointee: pointer.pointee.symbol) + } + return result + case WILDCARD: + return WildcardSymbol.all + case INDEXED_EVENT_SUBSCRIPTION: fatalError("Add case for INDEXED_EVENT_SUBSCRIPTION") + case TIME_SERIES_SUBSCRIPTION: + let result: TimeSeriesSubscriptionSymbol? = native.withMemoryRebound( + to: dxfg_time_series_subscription_symbol_t.self, capacity: 1) { pointer in + if let symbol = SymbolMapper.newSymbol(native: pointer.pointee.symbol) { + return TimeSeriesSubscriptionSymbol(symbol: symbol, fromTime: pointer.pointee.from_time) + } else { + return nil + } + } + return result + default: break + } + + return nil + } } diff --git a/DXFeedFrameworkTests/DXObservableSubscriptionTest.swift b/DXFeedFrameworkTests/DXObservableSubscriptionTest.swift new file mode 100644 index 000000000..f67060dcb --- /dev/null +++ b/DXFeedFrameworkTests/DXObservableSubscriptionTest.swift @@ -0,0 +1,66 @@ +// +// +// Copyright (C) 2023 Devexperts LLC. All rights reserved. +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +import XCTest +@testable import DXFeedFramework + +final class DXObservableSubscriptionTest: XCTestCase { + class PublishListener: ObservableSubscriptionChangeListener { + let publisher: DXPublisher + + init(publisher: DXPublisher) { + self.publisher = publisher + } + + func symbolsAdded(symbols: Set) { + var events = [MarketEvent]() + symbols.forEach { symbol in + if let sSymbol = symbol as? Symbol { + if sSymbol.stringValue.hasSuffix(":TEST") { + var profile = Profile(sSymbol.stringValue) + profile.descriptionStr = "test symbol" + events.append(profile) + } + } + } + try? publisher.publish(events: events) + + } + + func symbolsRemoved(symbols: Set) { + // nothing to do here + } + + func subscriptionClosed() { + // nothing to do here + } + } + + override func setUpWithError() throws { + // Put setup code here. This method is called before the invocation of each test method in the class. + } + + override func tearDownWithError() throws { + // Put teardown code here. This method is called after the invocation of each test method in the class. + } + + func testCreateSubscription() throws { + let address = ":7700" + + let endpoint = try DXEndpoint.create(.publisher).connect(address) + let publisher = endpoint.getPublisher() + let listener = PublishListener(publisher: publisher!) + let subscription = try publisher?.getSubscription(EventCode.profile) + try subscription?.addChangeListener(listener) + + let receivedEventsExpectation = expectation(description: "Received events") + + wait(for: [receivedEventsExpectation], timeout: 20) + + } + +} diff --git a/Samples/Playgrounds/PublishProfiles.playground/Contents.swift b/Samples/Playgrounds/PublishProfiles.playground/Contents.swift new file mode 100644 index 000000000..a8149b559 --- /dev/null +++ b/Samples/Playgrounds/PublishProfiles.playground/Contents.swift @@ -0,0 +1,47 @@ +import UIKit +import PlaygroundSupport +import DXFeedFramework + +class PublishListener: ObservableSubscriptionChangeListener { + let publisher: DXPublisher + + init(publisher: DXPublisher) { + self.publisher = publisher + } + + func symbolsAdded(symbols: Set) { + var events = [MarketEvent]() + symbols.forEach { symbol in + if let sSymbol = symbol as? Symbol { + if sSymbol.stringValue.hasSuffix(":TEST") { + var profile = Profile(sSymbol.stringValue) + profile.descriptionStr = "test symbol" + events.append(profile) + } + } + } + try? publisher.publish(events: events) + } + + func symbolsRemoved(symbols: Set) { + // nothing to do here + } + + func subscriptionClosed() { + // nothing to do here + } +} + +let address = ":7700" + +let endpoint = try DXEndpoint.create(.publisher).connect(address) +let publisher = endpoint.getPublisher() +let listener = PublishListener(publisher: publisher!) +let subscription = try publisher?.getSubscription(EventCode.profile) +try subscription?.addChangeListener(listener) + +// infinity execution +PlaygroundPage.current.needsIndefiniteExecution = true + +// to finish execution run this line +PlaygroundPage.current.finishExecution() diff --git a/Samples/Playgrounds/PublishProfiles.playground/contents.xcplayground b/Samples/Playgrounds/PublishProfiles.playground/contents.xcplayground new file mode 100644 index 000000000..cf026f228 --- /dev/null +++ b/Samples/Playgrounds/PublishProfiles.playground/contents.xcplayground @@ -0,0 +1,4 @@ + + + + \ No newline at end of file