diff --git a/src/chain-evm/sender.ts b/src/chain-evm/sender.ts new file mode 100644 index 0000000..36602ee --- /dev/null +++ b/src/chain-evm/sender.ts @@ -0,0 +1,287 @@ +import { Logger } from 'pino'; +import Web3 from 'web3'; +import { assert } from '../errors'; +import { + EIP1559GasExtension, + FeeManagerV2, + GasCategory, + LegacyGasExtension, + TransactionTemplate, +} from './fees/manager'; + +export type InputTx = { + from: string; + data: string; + to: string; + value?: string; +}; + +enum EvmRpcError { + NonceTooLow, + TransactionUnderpriced, + Stuck, + Other, +} + +function stringToError(error: any): EvmRpcError { + const stringifiedError = `${error}`; + if (stringifiedError.match(/too low/i)) return EvmRpcError.NonceTooLow; + if (stringifiedError.match(/underpriced/i)) return EvmRpcError.TransactionUnderpriced; + if (stringifiedError.match(/not mined/i)) return EvmRpcError.Stuck; + // else if (stringifiedError.match(/execution reverted/i)) return Error.EstimationReverted; + // else if (stringifiedError.match(/Transaction has been reverted by the EVM/i)) return Error.Reverted; + return EvmRpcError.Other; +} + +type PopulatedTxn = TransactionTemplate & (EIP1559GasExtension | LegacyGasExtension); + +type Signer = (transactionConfig: Parameters[0]) => string; + +type TransactionSenderOpts = { + sendMaxAttempts: number; + pollingIntervalMs: number; + pollingMaxAttempts: number; +}; + +type Web3TransactionReceipt = Awaited>; +type Broadcast = { + tx: PopulatedTxn; + broadcastId: number; + attempt: number; + hash?: string; + broadcastFailed?: boolean; +}; + +const defaultOpts: TransactionSenderOpts = { + sendMaxAttempts: 4, + pollingIntervalMs: 5_000, + pollingMaxAttempts: 5, +}; + +export class TransactionSender { + #startedAt?: Date; + + #resolvedAt?: Date; + + readonly #inputTx: Readonly; + + readonly #feeManager: FeeManagerV2; + + readonly #signer: Signer; + + readonly #connection: Web3; + + readonly #logger: Logger; + + readonly #id: number; + + readonly #opts: TransactionSenderOpts; + + readonly #broadcasts: Array = []; + + #latestBroadcast?: Broadcast; + + constructor( + tx: InputTx, + connection: Web3, + manager: FeeManagerV2, + signer: Signer, + logger: Logger, + opts?: TransactionSenderOpts, + ) { + this.#inputTx = tx; + this.#connection = connection; + this.#feeManager = manager; + this.#signer = signer; + this.#id = new Date().getTime(); + this.#logger = logger.child({ [TransactionSender.name]: this.#id }); + this.#opts = { ...(opts || {}), ...defaultOpts }; + } + + async send(): Promise { + assert(!this.#startedAt && !this.#resolvedAt, `sending already initiated (${this.#id})`); + this.#startedAt = new Date(); + + const txReceipt = await this.run(); + + this.#logger.info( + `tx ${txReceipt.transactionHash} confirmed at block #${txReceipt.blockNumber} at attempt #${this.#broadcastAttemptsCount}`, + ); + return txReceipt.transactionHash; + } + + get #broadcastAttemptsCount() { + return this.#broadcasts.length; + } + + private async run(): Promise { + const template = await this.getTransactionTemplate(); + const tx = await this.#feeManager.populateTx(template); + return this.getPromise(tx); + } + + private getPromise(tx: PopulatedTxn): Promise { + const broadcast = { + tx, + broadcastId: new Date().getTime(), + attempt: this.#broadcastAttemptsCount + 1, + }; + if (broadcast.attempt > this.#opts.sendMaxAttempts) { + this.#logger.debug( + `no more attempts (${this.#broadcastAttemptsCount}/${this.#opts.sendMaxAttempts})`, + ); + return Promise.reject(new Error(`max attempts to send a txn reached`)); + } + this.#broadcasts.push(broadcast); + + return new Promise((resolve, reject) => { + this.signAndBroadcast(broadcast).then( + (hash) => this.handleBroadcastSuccess(hash, broadcast).then(resolve, reject), + (err) => this.handleBroadcastRejection(err, broadcast).then(resolve, reject), + ); + }); + } + + private pushReplacement(broadcast: Broadcast): Promise { + const isLastAttempt = this.#opts.sendMaxAttempts === broadcast.attempt + 1; + return new Promise((resolve, reject) => { + this.#feeManager + .populateReplacementTx( + broadcast.tx, + broadcast.tx, + undefined, + isLastAttempt ? GasCategory.HIGH : undefined, + ) + .then( + (tx) => { + resolve(this.getPromise(tx)); + }, + (err) => { + reject(new Error(`unable to populate txn: ${err}`)); + }, + ); + }); + } + + private handleBroadcastSuccess( + hash: string, + broadcast: Broadcast, + ): Promise { + return new Promise((resolve, reject) => { + let pollingAttemptsLeft = this.#opts.pollingMaxAttempts; + let pollingInterval: NodeJS.Timer; + let locked = false; + + const stopPolling = () => { + locked = false; + clearInterval(pollingInterval); + }; + + pollingInterval = setInterval(() => { + if (locked) return; + locked = true; + + if (pollingAttemptsLeft-- <= 0) { + this.#logger.debug(`poller reached max attempts, trying to replace txn`); + stopPolling(); + resolve(this.pushReplacement(broadcast)); + } + + this.#connection.eth.getTransactionReceipt(hash).then( + (transactionReceiptResult) => { + if (transactionReceiptResult.status === true) { + stopPolling(); + resolve(transactionReceiptResult); + } else if (transactionReceiptResult?.status === false) { + stopPolling(); + reject(new Error(`tx ${hash} reverted`)); + } + }, + (err) => { + this.#logger.debug(`unable to get txn receipt: ${err}, still working...`); + }, + ); + + locked = false; + }, this.#opts.pollingIntervalMs); + }); + } + + private handleBroadcastRejection(e: any, broadcast: Broadcast) { + const error = stringToError(e); + if (error === EvmRpcError.NonceTooLow) { + return this.getPromise({ ...broadcast.tx, nonce: broadcast.tx.nonce + 1 }); + } + if (error === EvmRpcError.TransactionUnderpriced) { + return this.pushReplacement(broadcast); + } + if (error === EvmRpcError.Stuck) { + return this.pushReplacement(broadcast); + } + return Promise.reject(e); + } + + private signAndBroadcast(broadcast: Broadcast): Promise { + this.#logger.info(`signing transaction: ${JSON.stringify(broadcast.tx)}`); + const signedTransactionData = this.#signer(broadcast.tx); + this.#latestBroadcast = broadcast; + + return this.broadcast(signedTransactionData); + } + + private async getTransactionTemplate(): Promise { + const tx = this.#inputTx; + const [nonce, gas, blockNumber] = await Promise.all([ + this.#connection.eth.getTransactionCount(tx.from), + this.#feeManager.estimateTx(tx), + this.#connection.eth.getBlockNumber(), + ]); + + this.#logger.debug(`current block: #${blockNumber}`); + return { + from: tx.from, + to: tx.to, + data: tx.data, + value: tx.value, + + gas, + nonce, + }; + } + + private broadcast(signedTransactionData: string): Promise { + const broadcast = this.#latestBroadcast; + assert(!!broadcast, 'broadcast not set'); + + return new Promise((resolve, reject) => { + const errorHandler = (error: any) => { + this.#logger.error(`broadcast#${broadcast.attempt} failed: ${error}`); + this.#logger.error(error); + // do not raise any error if txn hash has been already retrieved; the upper level code would handle this + // by quering the txn receipt + if (broadcast.hash === undefined) { + broadcast.broadcastFailed = true; + reject(new Error(`broadcast#${broadcast.attempt} failed: ${error}`)); + } + }; + + this.#logger.debug(`broadcasting (attempt#${broadcast.attempt}): ${signedTransactionData}`); + + // kinda weird code below: THREE checks + try { + // try-catch block is needed because sendSignedTransaction() may throw an error during tx preparation (e.g., incorrect gas value) + this.#connection.eth + .sendSignedTransaction(signedTransactionData) + .on('error', errorHandler) // this is needed of RPC node raises an error + .once('transactionHash', (hash: string) => { + this.#logger.debug(`broadcast#${broadcast.attempt} succeeded, txHash: ${hash}`); + broadcast.hash = hash; + resolve(hash); + }) + .catch(errorHandler); // this is needed to catch async errors occurred in another loop + } catch (error) { + errorHandler(error); + } + }); + } +}