Skip to content

Commit

Permalink
feat: move event listener outside of the main loop
Browse files Browse the repository at this point in the history
feat: disable events from filters
  • Loading branch information
platonfloria committed Apr 30, 2024
1 parent 5be7953 commit 525feb2
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 95 deletions.
41 changes: 0 additions & 41 deletions fastlane_bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,15 @@
__VERSION__ = "3-b2.2"
__DATE__ = "20/June/2023"

import asyncio
import random
import json
import os
from _decimal import Decimal
from dataclasses import dataclass, asdict, field
from datetime import datetime
from threading import Thread
from typing import Generator, List, Dict, Tuple, Any, Callable
from typing import Optional

from web3 import Web3, AsyncWeb3
from web3.providers import WebsocketProviderV2


import fastlane_bot
from fastlane_bot.config import Config
from fastlane_bot.helpers import (
Expand Down Expand Up @@ -915,20 +909,6 @@ def get_tokens_in_exchange(
"""
return self.db.get_tokens_from_exchange(exchange_name=exchange_name)

def run_event_listener(self):
    """Run a blocking websocket event listener for the manager's network.

    Opens a persistent websocket connection to the network's
    ``WEBSOCKET_URL``, subscribes to events for the base exchanges via
    ``EventListener``, and applies each received event to the manager
    through ``mgr.update_from_event``. Blocks forever; intended to be
    launched in a daemon thread.
    """
    async def inner(mgr):
        # Imported lazily to avoid pulling the listener (and its async
        # dependencies) at module import time.
        from fastlane_bot.events.listener import EventListener

        base_exchanges = [
            "carbon_v1",
            "uniswap_v3",
            "uniswap_v2",
            "bancor_pol",
            "bancor_v2",
            "bancor_v3",
            "solidly_v2",
        ]
        async with AsyncWeb3.persistent_websocket(
            WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)
        ) as w3:
            event_listener = EventListener(
                manager=mgr, base_exchanges=base_exchanges, w3=w3
            )
            async for events in event_listener.pull_block_events():
                for event in events:
                    # Route event tracing through the configured logger
                    # (was a leftover debug print) so it respects the
                    # application's log levels and handlers.
                    mgr.cfg.logger.debug(f"[bot.run_event_listener] {event}")
                    mgr.update_from_event(event)

    asyncio.run(inner(self.db.mgr))

def run(
self,
*,
Expand Down Expand Up @@ -982,24 +962,3 @@ def run(
)
except self.NoArbAvailable as e:
self.ConfigObj.logger.info(e)

thread = Thread(target=self.run_event_listener, args=(), daemon=True)
thread.start()

while True: # TODO: remove
...

if mode == "continuous":
self.run_continuous_mode(
flashloan_tokens, arb_mode, run_data_validator, randomizer
)
else:
self.run_single_mode(
flashloan_tokens,
CCm,
arb_mode,
run_data_validator,
randomizer,
replay_mode,
tenderly_fork,
)
2 changes: 2 additions & 0 deletions fastlane_bot/config/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ class _ConfigNetworkSei(ConfigNetwork):
RPC_ENDPOINT = "https://evm-rpc.arctic-1.seinetwork.io/" # TODO currently Sei devnet
WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_SEI")


WEBSOCKET_URL = "wss://evm-ws.arctic-1.seinetwork.io"
network_df = get_multichain_addresses(network=NETWORK_NAME)
FASTLANE_CONTRACT_ADDRESS = "0xC7Dd38e64822108446872c5C2105308058c5C55C" #TODO - UPDATE WITH Mainnet
MULTICALL_CONTRACT_ADDRESS = "0x1E05037b9c4fEFaF3c45CD6F4F2C3197e6A43cD8" # previously 0xcA11bde05977b3631167028862bE2a173976CA11
Expand Down
2 changes: 1 addition & 1 deletion fastlane_bot/events/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def _get_batched_events(self):
batch = self._event_buffer.copy()
self._event_buffer = []
yield batch
await asyncio.sleep(min(0.01, self._last_event_ts + self.NEW_EVENT_TIMEOUT - ts))
await asyncio.sleep(min(self.NEW_EVENT_TIMEOUT, self._last_event_ts + self.NEW_EVENT_TIMEOUT - ts))

async def _listen(self):
async for response in self._w3.ws.process_subscriptions():
Expand Down
137 changes: 84 additions & 53 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
(c) Copyright Bprotocol foundation 2023.
Licensed under MIT
"""
import asyncio

from fastlane_bot.exceptions import AsyncUpdateRetryException, ReadOnlyException, FlashloanUnavailableException
from fastlane_bot.events.version_utils import check_version_requirements
from fastlane_bot.tools.cpc import T
Expand All @@ -13,11 +15,13 @@

import os, sys
import time
from threading import Lock, Thread
from traceback import format_exc

import pandas as pd
from dotenv import load_dotenv
from web3 import Web3, HTTPProvider
from web3 import AsyncWeb3, Web3, HTTPProvider
from web3.providers import WebsocketProviderV2

from fastlane_bot import __version__ as bot_version
from fastlane_bot.events.async_backdate_utils import (
Expand Down Expand Up @@ -63,6 +67,9 @@
load_dotenv()


pool_data_lock = Lock()


def process_arguments(args):
"""
Process and transform command line arguments.
Expand Down Expand Up @@ -298,23 +305,55 @@ def main(args: argparse.Namespace) -> None:
# Add initial pool data to the manager
add_initial_pool_data(cfg, mgr, args.n_jobs)

handle_static_pools_update(mgr)

# Handle the initial iteration (backdate pools, update pools from contracts, etc.)
async_handle_initial_iteration(
backdate_pools=args.backdate_pools,
last_block=0,
mgr=mgr,
start_block=mgr.web3.eth.block_number if args.replay_from_block is None else args.replay_from_block,
)

thread = Thread(target=run_event_listener, args=(mgr,), daemon=True)
thread.start()

# Run the main loop
run(mgr, args)


def run_event_listener(mgr):
    """Blocking entry point for the websocket event-listener thread.

    Opens a persistent websocket to ``mgr.cfg.network.WEBSOCKET_URL``,
    subscribes to events for the base exchanges via ``EventListener``,
    applies each event to the manager, and triggers the async pool
    update when new pools were discovered from contracts. Intended to
    run in a daemon thread; never returns under normal operation.

    Args:
        mgr: the pool/event manager shared with the main loop; access to
            its pool data is serialized with ``pool_data_lock``.
    """
    async def inner(mgr):
        # Imported lazily so the listener machinery is only loaded when
        # the thread actually starts.
        from fastlane_bot.events.listener import EventListener

        base_exchanges = [
            "carbon_v1",
            "uniswap_v3",
            "uniswap_v2",
            "bancor_pol",
            "bancor_v2",
            "bancor_v3",
            "solidly_v2",
        ]
        async with AsyncWeb3.persistent_websocket(
            WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)
        ) as w3:
            event_listener = EventListener(
                manager=mgr, base_exchanges=base_exchanges, w3=w3
            )
            async for events in event_listener.pull_block_events():
                # Hold the lock across BOTH applying the events and
                # processing pools_to_add_from_contracts. The original
                # code released and re-acquired the lock between the two
                # steps, letting the main loop observe a state where the
                # events were applied but the corresponding new pools
                # had not been added yet.
                with pool_data_lock:
                    for event in events:
                        # Logger instead of leftover debug print so event
                        # tracing respects log levels/handlers.
                        mgr.cfg.logger.debug(f"[main.run_event_listener] {event}")
                        mgr.update_from_event(event)

                    current_block = mgr.web3.eth.block_number

                    # Update new pool events from contracts
                    if len(mgr.pools_to_add_from_contracts) > 0:
                        mgr.cfg.logger.info(
                            f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, "
                            f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}."
                        )
                        _run_async_update_with_retries(mgr, current_block=current_block)
                        mgr.pools_to_add_from_contracts = []

    asyncio.run(inner(mgr))


def run(mgr, args, tenderly_uri=None) -> None:
loop_idx = last_block = last_block_queried = total_iteration_time = 0
start_timeout = time.time()
mainnet_uri = mgr.cfg.w3.provider.endpoint_uri
handle_static_pools_update(mgr)

# Handle the initial iteration (backdate pools, update pools from contracts, etc.)
async_handle_initial_iteration(
backdate_pools=args.backdate_pools,
last_block=last_block,
mgr=mgr,
start_block=args.replay_from_block,
)

while True:
try:
Expand Down Expand Up @@ -349,9 +388,9 @@ def run(mgr, args, tenderly_uri=None) -> None:
)

# Log the current start, end and last block
mgr.cfg.logger.info(
f"Fetching events from {start_block} to {current_block}... {last_block}"
)
# mgr.cfg.logger.info(
# f"Fetching events from {start_block} to {current_block}... {last_block}"
# )

# Set the network connection to Mainnet if replaying from a block
set_network_to_mainnet_if_replay(
Expand Down Expand Up @@ -380,15 +419,6 @@ def run(mgr, args, tenderly_uri=None) -> None:
# Update the pools from the latest events
#update_pools_from_events(args.n_jobs, mgr, latest_events)

# Update new pool events from contracts
if len(mgr.pools_to_add_from_contracts) > 0:
mgr.cfg.logger.info(
f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, "
f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}."
)
_run_async_update_with_retries(mgr, current_block=current_block)
mgr.pools_to_add_from_contracts = []

# Increment the loop index
loop_idx += 1

Expand Down Expand Up @@ -425,39 +455,40 @@ def run(mgr, args, tenderly_uri=None) -> None:
# Re-initialize the bot
bot = init_bot(mgr)

# Verify that the state has changed
verify_state_changed(bot=bot, initial_state=initial_state, mgr=mgr)

# Verify that the minimum profit in BNT is respected
verify_min_bnt_is_respected(bot=bot, mgr=mgr)

if args.use_specific_exchange_for_target_tokens is not None:
target_tokens = bot.get_tokens_in_exchange(
exchange_name=args.use_specific_exchange_for_target_tokens
)
mgr.cfg.logger.info(
f"[main] Using only tokens in: {args.use_specific_exchange_for_target_tokens}, found {len(target_tokens)} tokens"
with pool_data_lock:
# Verify that the state has changed
verify_state_changed(bot=bot, initial_state=initial_state, mgr=mgr)

# Verify that the minimum profit in BNT is respected
# verify_min_bnt_is_respected(bot=bot, mgr=mgr)

if args.use_specific_exchange_for_target_tokens is not None:
target_tokens = bot.get_tokens_in_exchange(
exchange_name=args.use_specific_exchange_for_target_tokens
)
mgr.cfg.logger.info(
f"[main] Using only tokens in: {args.use_specific_exchange_for_target_tokens}, found {len(target_tokens)} tokens"
)

if not mgr.read_only:
handle_tokens_csv(mgr, mgr.prefix_path)

# Handle subsequent iterations
handle_subsequent_iterations(
arb_mode=args.arb_mode,
bot=bot,
flashloan_tokens=args.flashloan_tokens,
randomizer=args.randomizer,
run_data_validator=args.run_data_validator,
target_tokens=args.target_tokens,
loop_idx=loop_idx,
logging_path=args.logging_path,
replay_from_block=replay_from_block,
tenderly_uri=tenderly_uri,
mgr=mgr,
forked_from_block=forked_from_block,
)

if not mgr.read_only:
handle_tokens_csv(mgr, mgr.prefix_path)

# Handle subsequent iterations
handle_subsequent_iterations(
arb_mode=args.arb_mode,
bot=bot,
flashloan_tokens=args.flashloan_tokens,
randomizer=args.randomizer,
run_data_validator=args.run_data_validator,
target_tokens=args.target_tokens,
loop_idx=loop_idx,
logging_path=args.logging_path,
replay_from_block=replay_from_block,
tenderly_uri=tenderly_uri,
mgr=mgr,
forked_from_block=forked_from_block,
)

# Sleep for the polling interval
if not replay_from_block and args.polling_interval > 0:
mgr.cfg.logger.info(
Expand Down

0 comments on commit 525feb2

Please sign in to comment.