Skip to content

Commit

Permalink
Merge pull request #191 from alpacahq/rc4
Browse files Browse the repository at this point in the history
Fix exception handling, timestamp precisions
  • Loading branch information
Shlomi Kushchi authored May 6, 2020
2 parents a735f24 + 8e884e3 commit b342fef
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 18 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ the corresponding channel names. For example, if you subscribe to
`trade_updates`, a WebSocket connects to Alpaca stream API, and
if `AM.*` given to the `subscribe()` method, a WebSocket connection is
established to Polygon's interface. If your account is enabled for
Alpaca Data API streaming, adding `polyfeed/` prefix to `T.<symbol>`,
Alpaca Data API streaming, adding `alpacadatav1/` prefix to `T.<symbol>`,
`Q.<symbol>` and `AM.<symbol>` will also connect to the data stream
interface.

Expand Down Expand Up @@ -249,7 +249,7 @@ async def on_second_bars(conn, channel, bar):
conn.run(['trade_updates', 'AM.*'])

# if Data API streaming is enabled
# conn.run(['trade_updates', 'polyfeed/AM.SPY'])
# conn.run(['trade_updates', 'alpacadatav1/AM.SPY'])

```

Expand Down
2 changes: 1 addition & 1 deletion alpaca_trade_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .rest import REST # noqa
from .stream2 import StreamConn # noqa

__version__ = '0.47rc3'
__version__ = '0.47rc4'
20 changes: 14 additions & 6 deletions alpaca_trade_api/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,20 @@ def __getattr__(self, key):
if key in self._raw:
val = self._raw[key]
if key == 'timestamp':
return pd.Timestamp(val, tz=NY, unit='ms')
return pd.Timestamp(val, tz=NY, unit=self.unit)
return val
return getattr(super(), key)


class Agg(_Timestamped, Entity):
class _NanoTimestamped(_Timestamped):
unit = 'ns'


class _MilliTimestamped(_Timestamped):
unit = 'ms'


class Agg(_MilliTimestamped, Entity):
pass


Expand Down Expand Up @@ -185,11 +193,11 @@ def df(self):
return self._df


class Trade(_Timestamped, Entity):
class Trade(_NanoTimestamped, Entity):
pass


class Quote(_Timestamped, Entity):
class Quote(_NanoTimestamped, Entity):
pass


Expand Down Expand Up @@ -247,7 +255,7 @@ def df(self):


trade_mapping = {
"sym": "symbol",
"T": "symbol",
"c": "conditions",
"x": "exchange",
"p": "price",
Expand All @@ -268,7 +276,7 @@ def df(self):
}

agg_mapping = {
"sym": "symbol",
"T": "symbol",
"o": "open",
"c": "close",
"h": "high",
Expand Down
4 changes: 4 additions & 0 deletions alpaca_trade_api/polygon/streamconn.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ async def _recv(self):
await self.close()
asyncio.ensure_future(self._ensure_ws())

async def consume(self):
if self._consume_task:
await self._consume_task

async def _consume_msg(self):
async for data in self._stream:
stream = data.get('ev')
Expand Down
20 changes: 13 additions & 7 deletions alpaca_trade_api/stream2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .common import get_base_url, get_data_url, get_credentials
from .entity import Account, Entity, trade_mapping, agg_mapping, quote_mapping
from . import polygon
from .polygon.entity import Trade, Quote, Agg
from .entity import Trade, Quote, Agg
import logging


Expand Down Expand Up @@ -52,6 +52,10 @@ async def _connect(self):

self._consume_task = asyncio.ensure_future(self._consume_msg())

async def consume(self):
if self._consume_task:
await self._consume_task

async def _consume_msg(self):
ws = self._ws
try:
Expand Down Expand Up @@ -239,26 +243,28 @@ async def subscribe(self, channels):
async def unsubscribe(self, channels):
'''Handle unsubscribing from channels.'''

data_prefixes = ('Q.', 'T.', 'AM.')
if self._data_stream == 'polygon':
data_prefixes = ('Q.', 'T.', 'A.', 'AM.')

data_channels = [
c for c in channels
if c.startswith(data_prefixes)
if c.startswith(self._data_prefixes)
]

if data_channels:
await self.data_ws.unsubscribe(data_channels)

async def consume(self):
await asyncio.gather(
self.trading_ws.consume(),
self.data_ws.consume(),
)

def run(self, initial_channels=[]):
'''Run forever and block until exception is raised.
initial_channels is the channels to start with.
'''
loop = self.loop
try:
loop.run_until_complete(self.subscribe(initial_channels))
loop.run_forever()
loop.run_until_complete(self.consume())
except KeyboardInterrupt:
logging.info("Exiting on Interrupt")
finally:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ def test_data(reqmock):
"cond2": 16,
"cond3": 0,
"cond4": 0,
"timestamp": 1518086464720
"timestamp": 1518101436000900000
}
}
'''
Expand All @@ -422,7 +422,7 @@ def test_data(reqmock):
"bidprice": 159.45,
"bidsize": 20,
"bidexchange": 12,
"timestamp": 1518086601843
"timestamp": 1518101436000900000
}
}'''
)
Expand Down

0 comments on commit b342fef

Please sign in to comment.