Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
sender WIp
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeychr committed Dec 14, 2023
1 parent 1d927a6 commit ba878a5
Showing 1 changed file with 287 additions and 0 deletions.
287 changes: 287 additions & 0 deletions src/chain-evm/sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import { Logger } from 'pino';
import Web3 from 'web3';
import { assert } from '../errors';
import {
EIP1559GasExtension,
FeeManagerV2,
GasCategory,
LegacyGasExtension,
TransactionTemplate,
} from './fees/manager';

export type InputTx = {
from: string;
data: string;
to: string;
value?: string;
};

enum EvmRpcError {
NonceTooLow,
TransactionUnderpriced,
Stuck,
Other,
}

function stringToError(error: any): EvmRpcError {
const stringifiedError = `${error}`;
if (stringifiedError.match(/too low/i)) return EvmRpcError.NonceTooLow;
if (stringifiedError.match(/underpriced/i)) return EvmRpcError.TransactionUnderpriced;
if (stringifiedError.match(/not mined/i)) return EvmRpcError.Stuck;
// else if (stringifiedError.match(/execution reverted/i)) return Error.EstimationReverted;
// else if (stringifiedError.match(/Transaction has been reverted by the EVM/i)) return Error.Reverted;
return EvmRpcError.Other;
}

type PopulatedTxn = TransactionTemplate & (EIP1559GasExtension | LegacyGasExtension);

type Signer = (transactionConfig: Parameters<Web3['eth']['sendTransaction']>[0]) => string;

type TransactionSenderOpts = {
sendMaxAttempts: number;
pollingIntervalMs: number;
pollingMaxAttempts: number;
};

type Web3TransactionReceipt = Awaited<ReturnType<Web3['eth']['getTransactionReceipt']>>;
type Broadcast = {
tx: PopulatedTxn;
broadcastId: number;
attempt: number;
hash?: string;
broadcastFailed?: boolean;
};

const defaultOpts: TransactionSenderOpts = {
sendMaxAttempts: 4,
pollingIntervalMs: 5_000,
pollingMaxAttempts: 5,
};

export class TransactionSender {
#startedAt?: Date;

#resolvedAt?: Date;

readonly #inputTx: Readonly<InputTx>;

readonly #feeManager: FeeManagerV2;

readonly #signer: Signer;

readonly #connection: Web3;

readonly #logger: Logger;

readonly #id: number;

readonly #opts: TransactionSenderOpts;

readonly #broadcasts: Array<Broadcast> = [];

#latestBroadcast?: Broadcast;

constructor(
tx: InputTx,
connection: Web3,
manager: FeeManagerV2,
signer: Signer,
logger: Logger,
opts?: TransactionSenderOpts,
) {
this.#inputTx = tx;
this.#connection = connection;
this.#feeManager = manager;
this.#signer = signer;
this.#id = new Date().getTime();
this.#logger = logger.child({ [TransactionSender.name]: this.#id });
this.#opts = { ...(opts || {}), ...defaultOpts };
}

async send(): Promise<string> {
assert(!this.#startedAt && !this.#resolvedAt, `sending already initiated (${this.#id})`);
this.#startedAt = new Date();

const txReceipt = await this.run();

this.#logger.info(
`tx ${txReceipt.transactionHash} confirmed at block #${txReceipt.blockNumber} at attempt #${this.#broadcastAttemptsCount}`,
);
return txReceipt.transactionHash;
}

get #broadcastAttemptsCount() {
return this.#broadcasts.length;
}

private async run(): Promise<Web3TransactionReceipt> {
const template = await this.getTransactionTemplate();
const tx = await this.#feeManager.populateTx(template);
return this.getPromise(tx);
}

private getPromise(tx: PopulatedTxn): Promise<Web3TransactionReceipt> {
const broadcast = {
tx,
broadcastId: new Date().getTime(),
attempt: this.#broadcastAttemptsCount + 1,
};
if (broadcast.attempt > this.#opts.sendMaxAttempts) {
this.#logger.debug(
`no more attempts (${this.#broadcastAttemptsCount}/${this.#opts.sendMaxAttempts})`,
);
return Promise.reject(new Error(`max attempts to send a txn reached`));
}
this.#broadcasts.push(broadcast);

return new Promise((resolve, reject) => {
this.signAndBroadcast(broadcast).then(
(hash) => this.handleBroadcastSuccess(hash, broadcast).then(resolve, reject),
(err) => this.handleBroadcastRejection(err, broadcast).then(resolve, reject),
);
});
}

