diff --git a/NBTest_068_exceptions.py b/NBTest_068_exceptions.py index 6c728f302..923fc9f7b 100644 --- a/NBTest_068_exceptions.py +++ b/NBTest_068_exceptions.py @@ -1,7 +1,7 @@ # Happy path test with various realistic test values import pytest -from fastlane_bot.events.exceptions import AyncUpdateRetryException +from fastlane_bot.events.exceptions import AsyncUpdateRetryException @pytest.mark.parametrize( @@ -14,7 +14,7 @@ ) def test_aync_update_retry_exception_with_message(message, id): # Act - exception = AyncUpdateRetryException(message) + exception = AsyncUpdateRetryException(message) # Assert assert str(exception) == message, f"Test case {id} failed: The exception message does not match the expected message." @@ -28,7 +28,7 @@ def test_aync_update_retry_exception_with_message(message, id): ) def test_aync_update_retry_exception_with_empty_message(message, id): # Act - exception = AyncUpdateRetryException(message) + exception = AsyncUpdateRetryException(message) # Assert assert str(exception) == message, f"Test case {id} failed: The exception message should be empty." @@ -44,5 +44,5 @@ def test_aync_update_retry_exception_with_empty_message(message, id): ) def test_aync_update_retry_exception_raises(message, id): # Act & Assert - with pytest.raises(AyncUpdateRetryException, match=message): - raise AyncUpdateRetryException(message) + with pytest.raises(AsyncUpdateRetryException, match=message): + raise AsyncUpdateRetryException(message) diff --git a/fastlane_bot/events/async_event_update_utils.py b/fastlane_bot/events/async_event_update_utils.py index cf66f2005..84fa45f0f 100644 --- a/fastlane_bot/events/async_event_update_utils.py +++ b/fastlane_bot/events/async_event_update_utils.py @@ -2,7 +2,6 @@ import os import time from glob import glob -from random import shuffle from typing import Any, List, Dict, Tuple, Type, Callable import nest_asyncio @@ -14,7 +13,6 @@ from fastlane_bot.data.abi import ERC20_ABI from fastlane_bot.events.async_utils import get_contract_chunks -from fastlane_bot.events.exceptions import AyncUpdateRetryException from fastlane_bot.events.utils import update_pools_from_events nest_asyncio.apply() @@ -544,45 +542,3 @@ def async_update_pools_from_contracts(mgr: Any, current_block: int, logging_path mgr.cfg.logger.info( f"Async Updating pools from contracts took {(time.time() - start_time):0.4f} seconds" ) - - -def update_remaining_pools(mgr): - remaining_pools = [] - all_events = [pool[2] for pool in mgr.pools_to_add_from_contracts] - for event in all_events: - addr = mgr.web3.to_checksum_address(event["address"]) - ex_name = mgr.exchange_name_from_event(event) - if not ex_name: - mgr.cfg.logger.warning("[run_async_update_with_retries] ex_name not found from event") - continue - - key, key_value = mgr.get_key_and_value(event, addr, ex_name) - pool_info = mgr.get_pool_info(key, key_value, ex_name) - - if not pool_info: - remaining_pools.append((addr, ex_name, event, key, key_value)) - - shuffle(remaining_pools) # shuffle to avoid repeated immediate failure of the same pool - return remaining_pools - - -def run_async_update_with_retries(mgr, current_block, logging_path, retry_interval=1, max_retries=5): - failed_async_calls = 0 - - while failed_async_calls < max_retries: - try: - async_update_pools_from_contracts(mgr, current_block, logging_path) - return # Successful execution - except AyncUpdateRetryException as e: - failed_async_calls += 1 - mgr.pools_to_add_from_contracts = update_remaining_pools(mgr) - mgr.cfg.logger.error(f"Attempt {failed_async_calls} failed: {e}") - - # Handling failure after retries - mgr.cfg.logger.error( - f"[main run.py] async_update_pools_from_contracts failed after {len(mgr.pools_to_add_from_contracts)} attempts. List of failed pools: {mgr.pools_to_add_from_contracts}" - ) - mgr.pools_to_add_from_contracts = [] - raise Exception("[main run.py] async_update_pools_from_contracts failed after maximum retries.") - - diff --git a/fastlane_bot/events/exceptions.py b/fastlane_bot/events/exceptions.py index 7e9e2e8b2..0bbb1da3b 100644 --- a/fastlane_bot/events/exceptions.py +++ b/fastlane_bot/events/exceptions.py @@ -7,7 +7,7 @@ def __str__(self): f"create this file.") -class AyncUpdateRetryException(Exception): +class AsyncUpdateRetryException(Exception): """ Exception raised when async_update_pools_from_contracts fails and needs to be retried. """ diff --git a/fastlane_bot/events/managers/manager.py b/fastlane_bot/events/managers/manager.py index 8a6460b72..0f1b2bdaa 100644 --- a/fastlane_bot/events/managers/manager.py +++ b/fastlane_bot/events/managers/manager.py @@ -87,7 +87,7 @@ def handle_pair_created(self, event: Dict[str, Any]): self.fee_pairs.update(fee_pairs) def update_from_pool_info( - self, pool_info: Optional[Dict[str, Any]] = None, current_block: int = None + self, pool_info: Optional[Dict[str, Any]] = None, current_block: int = None ) -> Dict[str, Any]: """ Update the pool info. @@ -101,8 +101,8 @@ def update_from_pool_info( """ if "last_updated_block" in pool_info: if ( - type(pool_info["last_updated_block"]) == int - and pool_info["last_updated_block"] == current_block + type(pool_info["last_updated_block"]) == int + and pool_info["last_updated_block"] == current_block ): return pool_info else: @@ -150,11 +150,11 @@ def update_from_pool_info( return pool_info def update_from_contract( - self, - address: str = None, - contract: Optional[Contract] = None, - pool_info: Optional[Dict[str, Any]] = None, - block_number: int = None, + self, + address: str = None, + contract: Optional[Contract] = None, + pool_info: Optional[Dict[str, Any]] = None, + block_number: int = None, ) -> Dict[str, Any]: """ Update the state from the contract (instead of events). @@ -212,13 +212,13 @@ def update_from_contract( return pool_info def update( - self, - event: Dict[str, Any] = None, - address: str = None, - token_address: bool = False, - pool_info: Dict[str, Any] = None, - contract: Contract = None, - block_number: int = None, + self, + event: Dict[str, Any] = None, + address: str = None, + token_address: bool = False, + pool_info: Dict[str, Any] = None, + contract: Contract = None, + block_number: int = None, ) -> None: """ Update the state. @@ -286,8 +286,8 @@ def update( time.sleep(random.random()) def handle_pair_trading_fee_updated( - self, - event: Dict[str, Any] = None, + self, + event: Dict[str, Any] = None, ): """ Handle the pair trading fee updated event by updating the fee pairs and pool info for the given pair. @@ -305,20 +305,20 @@ def handle_pair_trading_fee_updated( for idx, pool in enumerate(self.pool_data): if ( - pool["tkn0_address"] == tkn0_address - and pool["tkn1_address"] == tkn1_address - and pool["exchange_name"] == "carbon_v1" + pool["tkn0_address"] == tkn0_address + and pool["tkn1_address"] == tkn1_address + and pool["exchange_name"] == "carbon_v1" ): self._handle_pair_trading_fee_updated(fee, pool, idx) elif ( - pool["tkn0_address"] == tkn1_address - and pool["tkn1_address"] == tkn0_address - and pool["exchange_name"] == "carbon_v1" + pool["tkn0_address"] == tkn1_address + and pool["tkn1_address"] == tkn0_address + and pool["exchange_name"] == "carbon_v1" ): self._handle_pair_trading_fee_updated(fee, pool, idx) def _handle_pair_trading_fee_updated( - self, fee: int, pool: Dict[str, Any], idx: int + self, fee: int, pool: Dict[str, Any], idx: int ): """ Handle the pair trading fee updated event by updating the fee pairs and pool info for the given pair. @@ -360,3 +360,22 @@ def handle_trading_fee_updated(self): ] pool["fee_float"] = pool["fee"] / 1e6 pool["descr"] = self.pool_descr_from_info(pool) + + def update_remaining_pools(self): + remaining_pools = [] + all_events = [pool[2] for pool in self.pools_to_add_from_contracts] + for event in all_events: + addr = self.web3.to_checksum_address(event["address"]) + ex_name = self.exchange_name_from_event(event) + if not ex_name: + self.cfg.logger.warning("[run_async_update_with_retries] ex_name not found from event") + continue + + key, key_value = self.get_key_and_value(event, addr, ex_name) + pool_info = self.get_pool_info(key, key_value, ex_name) + + if not pool_info: + remaining_pools.append((addr, ex_name, event, key, key_value)) + + random.shuffle(remaining_pools) + self.pools_to_add_from_contracts = remaining_pools diff --git a/main.py b/main.py index 43fbb076b..01fd7e9d0 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,8 @@ (c) Copyright Bprotocol foundation 2023. Licensed under MIT """ -from fastlane_bot.events.exceptions import ReadOnlyException + +from fastlane_bot.events.exceptions import ReadOnlyException, AsyncUpdateRetryException from fastlane_bot.events.version_utils import check_version_requirements from fastlane_bot.tools.cpc import T @@ -26,7 +27,7 @@ async_handle_initial_iteration, ) from fastlane_bot.events.async_event_update_utils import ( - async_update_pools_from_contracts, run_async_update_with_retries, + async_update_pools_from_contracts, ) from fastlane_bot.events.managers.manager import Manager from fastlane_bot.events.multicall_utils import multicall_every_iteration @@ -673,6 +674,7 @@ def run( current_block=current_block, logging_path=logging_path, ) + mgr.pools_to_add_from_contracts = [] # Increment the loop index loop_idx += 1 @@ -854,6 +856,27 @@ def run( break +def run_async_update_with_retries(mgr, current_block, logging_path, max_retries=5): + failed_async_calls = 0 + + while failed_async_calls < max_retries: + try: + async_update_pools_from_contracts(mgr, current_block, logging_path) + return # Successful execution + except AsyncUpdateRetryException as e: + failed_async_calls += 1 + mgr.cfg.logger.error(f"Attempt {failed_async_calls} failed: {e}") + mgr.update_remaining_pools() + + # Handling failure after retries + mgr.cfg.logger.error( + f"[main run.py] async_update_pools_from_contracts failed after " + f"{len(mgr.pools_to_add_from_contracts)} attempts. List of failed pools: {mgr.pools_to_add_from_contracts}" + ) + + raise AsyncUpdateRetryException("[main.py] async_update_pools_from_contracts failed after maximum retries.") + + if __name__ == "__main__": main() diff --git a/resources/NBTest/NBTest_068_exceptions.py b/resources/NBTest/NBTest_068_exceptions.py index 403014659..85b5c8906 100644 --- a/resources/NBTest/NBTest_068_exceptions.py +++ b/resources/NBTest/NBTest_068_exceptions.py @@ -1,6 +1,6 @@ import pytest -from fastlane_bot.events.exceptions import AyncUpdateRetryException +from fastlane_bot.events.exceptions import AsyncUpdateRetryException @pytest.mark.parametrize( @@ -13,7 +13,7 @@ ) def test_aync_update_retry_exception_with_message(message, id): # Act - exception = AyncUpdateRetryException(message) + exception = AsyncUpdateRetryException(message) # Assert assert str(exception) == message, f"Test case {id} failed: The exception message does not match the expected message." @@ -27,7 +27,7 @@ def test_aync_update_retry_exception_with_message(message, id): ) def test_aync_update_retry_exception_with_empty_message(message, id): # Act - exception = AyncUpdateRetryException(message) + exception = AsyncUpdateRetryException(message) # Assert assert str(exception) == message, f"Test case {id} failed: The exception message should be empty." @@ -43,5 +43,5 @@ def test_aync_update_retry_exception_with_empty_message(message, id): ) def test_aync_update_retry_exception_raises(message, id): # Act & Assert - with pytest.raises(AyncUpdateRetryException, match=message): - raise AyncUpdateRetryException(message) + with pytest.raises(AsyncUpdateRetryException, match=message): + raise AsyncUpdateRetryException(message) diff --git a/resources/NBTest/NBTest_908_run_async_update_with_retries.py b/resources/NBTest/NBTest_908_run_async_update_with_retries.py index 4ef7a2ee9..046b8913b 100644 --- a/resources/NBTest/NBTest_908_run_async_update_with_retries.py +++ b/resources/NBTest/NBTest_908_run_async_update_with_retries.py @@ -8,7 +8,7 @@ update_remaining_pools, run_async_update_with_retries, ) # replace with your actual module name -from fastlane_bot.events.exceptions import AyncUpdateRetryException +from fastlane_bot.events.exceptions import AsyncUpdateRetryException @pytest.fixture @@ -93,6 +93,21 @@ def test_successful_execution_first_try(mock_get_new_pool_data, mocker): mgr.cfg.logger.error.assert_not_called() +@patch("fastlane_bot.events.async_event_update_utils.get_new_pool_data") +def test_pools_to_add_from_contracts_is_cleared_upon_success(mock_get_new_pool_data, mocker): + mgr = setup_mock_mgr() + mocker.patch("time.sleep") # To avoid actual sleep calls + current_block = max( + int(pool[2]["blockNumber"]) + for pool in mgr.pools_to_add_from_contracts + if "blockNumber" in pool[2] + ) + mock_get_new_pool_data.return_value = mgr.pool_data + run_async_update_with_retries(mgr, current_block=current_block, logging_path="") + + assert mgr.pools_to_add_from_contracts == [], "mgr.pools_to_add_from_contracts is not reset to [] upon success" + + @patch("fastlane_bot.events.async_event_update_utils.async_update_pools_from_contracts") def test_successful_execution_after_retries( mock_async_update_pools_from_contracts, mocker @@ -100,7 +115,7 @@ def test_successful_execution_after_retries( mgr = setup_mock_mgr() mgr.get_key_and_value.side_effect = ("key1", "value1"), ("key2", "value2") mock_async_update_pools_from_contracts.side_effect = [ - AyncUpdateRetryException, + AsyncUpdateRetryException, None, ] mocker.patch("time.sleep") # To avoid actual sleep calls @@ -115,7 +130,7 @@ def test_failure_after_max_retries( mock_async_update_pools_from_contracts, mock_get_new_pool_data, mocker ): mgr = setup_mock_mgr() - mock_async_update_pools_from_contracts.side_effect = AyncUpdateRetryException + mock_async_update_pools_from_contracts.side_effect = AsyncUpdateRetryException mocker.patch("time.sleep") # To avoid actual sleep calls current_block = max( int(pool[2]["blockNumber"])