diff --git a/README.md b/README.md index f5823649..1de12416 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,7 @@ the corresponding channel names. For example, if you subscribe to `trade_updates`, a WebSocket connects to Alpaca stream API, and if `AM.*` given to the `subscribe()` method, a WebSocket connection is established to Polygon's interface. If your account is enabled for -Alpaca Data API streaming, adding `polyfeed/` prefix to `T.`, +Alpaca Data API streaming, adding `alpacadatav1/` prefix to `T.`, `Q.` and `AM.` will also connect to the data stream interface. @@ -249,7 +249,7 @@ async def on_second_bars(conn, channel, bar): conn.run(['trade_updates', 'AM.*']) # if Data API streaming is enabled -# conn.run(['trade_updates', 'polyfeed/AM.SPY']) +# conn.run(['trade_updates', 'alpacadatav1/AM.SPY']) ``` diff --git a/alpaca_trade_api/__init__.py b/alpaca_trade_api/__init__.py index 9730fdca..998b3663 100644 --- a/alpaca_trade_api/__init__.py +++ b/alpaca_trade_api/__init__.py @@ -1,4 +1,4 @@ from .rest import REST # noqa from .stream2 import StreamConn # noqa -__version__ = '0.47rc3' +__version__ = '0.47rc4' diff --git a/alpaca_trade_api/entity.py b/alpaca_trade_api/entity.py index cbfa41ad..97f9c491 100644 --- a/alpaca_trade_api/entity.py +++ b/alpaca_trade_api/entity.py @@ -134,12 +134,20 @@ def __getattr__(self, key): if key in self._raw: val = self._raw[key] if key == 'timestamp': - return pd.Timestamp(val, tz=NY, unit='ms') + return pd.Timestamp(val, tz=NY, unit=self.unit) return val return getattr(super(), key) -class Agg(_Timestamped, Entity): +class _NanoTimestamped(_Timestamped): + unit = 'ns' + + +class _MilliTimestamped(_Timestamped): + unit = 'ms' + + +class Agg(_MilliTimestamped, Entity): pass @@ -185,11 +193,11 @@ def df(self): return self._df -class Trade(_Timestamped, Entity): +class Trade(_NanoTimestamped, Entity): pass -class Quote(_Timestamped, Entity): +class Quote(_NanoTimestamped, Entity): pass @@ -247,7 +255,7 @@ def df(self): trade_mapping = { - "sym": "symbol", + "T": "symbol", "c": "conditions", "x": "exchange", "p": "price", @@ -268,7 +276,7 @@ def df(self): } agg_mapping = { - "sym": "symbol", + "T": "symbol", "o": "open", "c": "close", "h": "high", diff --git a/alpaca_trade_api/polygon/streamconn.py b/alpaca_trade_api/polygon/streamconn.py index 110ffdd4..0ce9b2fc 100644 --- a/alpaca_trade_api/polygon/streamconn.py +++ b/alpaca_trade_api/polygon/streamconn.py @@ -98,6 +98,10 @@ async def _recv(self): await self.close() asyncio.ensure_future(self._ensure_ws()) + async def consume(self): + if self._consume_task: + await self._consume_task + async def _consume_msg(self): async for data in self._stream: stream = data.get('ev') diff --git a/alpaca_trade_api/stream2.py b/alpaca_trade_api/stream2.py index e890c6f0..e9a79660 100644 --- a/alpaca_trade_api/stream2.py +++ b/alpaca_trade_api/stream2.py @@ -6,7 +6,7 @@ from .common import get_base_url, get_data_url, get_credentials from .entity import Account, Entity, trade_mapping, agg_mapping, quote_mapping from . import polygon -from .polygon.entity import Trade, Quote, Agg +from .entity import Trade, Quote, Agg import logging @@ -52,6 +52,10 @@ async def _connect(self): self._consume_task = asyncio.ensure_future(self._consume_msg()) + async def consume(self): + if self._consume_task: + await self._consume_task + async def _consume_msg(self): ws = self._ws try: @@ -239,18 +243,20 @@ async def subscribe(self, channels): async def unsubscribe(self, channels): '''Handle unsubscribing from channels.''' - data_prefixes = ('Q.', 'T.', 'AM.') - if self._data_stream == 'polygon': - data_prefixes = ('Q.', 'T.', 'A.', 'AM.') - data_channels = [ c for c in channels - if c.startswith(data_prefixes) + if c.startswith(self._data_prefixes) ] if data_channels: await self.data_ws.unsubscribe(data_channels) + async def consume(self): + await asyncio.gather( + self.trading_ws.consume(), + self.data_ws.consume(), + ) + def run(self, initial_channels=[]): '''Run forever and block until exception is raised. initial_channels is the channels to start with. @@ -258,7 +264,7 @@ def run(self, initial_channels=[]): loop = self.loop try: loop.run_until_complete(self.subscribe(initial_channels)) - loop.run_forever() + loop.run_until_complete(self.consume()) except KeyboardInterrupt: logging.info("Exiting on Interrupt") finally: diff --git a/tests/test_rest.py b/tests/test_rest.py index 4a2816e1..381d0600 100644 --- a/tests/test_rest.py +++ b/tests/test_rest.py @@ -399,7 +399,7 @@ def test_data(reqmock): "cond2": 16, "cond3": 0, "cond4": 0, - "timestamp": 1518086464720 + "timestamp": 1518101436000900000 } } ''' @@ -422,7 +422,7 @@ def test_data(reqmock): "bidprice": 159.45, "bidsize": 20, "bidexchange": 12, - "timestamp": 1518086601843 + "timestamp": 1518101436000900000 } }''' )