Skip to content

Commit

Permalink
IPFCollector add each listener(it receives full batch after adding)
Browse files Browse the repository at this point in the history
  • Loading branch information
kosyloa committed Nov 2, 2023
1 parent d6f5be8 commit c695db1
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 42 deletions.
17 changes: 2 additions & 15 deletions DXFeedFramework/Ipf/Live/DXInstrumentProfileCollector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ public class DXInstrumentProfileCollector {
public func add<O>(observer: O) throws
where O: DXInstrumentProfileUpdateListener,
O: Hashable {
try listeners.reader { [weak self] in
if $0.count == 0 {
try self?.native.addListener(self)
}
}
try native.addListener(observer)
listeners.insert(observer)
}

Expand All @@ -50,6 +46,7 @@ public class DXInstrumentProfileCollector {
public func remove<O>(observer: O)
where O: DXInstrumentProfileUpdateListener,
O: Hashable {
native.removeListener(observer)
listeners.remove(observer)
}

Expand Down Expand Up @@ -125,13 +122,3 @@ public class DXInstrumentProfileCollector {
}
}

extension DXInstrumentProfileCollector: DXInstrumentProfileUpdateListener {
public func instrumentProfilesUpdated(_ instruments: [InstrumentProfile]) {
listeners.reader { items in
let enumerator = items.objectEnumerator()
while let observer = enumerator.nextObject() as? DXInstrumentProfileUpdateListener {
observer.instrumentProfilesUpdated(instruments)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ import Foundation
/// Native wrapper over the Java com.dxfeed.ipf.live.InstrumentProfileCollector class.
/// The location of the imported functions is in the header files "dxfg_ipf.h".
public class NativeInstrumentProfileCollector {
private class WeakListener: WeakBox<NativeInstrumentProfileCollector> { }
private class WeakListener: WeakBox<DXInstrumentProfileUpdateListener> { }
private static let listeners = ConcurrentArray<WeakListener>()

let collector: UnsafeMutablePointer<dxfg_ipf_collector_t>?
private var nativeListener: UnsafeMutablePointer<dxfg_ipf_update_listener_t>?
private var nativeListeners = [UnsafeMutablePointer<dxfg_ipf_update_listener_t>]()

private weak var listener: DXInstrumentProfileUpdateListener?
private static let mapper = InstrumentProfileMapper()

private static let finalizeCallback: dxfg_finalize_function = { _, context in
if let context = context {
let endpoint: AnyObject = bridge(ptr: context)
if let listener = endpoint as? WeakListener {
if let listener = endpoint as? WeakListener {
NativeInstrumentProfileCollector.listeners.removeAll(where: {
return $0 === listener
})
Expand All @@ -49,13 +48,13 @@ public class NativeInstrumentProfileCollector {
print("NativeInstrumentProfileCollector: exception \(error)")
}
}
listener.value?.listener?.instrumentProfilesUpdated(profiles)
listener.value?.instrumentProfilesUpdated(profiles)
}
}
}

private func removeListener() {
if let nativeListener = nativeListener {
deinit {
nativeListeners.forEach { nativeListener in
let thread = currentThread()
_ = try? ErrorCheck.nativeCall(thread,
dxfg_InstrumentProfileCollector_removeUpdateListener(thread,
Expand All @@ -64,13 +63,8 @@ public class NativeInstrumentProfileCollector {
_ = try? ErrorCheck.nativeCall(thread,
dxfg_JavaObjectHandler_release(thread,
&(nativeListener.pointee.handler)))
self.nativeListener = nil
self.listener = nil
}
}

deinit {
removeListener()
nativeListeners.removeAll()
let thread = currentThread()
if let collector = collector {
_ = try? ErrorCheck.nativeCall(thread,
Expand Down Expand Up @@ -127,11 +121,12 @@ public class NativeInstrumentProfileCollector {
}

func addListener(_ listener: DXInstrumentProfileUpdateListener?) throws {
removeListener()
guard let listener = listener else {
return
}
let thread = currentThread()
self.listener = listener

let weakListener = WeakListener(value: self)
let weakListener = WeakListener(value: listener)
NativeInstrumentProfileCollector.listeners.append(newElement: weakListener)
let voidPtr = bridge(obj: weakListener)

Expand All @@ -140,8 +135,6 @@ public class NativeInstrumentProfileCollector {
dxfg_InstrumentProfileUpdateListener_new(thread,
callback,
voidPtr))
self.nativeListener = nativeListener

try ErrorCheck.nativeCall(thread, dxfg_Object_finalize(thread,
&(nativeListener.pointee.handler),
NativeInstrumentProfileCollector.finalizeCallback,
Expand All @@ -150,7 +143,17 @@ public class NativeInstrumentProfileCollector {
_ = try ErrorCheck.nativeCall(thread,
dxfg_InstrumentProfileCollector_addUpdateListener(thread,
self.collector,
self.nativeListener))
nativeListener))
self.nativeListeners.append(nativeListener)
}

func removeListener(_ listener: DXInstrumentProfileUpdateListener?) {
NativeInstrumentProfileCollector.listeners.reader {
$0.forEach { weakListener in
if weakListener.value === listener {
weakListener.resetValue()
}
}
}
}
}
4 changes: 4 additions & 0 deletions DXFeedFramework/Utils/WeakBox.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ class WeakBox<T> {
init(value: T) {
self._value = value as AnyObject
}

func resetValue() {
_value = nil
}
}
46 changes: 38 additions & 8 deletions DXFeedFrameworkTests/IPFTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,55 @@ STOCK,EREGL:TR,EREĞLİ DEMİR VE ÇELİK FABRİKALARI1 T.A.Ş.,TR,XIST,XIST,TRY
wait(for: [expectation], timeout: 1.0)
}

func testProfileListener() throws {
func testProfileMultipleListeners() throws {
let collector = try DXInstrumentProfileCollector()
let expectation = expectation(description: "Received profile")
let newProfile = InstrumentProfile()
newProfile.symbol = "TEST_123"
let expectation1 = expectation(description: "Received profile")
expectation1.assertForOverFulfill = false
let expectation3 = expectation(description: "Received profile")
expectation3.assertForOverFulfill = false
let bothListnersProfile = InstrumentProfile()
bothListnersProfile.symbol = "TEST_123"
let firstListnerProfile = InstrumentProfile()
bothListnersProfile.symbol = "OnlyFirst_123"
let listener = AnonymousProfileListener { anonymCl in
anonymCl.callback = { profiles in
if profiles.count == 1 {
let profile = profiles.first
if profile == newProfile {
expectation.fulfill()
if profile == bothListnersProfile {
expectation1.fulfill()
}
if profile == firstListnerProfile {
expectation3.fulfill()
}
}
}
return anonymCl
}
let expectation2 = expectation(description: "Received profile after publish")
expectation2.assertForOverFulfill = false

let listener2 = AnonymousProfileListener { anonymCl in
anonymCl.callback = { profiles in
if profiles.count == 1 {
let profile = profiles.first
if profile == bothListnersProfile {
expectation2.fulfill()
}
if profile == firstListnerProfile {
XCTAssert(false)
}
}
}
return anonymCl
}
try collector.updateInstrumentProfile(profile: bothListnersProfile)
wait(seconds: 1)
try collector.add(observer: listener)
try collector.updateInstrumentProfile(profile: newProfile)
wait(for: [expectation], timeout: 2.0)
try collector.add(observer: listener2)
wait(for: [expectation1, expectation2], timeout: 2.0)
collector.remove(observer: listener2)
try collector.updateInstrumentProfile(profile: firstListnerProfile)
wait(for: [expectation3], timeout: 2.0)
}

func testConnectionState() {
Expand Down

0 comments on commit c695db1

Please sign in to comment.