Skip to content

Commit

Permalink
feat: reconnect websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
platonfloria committed May 2, 2024
1 parent 3b62853 commit fcc4978
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
Licensed under MIT
"""
import asyncio
import logging
import websockets

from fastlane_bot.exceptions import AsyncUpdateRetryException, ReadOnlyException, FlashloanUnavailableException
from fastlane_bot.events.version_utils import check_version_requirements
Expand Down Expand Up @@ -67,6 +69,9 @@
load_dotenv()


logger = logging.getLogger(__name__)


pool_data_lock = Lock()


Expand Down Expand Up @@ -327,25 +332,30 @@ async def inner(mgr):
from fastlane_bot.events.listener import EventListener

base_exchanges = ["carbon_v1", "uniswap_v3", "uniswap_v2", "bancor_pol", "bancor_v2", "bancor_v3", "solidly_v2"]
async with AsyncWeb3.persistent_websocket(WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)) as w3:
event_listener = EventListener(manager=mgr, base_exchanges=base_exchanges, w3=w3)
async for events in event_listener.pull_block_events():
with pool_data_lock:
for event in events:
print(event)
mgr.update_from_event(event)

with pool_data_lock:
current_block = mgr.web3.eth.block_number

# Update new pool events from contracts
if len(mgr.pools_to_add_from_contracts) > 0:
mgr.cfg.logger.info(
f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, "
f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}."
)
_run_async_update_with_retries(mgr, current_block=current_block)
mgr.pools_to_add_from_contracts = []
while True:
try:
async with AsyncWeb3.persistent_websocket(WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)) as w3:
event_listener = EventListener(manager=mgr, base_exchanges=base_exchanges, w3=w3)
async for events in event_listener.pull_block_events():
with pool_data_lock:
for event in events:
print(event)
mgr.update_from_event(event)

with pool_data_lock:
current_block = mgr.web3.eth.block_number

# Update new pool events from contracts
if len(mgr.pools_to_add_from_contracts) > 0:
mgr.cfg.logger.info(
f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, "
f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}."
)
_run_async_update_with_retries(mgr, current_block=current_block)
mgr.pools_to_add_from_contracts = []
except websockets.exceptions.ConnectionClosed:
logger.info("Websocket connection lost. Reconnecting ...")
await asyncio.sleep(1)

asyncio.run(inner(mgr))

Expand Down

0 comments on commit fcc4978

Please sign in to comment.