Skip to content

Commit

Permalink
Merge pull request #11 from dxFeed/EN-1414-refactor-sub-dealloc
Browse files Browse the repository at this point in the history
En 1414 refactor sub dealloc
  • Loading branch information
alimantu authored May 20, 2020
2 parents 63186a9 + 0e269a0 commit 7e04f57
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 34 deletions.
50 changes: 23 additions & 27 deletions dxfeed/core/DXFeedPy.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# distutils: language = c++
# cython: always_allow_keywords=True
from libcpp.deque cimport deque as cppdq

from dxfeed.core.utils.helpers cimport *
from dxfeed.core.utils.helpers import *
Expand All @@ -12,6 +11,7 @@ from datetime import datetime
import pandas as pd
from typing import Optional, Union, Iterable
from warnings import warn
from weakref import WeakSet

# for importing variables
import dxfeed.core.listeners.listener as lis
Expand Down Expand Up @@ -53,24 +53,29 @@ cdef class ConnectionClass:
Data structure that contains connection
"""
cdef clib.dxf_connection_t connection
# sub_ptr_list contains pointers to all subscriptions related to current connection
cdef cppdq[clib.dxf_subscription_t *] sub_ptr_list
# each subscription has its own index in a list
cdef int subs_order
cdef object __sub_refs

def __init__(self):
self.subs_order = 0
self.__sub_refs = WeakSet()

def __dealloc__(self):
dxf_close_connection(self)

def get_sub_refs(self):
"""
Method to get list of references to all subscriptions related to current connection
Returns
-------
:list
List of weakref objects. Empty list if no refs
"""
return list(self.__sub_refs)

cpdef SubscriptionClass make_new_subscription(self, data_len: int):
cdef SubscriptionClass out = SubscriptionClass(data_len)
out.connection = self.connection
self.sub_ptr_list.push_back(&out.subscription) # append pointer to new subscription
out.subscription_order = self.subs_order # assign each subscription an index
self.subs_order += 1
out.con_sub_list_ptr = &self.sub_ptr_list # reverse pointer to pointers list
self.__sub_refs.add(out)
return out


Expand All @@ -80,9 +85,8 @@ cdef class SubscriptionClass:
"""
cdef clib.dxf_connection_t connection
cdef clib.dxf_subscription_t subscription
cdef int subscription_order # index in list of subscription pointers
cdef cppdq[clib.dxf_subscription_t *] *con_sub_list_ptr # pointer to list of subscription pointers
cdef dxf_event_listener_t listener
cdef object __weakref__ # Weak referencing enabling
cdef object event_type_str
cdef list columns
cdef object data
Expand All @@ -105,9 +109,7 @@ cdef class SubscriptionClass:
self.listener = NULL

def __dealloc__(self):
if self.subscription: # if connection is not closed
clib.dxf_close_subscription(self.subscription)
self.con_sub_list_ptr[0][self.subscription_order] = NULL
dxf_close_subscription(self)

def get_data(self):
"""
Expand Down Expand Up @@ -187,7 +189,7 @@ def dxf_create_connection_auth_bearer(address: Union[str, unicode, bytes],
address = address.encode('utf-8')
token = token.encode('utf-8')
clib.dxf_create_connection_auth_bearer(address, token,
NULL, NULL, NULL, NULL, NULL, &cc.connection)
NULL, NULL, NULL, NULL, NULL, &cc.connection)
error_code = process_last_error(verbose=False)
if error_code:
raise RuntimeError(f"In underlying C-API library error {error_code} occurred!")
Expand Down Expand Up @@ -229,7 +231,7 @@ def dxf_create_subscription(ConnectionClass cc, event_type: str, candle_time: Op
candle_time = datetime.strptime(candle_time, '%Y-%m-%d %H:%M:%S') if candle_time else datetime.utcnow()
timestamp = int((candle_time - datetime(1970, 1, 1)).total_seconds()) * 1000 - 5000
except ValueError:
raise Exception("Inapropriate date format, should be %Y-%m-%d %H:%M:%S")
raise Exception("Inappropriate date format, should be %Y-%m-%d %H:%M:%S")

if event_type == 'Candle':
clib.dxf_create_subscription_timed(sc.connection, et_type_int, timestamp, &sc.subscription)
Expand Down Expand Up @@ -368,16 +370,11 @@ def dxf_close_connection(ConnectionClass cc):
cc: ConnectionClass
Variable with connection information
"""

# close all subscriptions before closing the connection
for i in range(cc.sub_ptr_list.size()):
if cc.sub_ptr_list[i]: # subscription should not be closed previously
clib.dxf_close_subscription(cc.sub_ptr_list[i][0])
cc.sub_ptr_list[i][0] = NULL # mark subscription as closed

cc.sub_ptr_list.clear()

if cc.connection:
related_subs = cc.get_sub_refs()
for sub in related_subs:
dxf_close_subscription(sub)

clib.dxf_close_connection(cc.connection)
cc.connection = NULL

Expand All @@ -393,7 +390,6 @@ def dxf_close_subscription(SubscriptionClass sc):
if sc.subscription:
clib.dxf_close_subscription(sc.subscription)
sc.subscription = NULL
sc.con_sub_list_ptr[0][sc.subscription_order] = NULL

def dxf_get_current_connection_status(ConnectionClass cc, return_str: bool=True):
"""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dxfeed"
version = "0.2.0"
version = "0.2.1"
description = "DXFeed Python API via C API"
authors = ["Index Management Team <[email protected]>"]
build = "build.py"
Expand Down
9 changes: 3 additions & 6 deletions tests/test_dxfeedpy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dxfeed as dx
import pytest


class ValueStorage(object): # config
demo_address = 'demo.dxfeed.com:7300'
event_types = ['Trade', 'Quote', 'Summary', 'Profile', 'Order', 'TimeAndSale', 'Candle', 'TradeETH', 'SpreadOrder',
Expand Down Expand Up @@ -39,8 +40,7 @@ def test_fail_create_subscription_with_no_connection():


@pytest.mark.xfail()
def test_fail_to_use_subscription_without_connection():
connection = dx.dxf_create_connection(ValueStorage.demo_address)
def test_fail_to_use_subscription_without_connection(connection):
sub = dx.dxf_create_subscription(cc=connection, event_type='Trade')
dx.dxf_close_connection(connection)
dx.dxf_add_symbols(sc=sub, symbols=['AAPL'])
Expand Down Expand Up @@ -99,9 +99,7 @@ def test_symbol_clearing(connection):


@pytest.mark.parametrize('sub_type', ValueStorage.event_types)
def test_default_listeners(sub_type):
# fixture usage causes errors with, probably, threads
connection = dx.dxf_create_connection(ValueStorage.demo_address)
def test_default_listeners(connection, sub_type):
con_err_status = dx.process_last_error(verbose=False)
sub = dx.dxf_create_subscription(connection, sub_type)
sub_err_status = dx.process_last_error(verbose=False)
Expand All @@ -110,6 +108,5 @@ def test_default_listeners(sub_type):
dx.dxf_detach_listener(sub)
drop_lis_err_status = dx.process_last_error(verbose=False)
dx.dxf_close_subscription(sub)
dx.dxf_close_connection(connection)
assert (con_err_status, sub_err_status, add_lis_err_status, drop_lis_err_status) == (0, 0, 0, 0)

0 comments on commit 7e04f57

Please sign in to comment.