Skip to content

Commit

Permalink
Merge pull request #333 from bancorprotocol/331-trading-on-aerodrome_…
Browse files Browse the repository at this point in the history
…v2-is-not-properly-supported

331 trading on aerodrome v2 is not properly supported
  • Loading branch information
mikewcasale authored Jan 29, 2024
2 parents 61a6147 + 6a8190e commit 1b05191
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run-pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install jupytext pytest
pip install jupytext pytest pytest-mock
pip install -r requirements.txt --force-reinstall
- name: Run Tests
run: |
Expand Down
48 changes: 48 additions & 0 deletions NBTest_068_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Happy path test with various realistic test values
import pytest

from fastlane_bot.events.exceptions import AyncUpdateRetryException


@pytest.mark.parametrize(
"message, id",
[
("Failed to update, retrying...", 'happy-1'),
("Update failed at step 3, retrying...", 'happy-2'),
("Temporary network issue, attempt retry...", 'happy-3'),
],
)
def test_aync_update_retry_exception_with_message(message, id):
# Act
exception = AyncUpdateRetryException(message)

# Assert
assert str(exception) == message, f"Test case {id} failed: The exception message does not match the expected message."

# Edge case test with empty string as message
@pytest.mark.parametrize(
"message, id",
[
("", 'edge-1'),
],
)
def test_aync_update_retry_exception_with_empty_message(message, id):
# Act
exception = AyncUpdateRetryException(message)

# Assert
assert str(exception) == message, f"Test case {id} failed: The exception message should be empty."

# Happy path case test which raises exceptions (should raise AyncUpdateRetryException)
@pytest.mark.parametrize(
"message, id",
[
('happy-1', 'happy-1'),
(None, 'happy-2'),
('3', 'happy-3'),
],
)
def test_aync_update_retry_exception_raises(message, id):
# Act & Assert
with pytest.raises(AyncUpdateRetryException, match=message):
raise AyncUpdateRetryException(message)
136 changes: 93 additions & 43 deletions fastlane_bot/events/async_event_update_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import os
import time
from glob import glob
from random import shuffle
from typing import Any, List, Dict, Tuple, Type, Callable

import nest_asyncio
import numpy as np
import pandas as pd
from pandas import DataFrame
Expand All @@ -12,11 +14,12 @@

from fastlane_bot.data.abi import ERC20_ABI
from fastlane_bot.events.async_utils import get_contract_chunks
from fastlane_bot.events.exchanges import exchange_factory
from fastlane_bot.events.exceptions import AyncUpdateRetryException
from fastlane_bot.events.utils import update_pools_from_events
import nest_asyncio

nest_asyncio.apply()


async def get_missing_tkn(contract: AsyncContract, tkn: str) -> pd.DataFrame:
try:
symbol = await contract.functions.symbol().call()
Expand Down Expand Up @@ -58,7 +61,7 @@ async def main_get_missing_tkn(c: List[Dict[str, Any]]) -> pd.DataFrame:


async def get_token_and_fee(
exchange_name: str, ex: Any, address: str, contract: AsyncContract, event: Any
exchange_name: str, ex: Any, address: str, contract: AsyncContract, event: Any
) -> Tuple[str, str, str, str, str, int or None, str or None] or Tuple[
str, str, None, None, None, None, None
]:
Expand Down Expand Up @@ -132,22 +135,22 @@ async def main_get_tokens_and_fee(c: List[Dict[str, Any]]) -> pd.DataFrame:


def pair_name(
t0_symbol: str,
tkn0_address: str,
t1_symbol: str,
tkn1_address: str,
key_digits: int = 4,
t0_symbol: str,
tkn0_address: str,
t1_symbol: str,
tkn1_address: str,
key_digits: int = 4,
) -> str:
return f"{t0_symbol}-{tkn0_address[-key_digits:]}/{t1_symbol}-{tkn1_address[-key_digits:]}"


