Skip to content

Commit

Permalink
feat: handle batch too large errors from node
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Aug 15, 2023
1 parent 834fe5e commit 305784a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 20 deletions.
8 changes: 7 additions & 1 deletion dank_mids/_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from typing import TYPE_CHECKING, Union
import logging
import re
from typing import TYPE_CHECKING, Union

from aiohttp.client_exceptions import ClientResponseError

Expand All @@ -24,6 +25,11 @@ class ResponseNotReady(ValueError):
class PayloadTooLarge(BadResponse):
pass

class ExceedsMaxBatchSize(BadResponse):
    """Raised when the node rejects a JSON-RPC batch for exceeding its batch size limit."""

    @property
    def limit(self) -> int:
        """Parse the node's advertised batch-size limit out of the error message.

        NOTE(review): assumes the error message matched ``batch limit (\\d+) exceeded``
        when this exception was constructed; otherwise ``match`` is None — confirm
        construction site guarantees this.
        """
        match = re.match(r'batch limit (\d+) exceeded', self.response.error.message)
        return int(match.group(1))

class DankMidsClientResponseError(ClientResponseError):
"""A wrapper around the standard aiohttp ClientResponseError that attaches the request that generated the error."""
def __init__(
Expand Down
37 changes: 23 additions & 14 deletions dank_mids/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,28 @@ def _reduce_chunk_size(self, num_calls: int, chunk_name: Literal["multicall", "j
return
# NOTE: We need the 2nd check because one of the other calls in a batch might have already reduced the chunk size
if chunk_name == "jsonrpc batch":
if new_chunk_size < ENVS.MAX_JSONRPC_BATCH_SIZE:
old_chunk_size = ENVS.MAX_JSONRPC_BATCH_SIZE
ENVS.MAX_JSONRPC_BATCH_SIZE = new_chunk_size
else:
logger.info("new chunk size %s is not lower than max batch size %s", new_chunk_size, str(ENVS.MAX_JSONRPC_BATCH_SIZE))
return
self.set_batch_size_limit(new_chunk_size)
logger.info("The failed batch had %s calls", num_calls)
return
elif chunk_name == "multicall":
if new_chunk_size < self.batcher.step:
old_chunk_size = self.batcher.step
self.batcher.step = new_chunk_size
else:
logger.info("new chunk size %s is not lower than batcher step %s", new_chunk_size, self.batcher.step)
return
self.set_multicall_size_limit(new_chunk_size)
logger.info("The failed multicall had %s calls", num_calls)
return
raise DankMidsInternalError(ValueError(f"chunk name {chunk_name} is invalid"))

def set_multicall_size_limit(self, new_limit: int) -> None:
    """Lower the multicall batching step to `new_limit`.

    Limits are only ever reduced: if `new_limit` is not strictly lower than
    the current `self.batcher.step`, nothing changes and an info message is
    logged instead.
    """
    current_limit = self.batcher.step
    if new_limit >= current_limit:
        logger.info("new multicall size limit %s is not lower than existing limit %s", new_limit, current_limit)
        return
    self.batcher.step = new_limit
    logger.warning("multicall size limit reduced from %s to %s", current_limit, new_limit)

def set_batch_size_limit(self, new_limit: int) -> None:
    """Lower the max JSON-RPC batch size to `new_limit`.

    Limits are only ever reduced: if `new_limit` is not strictly lower than
    the current `ENVS.MAX_JSONRPC_BATCH_SIZE`, nothing changes and an info
    message is logged instead. This keeps concurrent reductions from ever
    raising the limit back up.
    """
    existing_limit = ENVS.MAX_JSONRPC_BATCH_SIZE
    if new_limit < existing_limit:
        ENVS.MAX_JSONRPC_BATCH_SIZE = new_limit
        logger.warning("jsonrpc batch size limit reduced from %s to %s", existing_limit, new_limit)
    else:
        # int() presumably normalizes an env-var wrapper type for logging — TODO confirm ENVS value type.
        logger.info("new jsonrpc batch size limit %s is not lower than existing limit %s", new_limit, int(existing_limit))

11 changes: 8 additions & 3 deletions dank_mids/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from dank_mids._demo_mode import demo_logger
from dank_mids._exceptions import (BadResponse, DankMidsClientResponseError,
DankMidsInternalError, EmptyBatch,
PayloadTooLarge, ResponseNotReady,
internal_err_types)
ExceedsMaxBatchSize, PayloadTooLarge,
ResponseNotReady, internal_err_types)
from dank_mids.helpers import decode, session
from dank_mids.helpers.helpers import set_done
from dank_mids.types import (BatchId, BlockId, JSONRPCBatchResponse,
Expand Down Expand Up @@ -397,7 +397,7 @@ def should_retry(self, e: Exception) -> bool:
if "out of gas" in f"{e}":
# TODO Remember which contracts/calls are gas guzzlers
logger.debug('out of gas. cut in half, trying again')
elif isinstance(e, PayloadTooLarge) or any(err in f"{e}".lower() for err in constants.RETRY_ERRS):
elif any(err in f"{e}".lower() for err in constants.RETRY_ERRS):
# TODO: use these exceptions to optimize for the user's node
logger.debug('Dank too loud. Bisecting batch and retrying.')
elif isinstance(e, BadResponse) and 'invalid request' in f"{e}":
Expand Down Expand Up @@ -630,6 +630,11 @@ async def get_response(self) -> None:
raise DankMidsInternalError(e) from e
except EmptyBatch as e:
logger.warning("These EmptyBatch exceptions shouldn't actually happen and this except clause can probably be removed soon.")
except ExceedsMaxBatchSize as e:
print('asd')
logger.warning("exceeded max batch size for your node")
self.controller.set_batch_size_limit(e.limit)
await self.bisect_and_retry(e)
except PayloadTooLarge as e:
# TODO: track too large payloads and do some better optimizations for batch sizing
self.adjust_batch_size()
Expand Down
9 changes: 7 additions & 2 deletions dank_mids/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from time import time
from typing import (TYPE_CHECKING, Any, Callable, Coroutine, DefaultDict, Dict,
List, Literal, NewType, Optional, TypedDict, TypeVar,
Expand All @@ -10,7 +11,7 @@
from web3.types import RPCEndpoint, RPCResponse

from dank_mids import constants, stats
from dank_mids._exceptions import BadResponse, PayloadTooLarge
from dank_mids._exceptions import BadResponse, ExceedsMaxBatchSize, PayloadTooLarge

if TYPE_CHECKING:
from dank_mids.requests import Multicall
Expand Down Expand Up @@ -96,7 +97,11 @@ class PartialResponse(_DictStruct):
def exception(self) -> BadResponse:
    """Build the most specific `BadResponse` subclass for this error response.

    Raises:
        AttributeError: if this response did not actually error.
    """
    if self.error is None:
        raise AttributeError(f"{self} did not error.")
    # Order matters: check the specific failure modes before falling back to
    # the generic BadResponse. The regex mirrors ExceedsMaxBatchSize.limit.
    return (
        PayloadTooLarge(self) if self.payload_too_large
        else ExceedsMaxBatchSize(self) if re.match(r'batch limit (\d+) exceeded', self.error.message)
        else BadResponse(self)
    )

@property
def payload_too_large(self) -> bool:
Expand Down
3 changes: 3 additions & 0 deletions tests/test_dank_mids.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def test_bad_hex_handling():
assert await_awaitable(Call(chainlinkfeed, 'latestAnswer()(uint)', block_id=14_000_000).coroutine()) == 15717100
assert chainlinkfeed in _get_controller().no_multicall

def test_json_batch():
    """Smoke test: a multi-block workload runs to completion via JSON-RPC batching."""
    work = gather(MULTIBLOCK_WORK)
    await_awaitable(work)

def test_next_cid():
    """Each access of `call_uid.next` yields the previous value plus one."""
    first = _get_controller().call_uid.next
    second = _get_controller().call_uid.next
    assert first + 1 == second

Expand Down

0 comments on commit 305784a

Please sign in to comment.