Skip to content

Commit

Permalink
Merge pull request #342 from bancorprotocol/341-excessive-main-loop-r…
Browse files Browse the repository at this point in the history
…untime

bugfix for excessive main loop runtime
  • Loading branch information
mikewcasale authored Jan 31, 2024
2 parents 0539ab2 + 147a7d1 commit 7f6d2aa
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 84 deletions.
10 changes: 5 additions & 5 deletions NBTest_068_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Happy path test with various realistic test values
import pytest

from fastlane_bot.events.exceptions import AyncUpdateRetryException
from fastlane_bot.events.exceptions import AsyncUpdateRetryException


@pytest.mark.parametrize(
Expand All @@ -14,7 +14,7 @@
)
def test_aync_update_retry_exception_with_message(message, id):
# Act
exception = AyncUpdateRetryException(message)
exception = AsyncUpdateRetryException(message)

# Assert
assert str(exception) == message, f"Test case {id} failed: The exception message does not match the expected message."
Expand All @@ -28,7 +28,7 @@ def test_aync_update_retry_exception_with_message(message, id):
)
def test_aync_update_retry_exception_with_empty_message(message, id):
# Act
exception = AyncUpdateRetryException(message)
exception = AsyncUpdateRetryException(message)

# Assert
assert str(exception) == message, f"Test case {id} failed: The exception message should be empty."
Expand All @@ -44,5 +44,5 @@ def test_aync_update_retry_exception_with_empty_message(message, id):
)
def test_aync_update_retry_exception_raises(message, id):
# Act & Assert
with pytest.raises(AyncUpdateRetryException, match=message):
raise AyncUpdateRetryException(message)
with pytest.raises(AsyncUpdateRetryException, match=message):
raise AsyncUpdateRetryException(message)
44 changes: 0 additions & 44 deletions fastlane_bot/events/async_event_update_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import time
from glob import glob
from random import shuffle
from typing import Any, List, Dict, Tuple, Type, Callable

import nest_asyncio
Expand All @@ -14,7 +13,6 @@

from fastlane_bot.data.abi import ERC20_ABI
from fastlane_bot.events.async_utils import get_contract_chunks
from fastlane_bot.events.exceptions import AyncUpdateRetryException
from fastlane_bot.events.utils import update_pools_from_events

nest_asyncio.apply()
Expand Down Expand Up @@ -544,45 +542,3 @@ def async_update_pools_from_contracts(mgr: Any, current_block: int, logging_path
mgr.cfg.logger.info(
f"Async Updating pools from contracts took {(time.time() - start_time):0.4f} seconds"
)


def update_remaining_pools(mgr):
remaining_pools = []
all_events = [pool[2] for pool in mgr.pools_to_add_from_contracts]
for event in all_events:
addr = mgr.web3.to_checksum_address(event["address"])
ex_name = mgr.exchange_name_from_event(event)
if not ex_name:
mgr.cfg.logger.warning("[run_async_update_with_retries] ex_name not found from event")
continue

key, key_value = mgr.get_key_and_value(event, addr, ex_name)
pool_info = mgr.get_pool_info(key, key_value, ex_name)

if not pool_info:
remaining_pools.append((addr, ex_name, event, key, key_value))

shuffle(remaining_pools) # shuffle to avoid repeated immediate failure of the same pool
return remaining_pools


def run_async_update_with_retries(mgr, current_block, logging_path, retry_interval=1, max_retries=5):
failed_async_calls = 0

while failed_async_calls < max_retries:
try:
async_update_pools_from_contracts(mgr, current_block, logging_path)
return # Successful execution
except AyncUpdateRetryException as e:
failed_async_calls += 1
mgr.pools_to_add_from_contracts = update_remaining_pools(mgr)
mgr.cfg.logger.error(f"Attempt {failed_async_calls} failed: {e}")

# Handling failure after retries
mgr.cfg.logger.error(
f"[main run.py] async_update_pools_from_contracts failed after {len(mgr.pools_to_add_from_contracts)} attempts. List of failed pools: {mgr.pools_to_add_from_contracts}"
)
mgr.pools_to_add_from_contracts = []
raise Exception("[main run.py] async_update_pools_from_contracts failed after maximum retries.")


2 changes: 1 addition & 1 deletion fastlane_bot/events/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def __str__(self):
f"create this file.")


class AyncUpdateRetryException(Exception):
class AsyncUpdateRetryException(Exception):
"""
Exception raised when async_update_pools_from_contracts fails and needs to be retried.
"""
Expand Down
67 changes: 43 additions & 24 deletions fastlane_bot/events/managers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def handle_pair_created(self, event: Dict[str, Any]):
self.fee_pairs.update(fee_pairs)

