diff --git a/fastlane_bot/tests/test_899_CustomMulticall.py b/fastlane_bot/tests/test_899_CustomMulticall.py new file mode 100644 index 000000000..da5628765 --- /dev/null +++ b/fastlane_bot/tests/test_899_CustomMulticall.py @@ -0,0 +1,207 @@ +# ------------------------------------------------------------ +# Auto generated test file `test_899_CustomMulticall.py` +# ------------------------------------------------------------ +# source file = NBTest_899_CustomMulticall.py +# test id = 899 +# test comment = CustomMulticall +# ------------------------------------------------------------ + + +from fastlane_bot.data.abi import CARBON_CONTROLLER_ABI +import os +from unittest.mock import Mock, patch + +from dotenv import load_dotenv +load_dotenv() +import time +from fastlane_bot.config.multicaller import MultiCaller +from fastlane_bot.config.multicaller import ContractMethodWrapper + +from web3 import Web3 + +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(MultiCaller)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(ContractMethodWrapper)) + + +from fastlane_bot.testing import * + +#plt.style.use('seaborn-dark') +plt.rcParams['figure.figsize'] = [12,6] +from fastlane_bot import __VERSION__ +require("3.0", __VERSION__) + +WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_ALCHEMY_PROJECT_ID") +RPC_URL = f"https://eth-mainnet.alchemyapi.io/v2/{WEB3_ALCHEMY_PROJECT_ID}" +CARBON_CONTROLLER_ADDRESS = "0xC537e898CD774e2dCBa3B14Ea6f34C93d5eA45e1" + +class MockWeb3: + class HTTPProvider: + pass + + class eth: + @staticmethod + def contract(address, abi): + return Mock() + + @staticmethod + def to_checksum_address(address): + return address + + @staticmethod + def to_checksum_address(address): + return address + +class MockContract: + + def __init__(self, address, abi): + self.address = address + self.abi = abi + + def functions(self): + return Mock() + + def encodeABI(self): + return Mock() + + def address(self): + return self.address + + def abi(self): + return self.abi + + def to_checksum_address(self, address): + return address + + # handle encoded data + def encode_abi(self): + return Mock() + + def decode_abi(self): + return Mock() + +start_time = time.time() + +w3 = Web3(Web3.HTTPProvider(RPC_URL, request_kwargs={'timeout': 60})) +contract = w3.eth.contract(address=CARBON_CONTROLLER_ADDRESS, abi=CARBON_CONTROLLER_ABI) + +mainnet_pairs = contract.caller.pairs() + +if len(mainnet_pairs) > 10: + mainnet_pairs = mainnet_pairs[:10] + +pair_fees_without_multicall = [contract.caller.pairTradingFeePPM(pair[0], pair[1]) for pair in mainnet_pairs] + +pair_fees_time_without_multicall = time.time() - start_time + +start_time = time.time() + +strats_by_pair_without_multicall = [contract.caller.strategiesByPair(pair[0], pair[1], 0, 5000) for pair in mainnet_pairs] + +strats_by_pair_time_without_multicall = time.time() - start_time + + + +# ------------------------------------------------------------ +# Test 899 +# File test_899_CustomMulticall.py +# Segment test_multicaller_init +# ------------------------------------------------------------ +def test_test_multicaller_init(): +# ------------------------------------------------------------ + + # + + + original_method = Mock() + multicaller = Mock() + + wrapper = ContractMethodWrapper(original_method, multicaller) + + assert wrapper.original_method == original_method + assert wrapper.multicaller == multicaller + # - + + +# ------------------------------------------------------------ +# Test 899 +# File test_899_CustomMulticall.py +# Segment test_contract_method_wrapper_call +# ------------------------------------------------------------ +def test_test_contract_method_wrapper_call(): +# ------------------------------------------------------------ + + # + + original_method = Mock() + multicaller = Mock() + + wrapper = ContractMethodWrapper(original_method, multicaller) + + result = wrapper('arg1', kwarg1='kwarg1') + + original_method.assert_called_with('arg1', kwarg1='kwarg1') + multicaller.add_call.assert_called_with(result) + # - + + +# ------------------------------------------------------------ +# Test 899 +# File test_899_CustomMulticall.py +# Segment test_multi_caller_init +# ------------------------------------------------------------ +def test_test_multi_caller_init(): +# ------------------------------------------------------------ + + # + + contract = Mock() + web3 = MockWeb3() + + multicaller = MultiCaller(contract, web3=web3) + + assert multicaller.contract == contract + assert multicaller.block_identifier == 'latest' + assert multicaller._contract_calls == [] + # - + + +# ------------------------------------------------------------ +# Test 899 +# File test_899_CustomMulticall.py +# Segment test_multi_caller_add_call +# ------------------------------------------------------------ +def test_test_multi_caller_add_call(): +# ------------------------------------------------------------ + + # + + contract = Mock() + web3 = MockWeb3() + + multicaller = MultiCaller(contract, web3=web3) + fn = Mock() + + multicaller.add_call(fn, 'arg1', kwarg1='kwarg1') + + assert len(multicaller._contract_calls) == 1 + # - + + +# ------------------------------------------------------------ +# Test 899 +# File test_899_CustomMulticall.py +# Segment test_multi_caller_context_manager +# ------------------------------------------------------------ +def test_test_multi_caller_context_manager(): +# ------------------------------------------------------------ + + # + + contract = Mock() + web3 = MockWeb3() + multicaller = MultiCaller(contract, web3=web3) + + with patch.object(multicaller, 'multicall') as mock_multicall: + with multicaller: + multicaller.multicall() + pass + + mock_multicall.assert_called_once() + # - + + \ No newline at end of file diff --git a/fastlane_bot/tests/test_901_TestMultiTriangleModeSlow.py b/fastlane_bot/tests/test_901_TestMultiTriangleModeSlow.py new file mode 100644 index 000000000..a003d4fdf --- /dev/null +++ b/fastlane_bot/tests/test_901_TestMultiTriangleModeSlow.py @@ -0,0 +1,301 @@ +# ------------------------------------------------------------ +# Auto generated test file `test_901_TestMultiTriangleModeSlow.py` +# ------------------------------------------------------------ +# source file = NBTest_901_TestMultiTriangleModeSlow.py +# test id = 901 +# test comment = TestMultiTriangleModeSlow +# ------------------------------------------------------------ + + + +""" +This module contains the tests for the exchanges classes +""" +from fastlane_bot import Bot, Config +from fastlane_bot.bot import CarbonBot +from fastlane_bot.tools.cpc import ConstantProductCurve as CPC +from fastlane_bot.events.exchanges import UniswapV2, UniswapV3, CarbonV1, BancorV3 +from fastlane_bot.events.interface import QueryInterface +from fastlane_bot.events.managers.manager import Manager +from fastlane_bot.events.interface import QueryInterface +from joblib import Parallel, delayed +import math +import json +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(CPC)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(Bot)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(UniswapV2)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(UniswapV3)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(CarbonV1)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(BancorV3)) +from fastlane_bot.testing import * + +#plt.style.use('seaborn-dark') +plt.rcParams['figure.figsize'] = [12,6] +from fastlane_bot import __VERSION__ +require("3.0", __VERSION__) + + + +C = cfg = Config.new(config=Config.CONFIG_MAINNET) +cfg.DEFAULT_MIN_PROFIT_GAS_TOKEN = 0.00001 +assert (C.NETWORK == C.NETWORK_MAINNET) +assert (C.PROVIDER == C.PROVIDER_ALCHEMY) +setup_bot = CarbonBot(ConfigObj=C) +pools = None +with open('fastlane_bot/tests/_data/latest_pool_data_testing.json') as f: + pools = json.load(f) +pools = [pool for pool in pools] +pools[0] +static_pools = pools +state = pools +exchanges = list({ex['exchange_name'] for ex in state}) +db = QueryInterface(state=state, ConfigObj=C, exchanges=exchanges) +setup_bot.db = db + +static_pool_data_filename = "static_pool_data" + +static_pool_data = pd.read_csv(f"fastlane_bot/data/{static_pool_data_filename}.csv", low_memory=False) + +uniswap_v2_event_mappings = pd.read_csv("fastlane_bot/data/uniswap_v2_event_mappings.csv", low_memory=False) + +tokens = pd.read_csv("fastlane_bot/data/tokens.csv", low_memory=False) + +exchanges = "carbon_v1,bancor_v3,uniswap_v3,uniswap_v2,sushiswap_v2" + +exchanges = exchanges.split(",") + + +alchemy_max_block_fetch = 20 +static_pool_data["cid"] = [ + cfg.w3.keccak(text=f"{row['descr']}").hex() + for index, row in static_pool_data.iterrows() + ] +static_pool_data = [ + row for index, row in static_pool_data.iterrows() + if row["exchange_name"] in exchanges +] + +static_pool_data = pd.DataFrame(static_pool_data) +static_pool_data['exchange_name'].unique() +mgr = Manager( + web3=cfg.w3, + w3_async=cfg.w3_async, + cfg=cfg, + pool_data=static_pool_data.to_dict(orient="records"), + SUPPORTED_EXCHANGES=exchanges, + alchemy_max_block_fetch=alchemy_max_block_fetch, + uniswap_v2_event_mappings=uniswap_v2_event_mappings, + tokens=tokens.to_dict(orient="records"), +) + +start_time = time.time() +Parallel(n_jobs=-1, backend="threading")( + delayed(mgr.add_pool_to_exchange)(row) for row in mgr.pool_data +) +cfg.logger.info(f"Time taken to add initial pools: {time.time() - start_time}") + +mgr.deduplicate_pool_data() +cids = [pool["cid"] for pool in mgr.pool_data] +assert len(cids) == len(set(cids)), "duplicate cid's exist in the pool data" +def init_bot(mgr: Manager) -> CarbonBot: + """ + Initializes the bot. + + Parameters + ---------- + mgr : Manager + The manager object. + + Returns + ------- + CarbonBot + The bot object. + """ + mgr.cfg.logger.info("Initializing the bot...") + bot = CarbonBot(ConfigObj=mgr.cfg) + bot.db = db + bot.db.mgr = mgr + assert isinstance( + bot.db, QueryInterface + ), "QueryInterface not initialized correctly" + return bot +bot = init_bot(mgr) +bot.db.remove_unmapped_uniswap_v2_pools() +bot.db.remove_zero_liquidity_pools() +bot.db.remove_unsupported_exchanges() +tokens = bot.db.get_tokens() +ADDRDEC = {t.address: (t.address, int(t.decimals)) for t in tokens if not math.isnan(t.decimals)} +flashloan_tokens = bot.RUN_FLASHLOAN_TOKENS +CCm = bot.get_curves() +pools = db.get_pool_data_with_tokens() + +arb_mode = "multi_triangle" + + +# ------------------------------------------------------------ +# Test 901 +# File test_901_TestMultiTriangleModeSlow.py +# Segment Test_min_profit +# ------------------------------------------------------------ +def test_test_min_profit(): +# ------------------------------------------------------------ + + assert(cfg.DEFAULT_MIN_PROFIT_GAS_TOKEN <= 0.0001), f"[TestMultiTriangleMode], default_min_profit_gas_token must be <= 0.0001 for this Notebook to run, currently set to {cfg.DEFAULT_MIN_PROFIT_GAS_TOKEN}" + + # ### Test_arb_mode_class + + arb_finder = bot._get_arb_finder("multi_triangle") + assert arb_finder.__name__ == "ArbitrageFinderTriangleMulti", f"[TestMultiTriangleMode] Expected arb_finder class name name = FindArbitrageMultiPairwise, found {arb_finder.__name__}" + + +# ------------------------------------------------------------ +# Test 901 +# File test_901_TestMultiTriangleModeSlow.py +# Segment Test_combos +# ------------------------------------------------------------ +def test_test_combos(): +# ------------------------------------------------------------ + + arb_finder = bot._get_arb_finder("multi_triangle") + finder = arb_finder( + flashloan_tokens=flashloan_tokens, + CCm=CCm, + mode="bothin", + result=arb_finder.AO_TOKENS, + ConfigObj=bot.ConfigObj, + ) + combos = finder.get_combos(flashloan_tokens=flashloan_tokens, CCm=CCm, arb_mode="multi_triangle") + assert len(combos) >= 1225, f"[TestMultiTriangleMode] Using wrong dataset, expected at least 1225 combos, found {len(combos)}" + + # + + # print(len(combos)) + # for ex in exchanges: + # count = 0 + # for pool in CCm: + # if ex in pool.descr: + # count +=1 + # print(f"found {count} pools for {ex}") + # - + + # ### Test_find_arbitrage_single + + # + + arb_finder = bot._get_arb_finder("multi_triangle") + finder = arb_finder( + flashloan_tokens=flashloan_tokens, + CCm=CCm, + mode="bothin", + result=arb_finder.AO_CANDIDATES, + ConfigObj=bot.ConfigObj, + ) + r = finder.find_arbitrage() + multi_carbon_count = 0 + for arb in r: + ( + best_profit, + best_trade_instructions_df, + best_trade_instructions_dic, + best_src_token, + best_trade_instructions, + ) = arb + if len(best_trade_instructions_dic) > 3: + multi_carbon_count += 1 + tkn_in = None + tkn_out = None + # Find the first Carbon Curve to establish tknin and tknout + for curve in best_trade_instructions_dic: + if "-0" in curve['cid'] or "-1" in curve['cid']: + tkn_in = curve["tknin"] + tknout = curve["tknout"] + break + for curve in best_trade_instructions_dic: + if "-0" in curve['cid'] or "-1" in curve['cid']: + if curve["tknin"] in [tkn_in, tkn_out] and curve["tknout"] in [tkn_in, tkn_out]: + assert curve["tknin"] in tkn_in, f"[TestMultiTriangleMode] Finding Carbon curves in opposite directions - not supported in this mode." + assert curve["tknout"] in tkn_out, f"[TestMultiTriangleMode] Finding Carbon curves in opposite directions - not supported in this mode." + + assert multi_carbon_count > 0, f"[TestMultiTriangleMode] Not finding arbs with multiple Carbon curves." + assert len(r) >= 58, f"[TestMultiTriangleMode] Expected at least 58 arbs, found {len(r)}" + # - + + +# ------------------------------------------------------------ +# Test 901 +# File test_901_TestMultiTriangleModeSlow.py +# Segment Test Triangle Single +# ------------------------------------------------------------ +def test_test_triangle_single(): +# ------------------------------------------------------------ + + arb_finder = bot._get_arb_finder("triangle") + assert arb_finder.__name__ == "ArbitrageFinderTriangleSingle", f"[TestMultiTriangleMode] Expected arb_finder class name name = ArbitrageFinderTriangleSingle, found {arb_finder.__name__}" + + +# ------------------------------------------------------------ +# Test 901 +# File test_901_TestMultiTriangleModeSlow.py +# Segment Test_combos_triangle_single +# ------------------------------------------------------------ +def test_test_combos_triangle_single(): +# ------------------------------------------------------------ + + arb_finder = bot._get_arb_finder("triangle") + finder = arb_finder( + flashloan_tokens=flashloan_tokens, + CCm=CCm, + mode="bothin", + result=arb_finder.AO_TOKENS, + ConfigObj=bot.ConfigObj, + ) + combos = finder.get_combos(flashloan_tokens=flashloan_tokens, CCm=CCm, arb_mode="multi_triangle") + assert len(combos) >= 1225, f"[TestMultiTriangleMode] Using wrong dataset, expected at least 1225 combos, found {len(combos)}" + + +# ------------------------------------------------------------ +# Test 901 +# File test_901_TestMultiTriangleModeSlow.py +# Segment Test_Find_Arbitrage_Single +# ------------------------------------------------------------ +def test_test_find_arbitrage_single(): +# ------------------------------------------------------------ + + # + + arb_finder = bot._get_arb_finder("triangle") + finder = arb_finder( + flashloan_tokens=flashloan_tokens, + CCm=CCm, + mode="bothin", + result=arb_finder.AO_CANDIDATES, + ConfigObj=bot.ConfigObj, + ) + r = finder.find_arbitrage() + multi_carbon_count = 0 + for arb in r: + ( + best_profit, + best_trade_instructions_df, + best_trade_instructions_dic, + best_src_token, + best_trade_instructions, + ) = arb + if len(best_trade_instructions_dic) > 3: + multi_carbon_count += 1 + tkn_in = None + tkn_out = None + # Find the first Carbon Curve to establish tknin and tknout + for curve in best_trade_instructions_dic: + if "-0" in curve['cid'] or "-1" in curve['cid']: + tkn_in = curve["tknin"] + tknout = curve["tknout"] + break + for curve in best_trade_instructions_dic: + if "-0" in curve['cid'] or "-1" in curve['cid']: + if curve["tknin"] in [tkn_in, tkn_out] and curve["tknout"] in [tkn_in, tkn_out]: + assert curve["tknin"] in tkn_in, f"[TestMultiTriangleMode] Finding Carbon curves in opposite directions - not supported in this mode." + assert curve["tknout"] in tkn_out, f"[TestMultiTriangleMode] Finding Carbon curves in opposite directions - not supported in this mode." + + assert multi_carbon_count == 0, f"[TestMultiTriangleMode] Expected 0 arbs with multiple Carbon curves for Triangle Single mode, found {multi_carbon_count}." + assert len(r) >= 58, f"[TestMultiTriangleMode] Expected at least 58 arbs, found {len(r)}" + # - + + \ No newline at end of file diff --git a/fastlane_bot/tests/test_903_FlashloanTokens.py b/fastlane_bot/tests/test_903_FlashloanTokens.py new file mode 100644 index 000000000..9176782ab --- /dev/null +++ b/fastlane_bot/tests/test_903_FlashloanTokens.py @@ -0,0 +1,90 @@ +# ------------------------------------------------------------ +# Auto generated test file `test_903_FlashloanTokens.py` +# ------------------------------------------------------------ +# source file = NBTest_903_FlashloanTokens.py +# test id = 903 +# test comment = FlashloanTokens +# ------------------------------------------------------------ + + + +""" +This module contains the tests which ensure the the flashloan_tokens parameter is respected when using the b3_two_hop and bancor_v3 arb modes. +""" +from fastlane_bot import Bot +from fastlane_bot.tools.cpc import ConstantProductCurve as CPC +from fastlane_bot.events.exchanges import UniswapV2, UniswapV3, CarbonV1, BancorV3 +import subprocess, os, sys +import pytest +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(CPC)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(Bot)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(UniswapV2)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(UniswapV3)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(CarbonV1)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(BancorV3)) +from fastlane_bot.testing import * +plt.rcParams['figure.figsize'] = [12,6] +from fastlane_bot import __VERSION__ +require("3.0", __VERSION__) + + + + +def find_main_py(): + # Start at the directory of the current script + cwd = os.path.abspath(os.path.join(os.getcwd())) + + with open("log.txt", "w") as f: + f.write(f"Searching for main.py in {cwd}") + + print(f"Searching for main.py in {cwd}") + while True: + # Check if main.py exists in the current directory + if "main.py" in os.listdir(cwd): + return cwd # Found the directory containing main.py + else: + # If not, go up one directory + new_cwd = os.path.dirname(cwd) + + # If we're already at the root directory, stop searching + if new_cwd == cwd: + raise FileNotFoundError("Could not find main.py in any parent directory") + + cwd = new_cwd + + +def run_command(mode): + + # Find the correct path to main.py + main_script_path = find_main_py() + print(f"Found main.py in {main_script_path}") + main_script_path = main_script_path + "/main.py" + + # Run the command + cmd = [ + "python", + main_script_path, + f"--arb_mode={mode}", + "--default_min_profit_gas_token=60", + "--limit_bancor3_flashloan_tokens=True", + "--alchemy_max_block_fetch=5", + "--logging_path=fastlane_bot/data/", + "--timeout=120", + "--blockchain=ethereum" + ] + + expected_log_line = "limiting flashloan_tokens to [" + result = subprocess.run(cmd, text=True, capture_output=True, check=True) + assert expected_log_line in result.stderr, result.stderr + + +# ------------------------------------------------------------ +# Test 903 +# File test_903_FlashloanTokens.py +# Segment Test Flashloan Tokens b3_two_hop +# ------------------------------------------------------------ +def test_test_flashloan_tokens_b3_two_hop(): +# ------------------------------------------------------------ + + # + is_executing=true + run_command("b3_two_hop") \ No newline at end of file diff --git a/fastlane_bot/tests/test_906_TargetTokens.py b/fastlane_bot/tests/test_906_TargetTokens.py new file mode 100644 index 000000000..b2651b76c --- /dev/null +++ b/fastlane_bot/tests/test_906_TargetTokens.py @@ -0,0 +1,91 @@ +# ------------------------------------------------------------ +# Auto generated test file `test_906_TargetTokens.py` +# ------------------------------------------------------------ +# source file = NBTest_906_TargetTokens.py +# test id = 906 +# test comment = TargetTokens +# ------------------------------------------------------------ + + + +""" +This module contains the tests which ensure the target_tokens parameter is respected. +""" +from fastlane_bot import Bot +from fastlane_bot.tools.cpc import ConstantProductCurve as CPC +from fastlane_bot.events.exchanges import UniswapV2, UniswapV3, CarbonV1, BancorV3 +import subprocess, os, sys +import pytest +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(CPC)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(Bot)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(UniswapV2)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(UniswapV3)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(CarbonV1)) +print("{0.__name__} v{0.__VERSION__} ({0.__DATE__})".format(BancorV3)) +from fastlane_bot.testing import * +plt.rcParams['figure.figsize'] = [12,6] +from fastlane_bot import __VERSION__ +require("3.0", __VERSION__) + + + +from fastlane_bot.tools.cpc import T + + +def find_main_py(): + # Start at the directory of the current script + cwd = os.path.abspath(os.path.join(os.getcwd())) + + with open("log.txt", "w") as f: + f.write(f"Searching for main.py in {cwd}") + + print(f"Searching for main.py in {cwd}") + while True: + # Check if main.py exists in the current directory + if "main.py" in os.listdir(cwd): + return cwd # Found the directory containing main.py + else: + # If not, go up one directory + new_cwd = os.path.dirname(cwd) + + # If we're already at the root directory, stop searching + if new_cwd == cwd: + raise FileNotFoundError("Could not find main.py in any parent directory") + + cwd = new_cwd + + +def run_command(mode): + + # Find the correct path to main.py + main_script_path = find_main_py() + print(f"Found main.py in {main_script_path}") + main_script_path = main_script_path + "/main.py" + + # Run the command + cmd = [ + "python", + main_script_path, + f"--arb_mode={mode}", + # "--use_cached_events=True", + "--alchemy_max_block_fetch=5", + "--logging_path=fastlane_bot/data/", + "--timeout=120", + f"--target_tokens={T.WETH},{T.DAI}", + "--blockchain=ethereum" + ] + + expected_log_line = "Limiting pools by target_tokens. Removed " + result = subprocess.run(cmd, text=True, capture_output=True, check=True) + assert expected_log_line in result.stderr, result.stderr + + +# ------------------------------------------------------------ +# Test 906 +# File test_906_TargetTokens.py +# Segment Test Flashloan Tokens b3_two_hop +# ------------------------------------------------------------ +def test_test_flashloan_tokens_b3_two_hop(): +# ------------------------------------------------------------ + + run_command("single") \ No newline at end of file