private pushReplacement(broadcast: Broadcast): Promise<Web3TransactionReceipt> {
const isLastAttempt = this.#opts.sendMaxAttempts === broadcast.attempt + 1;
return new Promise((resolve, reject) => {
this.#feeManager
.populateReplacementTx(
broadcast.tx,
broadcast.tx,
undefined,
isLastAttempt ? GasCategory.HIGH : undefined,
)
.then(
(tx) => {
resolve(this.getPromise(tx));
},
(err) => {
reject(new Error(`unable to populate txn: ${err}`));
},
);
});
}

private handleBroadcastSuccess(
hash: string,
broadcast: Broadcast,
): Promise<Web3TransactionReceipt> {
return new Promise((resolve, reject) => {
let pollingAttemptsLeft = this.#opts.pollingMaxAttempts;
let pollingInterval: NodeJS.Timer;
let locked = false;

const stopPolling = () => {
locked = false;
clearInterval(pollingInterval);
};

pollingInterval = setInterval(() => {
if (locked) return;
locked = true;

if (pollingAttemptsLeft-- <= 0) {
this.#logger.debug(`poller reached max attempts, trying to replace txn`);
stopPolling();
resolve(this.pushReplacement(broadcast));
}

this.#connection.eth.getTransactionReceipt(hash).then(
(transactionReceiptResult) => {
if (transactionReceiptResult.status === true) {
stopPolling();
resolve(transactionReceiptResult);
} else if (transactionReceiptResult?.status === false) {
stopPolling();
reject(new Error(`tx ${hash} reverted`));
}
},
(err) => {
this.#logger.debug(`unable to get txn receipt: ${err}, still working...`);
},
);

locked = false;
}, this.#opts.pollingIntervalMs);
});
}

private handleBroadcastRejection(e: any, broadcast: Broadcast) {
const error = stringToError(e);
if (error === EvmRpcError.NonceTooLow) {
return this.getPromise({ ...broadcast.tx, nonce: broadcast.tx.nonce + 1 });
}
if (error === EvmRpcError.TransactionUnderpriced) {
return this.pushReplacement(broadcast);
}
if (error === EvmRpcError.Stuck) {
return this.pushReplacement(broadcast);
}
return Promise.reject(e);
}

private signAndBroadcast(broadcast: Broadcast): Promise<string> {
this.#logger.info(`signing transaction: ${JSON.stringify(broadcast.tx)}`);
const signedTransactionData = this.#signer(broadcast.tx);
this.#latestBroadcast = broadcast;

return this.broadcast(signedTransactionData);
}

private async getTransactionTemplate(): Promise<TransactionTemplate> {
const tx = this.#inputTx;
const [nonce, gas, blockNumber] = await Promise.all([
this.#connection.eth.getTransactionCount(tx.from),
this.#feeManager.estimateTx(tx),
this.#connection.eth.getBlockNumber(),
]);

this.#logger.debug(`current block: #${blockNumber}`);
return {
from: tx.from,
to: tx.to,
data: tx.data,
value: tx.value,

gas,
nonce,
};
}

private broadcast(signedTransactionData: string): Promise<string> {
const broadcast = this.#latestBroadcast;
assert(!!broadcast, 'broadcast not set');

return new Promise((resolve, reject) => {
const errorHandler = (error: any) => {
this.#logger.error(`broadcast#${broadcast.attempt} failed: ${error}`);
this.#logger.error(error);
// do not raise any error if txn hash has been already retrieved; the upper level code would handle this
// by quering the txn receipt
if (broadcast.hash === undefined) {
broadcast.broadcastFailed = true;
reject(new Error(`broadcast#${broadcast.attempt} failed: ${error}`));
}
};

this.#logger.debug(`broadcasting (attempt#${broadcast.attempt}): ${signedTransactionData}`);

// kinda weird code below: THREE checks
try {
// try-catch block is needed because sendSignedTransaction() may throw an error during tx preparation (e.g., incorrect gas value)
this.#connection.eth
.sendSignedTransaction(signedTransactionData)
.on('error', errorHandler) // this is needed of RPC node raises an error
.once('transactionHash', (hash: string) => {
this.#logger.debug(`broadcast#${broadcast.attempt} succeeded, txHash: ${hash}`);
broadcast.hash = hash;
resolve(hash);
})
.catch(errorHandler); // this is needed to catch async errors occurred in another loop
} catch (error) {
errorHandler(error);
}
});
}
}

0 comments on commit ba878a5

Please sign in to comment.