def update_from_pool_info(
self, pool_info: Optional[Dict[str, Any]] = None, current_block: int = None
self, pool_info: Optional[Dict[str, Any]] = None, current_block: int = None
) -> Dict[str, Any]:
"""
Update the pool info.
Expand All @@ -101,8 +101,8 @@ def update_from_pool_info(
"""
if "last_updated_block" in pool_info:
if (
type(pool_info["last_updated_block"]) == int
and pool_info["last_updated_block"] == current_block
type(pool_info["last_updated_block"]) == int
and pool_info["last_updated_block"] == current_block
):
return pool_info
else:
Expand Down Expand Up @@ -150,11 +150,11 @@ def update_from_pool_info(
return pool_info

def update_from_contract(
self,
address: str = None,
contract: Optional[Contract] = None,
pool_info: Optional[Dict[str, Any]] = None,
block_number: int = None,
self,
address: str = None,
contract: Optional[Contract] = None,
pool_info: Optional[Dict[str, Any]] = None,
block_number: int = None,
) -> Dict[str, Any]:
"""
Update the state from the contract (instead of events).
Expand Down Expand Up @@ -212,13 +212,13 @@ def update_from_contract(
return pool_info

def update(
self,
event: Dict[str, Any] = None,
address: str = None,
token_address: bool = False,
pool_info: Dict[str, Any] = None,
contract: Contract = None,
block_number: int = None,
self,
event: Dict[str, Any] = None,
address: str = None,
token_address: bool = False,
pool_info: Dict[str, Any] = None,
contract: Contract = None,
block_number: int = None,
) -> None:
"""
Update the state.
Expand Down Expand Up @@ -286,8 +286,8 @@ def update(
time.sleep(random.random())

def handle_pair_trading_fee_updated(
self,
event: Dict[str, Any] = None,
self,
event: Dict[str, Any] = None,
):
"""
Handle the pair trading fee updated event by updating the fee pairs and pool info for the given pair.
Expand All @@ -305,20 +305,20 @@ def handle_pair_trading_fee_updated(

for idx, pool in enumerate(self.pool_data):
if (
pool["tkn0_address"] == tkn0_address
and pool["tkn1_address"] == tkn1_address
and pool["exchange_name"] == "carbon_v1"
pool["tkn0_address"] == tkn0_address
and pool["tkn1_address"] == tkn1_address
and pool["exchange_name"] == "carbon_v1"
):
self._handle_pair_trading_fee_updated(fee, pool, idx)
elif (
pool["tkn0_address"] == tkn1_address
and pool["tkn1_address"] == tkn0_address
and pool["exchange_name"] == "carbon_v1"
pool["tkn0_address"] == tkn1_address
and pool["tkn1_address"] == tkn0_address
and pool["exchange_name"] == "carbon_v1"
):
self._handle_pair_trading_fee_updated(fee, pool, idx)

def _handle_pair_trading_fee_updated(
self, fee: int, pool: Dict[str, Any], idx: int
self, fee: int, pool: Dict[str, Any], idx: int
):
"""
Handle the pair trading fee updated event by updating the fee pairs and pool info for the given pair.
Expand Down Expand Up @@ -360,3 +360,22 @@ def handle_trading_fee_updated(self):
]
pool["fee_float"] = pool["fee"] / 1e6
pool["descr"] = self.pool_descr_from_info(pool)

def update_remaining_pools(self):
remaining_pools = []
all_events = [pool[2] for pool in self.pools_to_add_from_contracts]
for event in all_events:
addr = self.web3.to_checksum_address(event["address"])
ex_name = self.exchange_name_from_event(event)
if not ex_name:
self.cfg.logger.warning("[run_async_update_with_retries] ex_name not found from event")
continue

key, key_value = self.get_key_and_value(event, addr, ex_name)
pool_info = self.get_pool_info(key, key_value, ex_name)

if not pool_info:
remaining_pools.append((addr, ex_name, event, key, key_value))

random.shuffle(remaining_pools)
self.pools_to_add_from_contracts = remaining_pools
27 changes: 25 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
(c) Copyright Bprotocol foundation 2023.
Licensed under MIT
"""
from fastlane_bot.events.exceptions import ReadOnlyException

from fastlane_bot.events.exceptions import ReadOnlyException, AsyncUpdateRetryException
from fastlane_bot.events.version_utils import check_version_requirements
from fastlane_bot.tools.cpc import T

Expand All @@ -26,7 +27,7 @@
async_handle_initial_iteration,
)
from fastlane_bot.events.async_event_update_utils import (
async_update_pools_from_contracts, run_async_update_with_retries,
async_update_pools_from_contracts,
)
from fastlane_bot.events.managers.manager import Manager
from fastlane_bot.events.multicall_utils import multicall_every_iteration
Expand Down Expand Up @@ -673,6 +674,7 @@ def run(
current_block=current_block,
logging_path=logging_path,
)
mgr.pools_to_add_from_contracts = []

# Increment the loop index
loop_idx += 1
Expand Down Expand Up @@ -854,6 +856,27 @@ def run(
break


def run_async_update_with_retries(mgr, current_block, logging_path, max_retries=5):
failed_async_calls = 0

while failed_async_calls < max_retries:
try:
async_update_pools_from_contracts(mgr, current_block, logging_path)
return # Successful execution
except AsyncUpdateRetryException as e:
failed_async_calls += 1
mgr.cfg.logger.error(f"Attempt {failed_async_calls} failed: {e}")
mgr.update_remaining_pools()

# Handling failure after retries
mgr.cfg.logger.error(
f"[main run.py] async_update_pools_from_contracts failed after "
f"{len(mgr.pools_to_add_from_contracts)} attempts. List of failed pools: {mgr.pools_to_add_from_contracts}"
)

raise AsyncUpdateRetryException("[main.py] async_update_pools_from_contracts failed after maximum retries.")


if __name__ == "__main__":
main()

Expand Down
10 changes: 5 additions & 5 deletions resources/NBTest/NBTest_068_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from fastlane_bot.events.exceptions import AyncUpdateRetryException
from fastlane_bot.events.exceptions import AsyncUpdateRetryException


@pytest.mark.parametrize(
Expand All @@ -13,7 +13,7 @@
)
def test_aync_update_retry_exception_with_message(message, id):
# Act
exception = AyncUpdateRetryException(message)
exception = AsyncUpdateRetryException(message)

# Assert
assert str(exception) == message, f"Test case {id} failed: The exception message does not match the expected message."
Expand All @@ -27,7 +27,7 @@ def test_aync_update_retry_exception_with_message(message, id):
)
def test_aync_update_retry_exception_with_empty_message(message, id):
# Act
exception = AyncUpdateRetryException(message)
exception = AsyncUpdateRetryException(message)

# Assert
assert str(exception) == message, f"Test case {id} failed: The exception message should be empty."
Expand All @@ -43,5 +43,5 @@ def test_aync_update_retry_exception_with_empty_message(message, id):
)
def test_aync_update_retry_exception_raises(message, id):
# Act & Assert
with pytest.raises(AyncUpdateRetryException, match=message):
raise AyncUpdateRetryException(message)
with pytest.raises(AsyncUpdateRetryException, match=message):
raise AsyncUpdateRetryException(message)
21 changes: 18 additions & 3 deletions resources/NBTest/NBTest_908_run_async_update_with_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
update_remaining_pools,
run_async_update_with_retries,
) # replace with your actual module name
from fastlane_bot.events.exceptions import AyncUpdateRetryException
from fastlane_bot.events.exceptions import AsyncUpdateRetryException


@pytest.fixture
Expand Down Expand Up @@ -93,14 +93,29 @@ def test_successful_execution_first_try(mock_get_new_pool_data, mocker):
mgr.cfg.logger.error.assert_not_called()


@patch("fastlane_bot.events.async_event_update_utils.get_new_pool_data")
def test_pools_to_add_from_contracts_is_cleared_upon_success(mock_get_new_pool_data, mocker):
mgr = setup_mock_mgr()
mocker.patch("time.sleep") # To avoid actual sleep calls
current_block = max(
int(pool[2]["blockNumber"])
for pool in mgr.pools_to_add_from_contracts
if "blockNumber" in pool[2]
)
mock_get_new_pool_data.return_value = mgr.pool_data
run_async_update_with_retries(mgr, current_block=current_block, logging_path="")

assert mgr.pools_to_add_from_contracts == [], "mgr.pools_to_add_from_contracts is not reset to [] upon success"


@patch("fastlane_bot.events.async_event_update_utils.async_update_pools_from_contracts")
def test_successful_execution_after_retries(
mock_async_update_pools_from_contracts, mocker
):
mgr = setup_mock_mgr()
mgr.get_key_and_value.side_effect = ("key1", "value1"), ("key2", "value2")
mock_async_update_pools_from_contracts.side_effect = [
AyncUpdateRetryException,
AsyncUpdateRetryException,
None,
]
mocker.patch("time.sleep") # To avoid actual sleep calls
Expand All @@ -115,7 +130,7 @@ def test_failure_after_max_retries(
mock_async_update_pools_from_contracts, mock_get_new_pool_data, mocker
):
mgr = setup_mock_mgr()
mock_async_update_pools_from_contracts.side_effect = AyncUpdateRetryException
mock_async_update_pools_from_contracts.side_effect = AsyncUpdateRetryException
mocker.patch("time.sleep") # To avoid actual sleep calls
current_block = max(
int(pool[2]["blockNumber"])
Expand Down

0 comments on commit 7f6d2aa

Please sign in to comment.