From 903a2f5af335149ac770af5d858270cb96bb23ec Mon Sep 17 00:00:00 2001 From: Platon Floria Date: Wed, 1 May 2024 10:55:48 +0300 Subject: [PATCH] feat: reconnect websocket --- main.py | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/main.py b/main.py index 5a8077222..592b32d62 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,8 @@ Licensed under MIT """ import asyncio +import logging +import websockets from fastlane_bot.exceptions import AsyncUpdateRetryException, ReadOnlyException, FlashloanUnavailableException from fastlane_bot.events.version_utils import check_version_requirements @@ -67,6 +69,9 @@ load_dotenv() +logger = logging.getLogger(__name__) + + pool_data_lock = Lock() @@ -327,25 +332,30 @@ async def inner(mgr): from fastlane_bot.events.listener import EventListener base_exchanges = ["carbon_v1", "uniswap_v3", "uniswap_v2", "bancor_pol", "bancor_v2", "bancor_v3", "solidly_v2"] - async with AsyncWeb3.persistent_websocket(WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)) as w3: - event_listener = EventListener(manager=mgr, base_exchanges=base_exchanges, w3=w3) - async for events in event_listener.pull_block_events(): - with pool_data_lock: - for event in events: - print(event) - mgr.update_from_event(event) - - with pool_data_lock: - current_block = mgr.web3.eth.block_number - - # Update new pool events from contracts - if len(mgr.pools_to_add_from_contracts) > 0: - mgr.cfg.logger.info( - f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, " - f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}." - ) - _run_async_update_with_retries(mgr, current_block=current_block) - mgr.pools_to_add_from_contracts = [] + while True: + try: + async with AsyncWeb3.persistent_websocket(WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)) as w3: + event_listener = EventListener(manager=mgr, base_exchanges=base_exchanges, w3=w3) + async for events in event_listener.pull_block_events(): + with pool_data_lock: + for event in events: + print(event) + mgr.update_from_event(event) + + with pool_data_lock: + current_block = mgr.web3.eth.block_number + + # Update new pool events from contracts + if len(mgr.pools_to_add_from_contracts) > 0: + mgr.cfg.logger.info( + f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, " + f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}." + ) + _run_async_update_with_retries(mgr, current_block=current_block) + mgr.pools_to_add_from_contracts = [] + except websockets.exceptions.ConnectionClosed: + logger.info("Websocket connection lost. Reconnecting ...") + await asyncio.sleep(1) asyncio.run(inner(mgr))