def get_pool_info(
pool: pd.Series,
mgr: Any,
current_block: int,
tkn0: Dict[str, Any],
tkn1: Dict[str, Any],
pool_data_keys: frozenset,
pool: pd.Series,
mgr: Any,
current_block: int,
tkn0: Dict[str, Any],
tkn1: Dict[str, Any],
pool_data_keys: frozenset,
) -> Dict[str, Any]:
fee_raw = eval(str(pool["fee"]))
pool_info = {
Expand Down Expand Up @@ -185,6 +188,7 @@ def get_pool_info(

return pool_info


def sanitize_token_symbol(token_symbol: str, token_address: str, read_only: bool) -> str:
"""
This function ensures token symbols are compatible with the bot's data structures.
Expand All @@ -207,7 +211,7 @@ def sanitize_token_symbol(token_symbol: str, token_address: str, read_only: bool


def add_token_info(
pool_info: Dict[str, Any], tkn0: Dict[str, Any], tkn1: Dict[str, Any], read_only: bool
pool_info: Dict[str, Any], tkn0: Dict[str, Any], tkn1: Dict[str, Any], read_only: bool
) -> Dict[str, Any]:
print(f"called add_token_info")
tkn0_symbol = tkn0["symbol"].replace("/", "_").replace("-", "_")
Expand All @@ -229,7 +233,7 @@ def add_token_info(


def add_missing_keys(
pool_info: Dict[str, Any], pool_data_keys: frozenset, keys: List[str]
pool_info: Dict[str, Any], pool_data_keys: frozenset, keys: List[str]
) -> Dict[str, Any]:
for key in pool_data_keys:
if key in pool_info:
Expand All @@ -241,11 +245,11 @@ def add_missing_keys(


def get_new_pool_data(
current_block: int,
keys: List[str],
mgr: Any,
tokens_and_fee_df: pd.DataFrame,
tokens_df: pd.DataFrame,
current_block: int,
keys: List[str],
mgr: Any,
tokens_and_fee_df: pd.DataFrame,
tokens_df: pd.DataFrame,
) -> List[Dict]:
# Convert tokens_df to a dictionary keyed by address for faster access
tokens_dict = tokens_df.set_index("address").to_dict(orient="index")
Expand Down Expand Up @@ -274,16 +278,16 @@ def get_new_pool_data(


def get_token_contracts(
mgr: Any, tokens_and_fee_df: pd.DataFrame
mgr: Any, tokens_and_fee_df: pd.DataFrame
) -> Tuple[
List[Dict[str, Type[AsyncContract] or AsyncContract or Any] or None or Any],
DataFrame,
]:
# for each token in the pools, check whether we have the token info in the tokens.csv static data, and ifr not,
# add it
tokens = (
tokens_and_fee_df["tkn0_address"].tolist()
+ tokens_and_fee_df["tkn1_address"].tolist()
tokens_and_fee_df["tkn0_address"].tolist()
+ tokens_and_fee_df["tkn1_address"].tolist()
)
tokens = list(set(tokens))
tokens_df = pd.read_csv(
Expand All @@ -307,14 +311,14 @@ def get_token_contracts(


def process_contract_chunks(
base_filename: str,
chunks: List[Any],
dirname: str,
filename: str,
subset: List[str],
func: Callable,
df_combined: pd.DataFrame = None,
read_only: bool = False,
base_filename: str,
chunks: List[Any],
dirname: str,
filename: str,
subset: List[str],
func: Callable,
df_combined: pd.DataFrame = None,
read_only: bool = False,
) -> pd.DataFrame:
lst = []
# write chunks to csv
Expand Down Expand Up @@ -361,13 +365,13 @@ def get_pool_contracts(mgr: Any) -> List[Dict[str, Any]]:
contracts = []
for add, en, event, key, value in mgr.pools_to_add_from_contracts:
exchange_name = mgr.exchange_name_from_event(event)
ex = exchange_factory.get_exchange(key=exchange_name, cfg=mgr.cfg, exchange_initialized=False)
ex = mgr.exchanges[exchange_name]
abi = ex.get_abi()
address = event["address"]
contracts.append(
{
"exchange_name": exchange_name,
"ex": exchange_factory.get_exchange(key=exchange_name, cfg=mgr.cfg, exchange_initialized=False),
"ex": ex,
"address": address,
"contract": mgr.w3_async.eth.contract(address=address, abi=abi),
"event": event,
Expand Down Expand Up @@ -464,11 +468,11 @@ def async_update_pools_from_contracts(mgr: Any, current_block: int, logging_path
)

new_pool_data_df["descr"] = (
new_pool_data_df["exchange_name"]
+ " "
+ new_pool_data_df["pair_name"]
+ " "
+ new_pool_data_df["fee"].astype(str)
new_pool_data_df["exchange_name"]
+ " "
+ new_pool_data_df["pair_name"]
+ " "
+ new_pool_data_df["fee"].astype(str)
)

# Initialize web3
Expand Down Expand Up @@ -519,13 +523,17 @@ def async_update_pools_from_contracts(mgr: Any, current_block: int, logging_path
new_num_pools_in_data = len(mgr.pool_data)
new_pools_added = new_num_pools_in_data - orig_num_pools_in_data

mgr.cfg.logger.debug(f"[async_event_update_utils.async_update_pools_from_contracts] new_pools_added: {new_pools_added}")
mgr.cfg.logger.debug(f"[async_event_update_utils.async_update_pools_from_contracts] orig_num_pools_in_data: {orig_num_pools_in_data}")
mgr.cfg.logger.debug(f"[async_event_update_utils.async_update_pools_from_contracts] duplicate_new_pool_ct: {duplicate_new_pool_ct}")
mgr.cfg.logger.debug(
f"[async_event_update_utils.async_update_pools_from_contracts] new_pools_added: {new_pools_added}")
mgr.cfg.logger.debug(
f"[async_event_update_utils.async_update_pools_from_contracts] orig_num_pools_in_data: {orig_num_pools_in_data}")
mgr.cfg.logger.debug(
f"[async_event_update_utils.async_update_pools_from_contracts] duplicate_new_pool_ct: {duplicate_new_pool_ct}")
mgr.cfg.logger.debug(
f"[async_event_update_utils.async_update_pools_from_contracts] pools_to_add_from_contracts: {len(mgr.pools_to_add_from_contracts)}"
)
mgr.cfg.logger.debug(f"[async_event_update_utils.async_update_pools_from_contracts] final pool_data ct: {len(mgr.pool_data)}")
mgr.cfg.logger.debug(
f"[async_event_update_utils.async_update_pools_from_contracts] final pool_data ct: {len(mgr.pool_data)}")
mgr.cfg.logger.debug(
f"[async_event_update_utils.async_update_pools_from_contracts] compare {new_pools_added + duplicate_new_pool_ct},{len(mgr.pools_to_add_from_contracts)}"
)
Expand All @@ -536,3 +544,45 @@ def async_update_pools_from_contracts(mgr: Any, current_block: int, logging_path
mgr.cfg.logger.info(
f"Async Updating pools from contracts took {(time.time() - start_time):0.4f} seconds"
)


def update_remaining_pools(mgr):
remaining_pools = []
all_events = [pool[2] for pool in mgr.pools_to_add_from_contracts]
for event in all_events:
addr = mgr.web3.to_checksum_address(event["address"])
ex_name = mgr.exchange_name_from_event(event)
if not ex_name:
mgr.cfg.logger.warning("[run_async_update_with_retries] ex_name not found from event")
continue

key, key_value = mgr.get_key_and_value(event, addr, ex_name)
pool_info = mgr.get_pool_info(key, key_value, ex_name)

if not pool_info:
remaining_pools.append((addr, ex_name, event, key, key_value))

shuffle(remaining_pools) # shuffle to avoid repeated immediate failure of the same pool
return remaining_pools


def run_async_update_with_retries(mgr, current_block, logging_path, retry_interval=1, max_retries=5):
failed_async_calls = 0

while failed_async_calls < max_retries:
try:
async_update_pools_from_contracts(mgr, current_block, logging_path)
return # Successful execution
except AyncUpdateRetryException as e:
failed_async_calls += 1
mgr.pools_to_add_from_contracts = update_remaining_pools(mgr)
mgr.cfg.logger.error(f"Attempt {failed_async_calls} failed: {e}")

# Handling failure after retries
mgr.cfg.logger.error(
f"[main run.py] async_update_pools_from_contracts failed after {len(mgr.pools_to_add_from_contracts)} attempts. List of failed pools: {mgr.pools_to_add_from_contracts}"
)
mgr.pools_to_add_from_contracts = []
raise Exception("[main run.py] async_update_pools_from_contracts failed after maximum retries.")


6 changes: 2 additions & 4 deletions fastlane_bot/events/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ def get_contract_chunks(contracts: List[Dict[str, Any]]) -> List[List[Dict[str,

def get_abis_and_exchanges(mgr: Any) -> Dict[str, Any]:
abis = {}
exchanges = {}
for exchange in mgr.exchanges:
exchanges[exchange] = exchange_factory.get_exchange(key=exchange, cfg=mgr.cfg, exchange_initialized=False)
abis[exchange] = exchanges[exchange].get_abi()
for exchange_name, exchange in mgr.exchanges.items():
abis[exchange_name] = exchange.get_abi()
return abis
9 changes: 8 additions & 1 deletion fastlane_bot/events/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,11 @@ def __init__(self, filepath):

def __str__(self):
return (f"tokens.csv does not exist at {self.filepath}. Please run the bot without the `read_only` flag to "
f"create this file.")
f"create this file.")


class AyncUpdateRetryException(Exception):
"""
Exception raised when async_update_pools_from_contracts fails and needs to be retried.
"""
pass
10 changes: 5 additions & 5 deletions fastlane_bot/events/pools/solidly_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def update_from_event(
data["router"] = self.router_address
return data

async def update_from_contract(
def update_from_contract(
self,
contract: Contract,
tenderly_fork_id: str = None,
Expand All @@ -85,15 +85,15 @@ async def update_from_contract(
"""
See base class.
"""
reserve_balance = await contract.caller.getReserves()
reserve_balance = contract.caller.getReserves()

try:
factory_address = await contract.caller.factory()
factory_address = contract.caller.factory()
except Exception:
# Velocimeter does not expose factory function - call voter to get an address that is the same for all Velcoimeter pools
factory_address = await contract.caller.voter()
factory_address = contract.caller.voter()

self.is_stable = await contract.caller.stable()
self.is_stable = contract.caller.stable()
params = {

"tkn0_balance": reserve_balance[0],
Expand Down
Loading

0 comments on commit 1b05191

Please sign in to comment.