From b14cb503939b738399965fd893143cc24595faf7 Mon Sep 17 00:00:00 2001 From: Doron Zavelevsky Date: Sat, 14 Dec 2024 01:09:11 +0000 Subject: [PATCH 1/2] fetching strategies for batched pairs --- src/chain-cache/ChainSync.ts | 154 +++++++++++++++++------------------ src/chain-cache/index.ts | 5 +- src/chain-cache/utils.ts | 12 --- src/common/types.ts | 6 ++ src/contracts-api/Reader.ts | 26 ++++++ 5 files changed, 109 insertions(+), 94 deletions(-) diff --git a/src/chain-cache/ChainSync.ts b/src/chain-cache/ChainSync.ts index 7cf73a9..0f80ba2 100644 --- a/src/chain-cache/ChainSync.ts +++ b/src/chain-cache/ChainSync.ts @@ -1,5 +1,5 @@ import { ChainCache } from './ChainCache'; -import { findAndRemoveLeading, toPairKey } from './utils'; +import { toPairKey } from './utils'; import { Logger } from '../common/logger'; import { BlockMetadata, @@ -18,16 +18,19 @@ export class ChainSync { private _chainCache: ChainCache; private _syncCalled: boolean = false; private _slowPollPairs: boolean = false; - private _pairs: TokenPair[] = []; + private _uncachedPairs: TokenPair[] = []; // keep the time stamp of last fetch private _lastFetch: number = Date.now(); - private _initialSyncDone: boolean = false; - private _maxBlockAge?: number; + private _numOfPairsToBatch: number; - constructor(fetcher: Fetcher, chainCache: ChainCache, maxBlockAge?: number) { + constructor( + fetcher: Fetcher, + chainCache: ChainCache, + numOfPairsToBatch: number = 100 + ) { this._fetcher = fetcher; this._chainCache = chainCache; - this._maxBlockAge = maxBlockAge; + this._numOfPairsToBatch = numOfPairsToBatch; } public async startDataSync(): Promise { @@ -43,62 +46,46 @@ export class ChainSync { logger.debug('startDataSync - cache is new', arguments); // cache starts from scratch so we want to avoid getting events from the beginning of time this._chainCache.applyBatchedUpdates(blockNumber, [], [], [], [], []); - } else if ( - this._maxBlockAge !== undefined && - blockNumber - latestBlockInCache > this._maxBlockAge - ) { - logger.debug( - `startDataSync - cache is too old: current block ${blockNumber}, cache block ${latestBlockInCache}`, - arguments - ); - // cache is too old so we want to clear it and avoid getting events from the beginning of time - this._chainCache.clear(true); - this._chainCache.applyBatchedUpdates(blockNumber, [], [], [], [], []); } // let's fetch all pairs from the chain and set them to the cache - to be used by the following syncs - await this._updatePairsFromChain(); + await this._updateUncachedPairsFromChain(); // _populateFeesData() should run first, before _populatePairsData() gets to manipulate the pairs list await Promise.all([ - this._populateFeesData(this._pairs), + this._populateFeesData(this._uncachedPairs), this._populatePairsData(), this._syncEvents(), ]); } // reads all pairs from chain and sets to private field - private async _updatePairsFromChain() { - logger.debug('_updatePairsFromChain fetches pairs'); - this._pairs = [...(await this._fetcher.pairs())]; - logger.debug('_updatePairsFromChain fetched pairs', this._pairs); + private async _updateUncachedPairsFromChain() { + logger.debug('_updateUncachedPairsFromChain fetches pairs'); + const pairs = await this._fetcher.pairs(); + logger.debug('_updateUncachedPairsFromChain fetched pairs', pairs); this._lastFetch = Date.now(); - if (this._pairs.length === 0) { + if (pairs.length === 0) { logger.error( - '_updatePairsFromChain fetched no pairs - this indicates a problem' + '_updateUncachedPairsFromChain fetched no pairs - this indicates a problem' ); } + + // let's filter the uncached pairs + this._uncachedPairs = pairs.filter( + (pair) => !this._chainCache.hasCachedPair(pair[0], pair[1]) + ); } - private async _populateFeesData( - pairs: TokenPair[], - skipCache = false - ): Promise { + private async _populateFeesData(pairs: TokenPair[]): Promise { logger.debug('populateFeesData called'); if (pairs.length === 0) { - logger.error('populateFeesData called with no pairs - skipping'); + logger.log('populateFeesData called with no pairs - skipping'); return; } - const uncachedPairs = skipCache - ? pairs - : pairs.filter( - (pair) => !this._chainCache.hasCachedPair(pair[0], pair[1]) - ); - - if (uncachedPairs.length === 0) return; const feeUpdates: [string, string, number][] = - await this._fetcher.pairsTradingFeePPM(uncachedPairs); + await this._fetcher.pairsTradingFeePPM(pairs); logger.debug('populateFeesData fetched fee updates', feeUpdates); @@ -109,11 +96,9 @@ export class ChainSync { // `_populatePairsData` sets timeout and returns immediately. It does the following: // 1. Fetches all token pairs from the fetcher - // 2. selects a pair that's not in the cache - // 3. fetches strategies for the pair - // 4. adds the pair to the cache - // 5. sets short timeout to continue with the next pair - // 6. if there are no more pairs, it sets a timeout to call itself again + // 2. fetches strategies for all uncached pairs + // 3. adds the pairs strategies to the cache + // 4. sets a timeout to call itself again private async _populatePairsData(): Promise { logger.debug('_populatePairsData called'); // this indicates we want to poll for pairs only once a minute. @@ -122,40 +107,33 @@ export class ChainSync { const processPairs = async () => { try { - if (this._pairs.length === 0) { + if (this._uncachedPairs.length === 0) { // if we have no pairs we need to fetch - unless we're in slow poll mode and less than a minute has passed since last fetch if (this._slowPollPairs && Date.now() - this._lastFetch < 60000) { // go back to sleep setTimeout(processPairs, 1000); return; } - await this._updatePairsFromChain(); + await this._updateUncachedPairsFromChain(); } - // let's find the first pair that's not in the cache and clear it from the list along with all the items before it - const nextPairToSync = findAndRemoveLeading( - this._pairs, - (pair) => !this._chainCache.hasCachedPair(pair[0], pair[1]) - ); - if (nextPairToSync) { - logger.debug('_populatePairsData adds pair to cache', nextPairToSync); - // we have a pair to sync - let's do it - add its strategies to the cache and then to minimal timeout to process the next pair - await this.syncPairData( - nextPairToSync[0], - nextPairToSync[1], - !this._initialSyncDone - ); - setTimeout(processPairs, 1); - } else { - // list is now empty and there are no more pairs to sync - we can poll them less frequently - // we will wake up once a second just to check if we're still in slow poll mode, - // but if not - we will actually poll once a minute + + if (this._uncachedPairs.length > 0) { logger.debug( - '_populatePairsData handled all pairs and goes to slow poll mode' + '_populatePairsData will now sync data for', + this._uncachedPairs ); - this._slowPollPairs = true; - this._initialSyncDone = true; - setTimeout(processPairs, 1000); + // we have pairs to sync - let's split them into batches - add their strategies to the cache and go into slow poll mode + await this._syncPairDataBatch(); } + // list is now empty and there are no more pairs to sync - we can poll them less frequently + // we will wake up once a second just to check if we're still in slow poll mode, + // but if not - we will actually poll once a minute + logger.debug( + '_populatePairsData handled all pairs and goes to slow poll mode' + ); + this._slowPollPairs = true; + setTimeout(processPairs, 1000); + return; } catch (e) { logger.error('Error while syncing pairs data', e); setTimeout(processPairs, 60000); @@ -164,11 +142,31 @@ export class ChainSync { setTimeout(processPairs, 1); } - public async syncPairData( - token0: string, - token1: string, - noPairAddedEvent: boolean = false - ): Promise { + private async _syncPairDataBatch(): Promise { + // Split all uncached pairs into batches + const batches: TokenPair[][] = []; + for ( + let i = 0; + i < this._uncachedPairs.length; + i += this._numOfPairsToBatch + ) { + batches.push(this._uncachedPairs.slice(i, i + this._numOfPairsToBatch)); + } + this._uncachedPairs = []; + const strategiesBatches = await Promise.all( + batches.map((batch) => this._fetcher.strategiesByPairs(batch)) + ); + strategiesBatches.flat().forEach((pairStrategies) => { + this._chainCache.addPair( + pairStrategies.pair[0], + pairStrategies.pair[1], + pairStrategies.strategies, + true + ); + }); + } + + public async syncPairData(token0: string, token1: string): Promise { if (!this._syncCalled) { throw new Error( 'ChainSync.startDataSync() must be called before syncPairData()' @@ -176,7 +174,7 @@ export class ChainSync { } const strategies = await this._fetcher.strategiesByPair(token0, token1); if (this._chainCache.hasCachedPair(token0, token1)) return; - this._chainCache.addPair(token0, token1, strategies, noPairAddedEvent); + this._chainCache.addPair(token0, token1, strategies, false); } // used to break the blocks between latestBlock + 1 and currentBlock to chunks of 1000 blocks @@ -348,10 +346,7 @@ export class ChainSync { logger.debug( '_syncEvents noticed at least one default fee update - refetching pair fees for all pairs' ); - await this._populateFeesData( - [...(await this._fetcher.pairs())], - true - ); + await this._populateFeesData([...(await this._fetcher.pairs())]); } if (newlyCreatedPairs.length > 0) { logger.debug( @@ -359,7 +354,7 @@ export class ChainSync { ); this._slowPollPairs = false; logger.debug('_syncEvents fetching fees for the new pairs'); - await this._populateFeesData(newlyCreatedPairs, true); + await this._populateFeesData(newlyCreatedPairs); } } } catch (err) { @@ -371,9 +366,8 @@ export class ChainSync { setTimeout(processEvents, 1); } private _resetPairsFetching() { - this._pairs = []; + this._uncachedPairs = []; this._slowPollPairs = false; - this._initialSyncDone = false; } private async _detectReorg(currentBlock: number): Promise { diff --git a/src/chain-cache/index.ts b/src/chain-cache/index.ts index 648fe02..a8edb53 100644 --- a/src/chain-cache/index.ts +++ b/src/chain-cache/index.ts @@ -11,6 +11,7 @@ export * from './types'; * use the ChainCache and ChainSync classes directly. * @param {Fetcher} fetcher - fetcher to use for syncing the cache * @param {string} cachedData - serialized cache data to initialize the cache with + * @param {number} numOfPairsToBatch - number of pairs to fetch in a single batch - adapt this value based on the RPC limits and testing * @returns an object with the initialized cache and a function to start syncing the cache * @example * const { cache, startDataSync } = initSyncedCache(fetcher, cachedData); @@ -20,7 +21,7 @@ export * from './types'; export const initSyncedCache = ( fetcher: Fetcher, cachedData?: string, - maxBlockAge?: number + numOfPairsToBatch?: number ): { cache: ChainCache; startDataSync: () => Promise } => { let cache: ChainCache | undefined; if (cachedData) { @@ -31,7 +32,7 @@ export const initSyncedCache = ( cache = new ChainCache(); } - const syncer = new ChainSync(fetcher, cache, maxBlockAge); + const syncer = new ChainSync(fetcher, cache, numOfPairsToBatch); cache.setCacheMissHandler(syncer.syncPairData.bind(syncer)); return { cache, startDataSync: syncer.startDataSync.bind(syncer) }; }; diff --git a/src/chain-cache/utils.ts b/src/chain-cache/utils.ts index 0dbc43e..f860369 100644 --- a/src/chain-cache/utils.ts +++ b/src/chain-cache/utils.ts @@ -39,18 +39,6 @@ export const toDirectionKey = (token0: string, token1: string): string => { return toKey([token0, token1]); }; -// find and return an element in an array, and remove it and all elements before it. If not found, remove all elements. -export const findAndRemoveLeading = ( - arr: T[], - predicate: (value: T) => boolean -): T | undefined => { - let element = undefined; - do { - element = arr.shift(); - } while (element && !predicate(element)); - return element; -}; - export function isOrderTradable(order: EncodedOrder): boolean { return order.y.gt(0) && (order.A.gt(0) || order.B.gt(0)); } diff --git a/src/common/types.ts b/src/common/types.ts index 2ed8f8b..19dd3af 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -154,6 +154,12 @@ export type BlockMetadata = { export interface Fetcher { pairs(): Promise; strategiesByPair(token0: string, token1: string): Promise; + strategiesByPairs(pairs: TokenPair[]): Promise< + { + pair: TokenPair; + strategies: EncodedStrategy[]; + }[] + >; pairTradingFeePPM(token0: string, token1: string): Promise; pairsTradingFeePPM(pairs: TokenPair[]): Promise<[string, string, number][]>; tradingFeePPM(): Promise; diff --git a/src/contracts-api/Reader.ts b/src/contracts-api/Reader.ts index f8a52bf..0db16d6 100644 --- a/src/contracts-api/Reader.ts +++ b/src/contracts-api/Reader.ts @@ -103,6 +103,32 @@ export default class Reader implements Fetcher { return res.map((r) => toStrategy(r)); } + // TODO: add a method to get all strategies by a list of pairs. Returns a collection of pairs and their strategies. It will use multicall to call strategiesByPair method from the contracts. + public async strategiesByPairs(pairs: TokenPair[]): Promise< + { + pair: TokenPair; + strategies: EncodedStrategy[]; + }[] + > { + const results = await this._multicall( + pairs.map((pair) => ({ + contractAddress: this._contracts.carbonController.address, + interface: this._contracts.carbonController.interface, + methodName: 'strategiesByPair', + methodParameters: [pair[0], pair[1], 0, 0], + })) + ); + if (!results || results.length === 0) return []; + console.debug('results', results); + return results.map((result, i) => { + const strategiesResult = result[0] as StrategyStructOutput[]; + return { + pair: pairs[i], + strategies: strategiesResult.map((r) => toStrategy(r)), + }; + }); + } + public async tokensByOwner(owner: string) { if (!owner) return []; From 63640674ca4575ebc9a0895cb8969898ccb6890e Mon Sep 17 00:00:00 2001 From: Doron Zavelevsky Date: Sat, 14 Dec 2024 01:16:42 +0000 Subject: [PATCH 2/2] error handling in sync --- src/chain-cache/ChainSync.ts | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/chain-cache/ChainSync.ts b/src/chain-cache/ChainSync.ts index 0f80ba2..c58fc62 100644 --- a/src/chain-cache/ChainSync.ts +++ b/src/chain-cache/ChainSync.ts @@ -152,18 +152,24 @@ export class ChainSync { ) { batches.push(this._uncachedPairs.slice(i, i + this._numOfPairsToBatch)); } - this._uncachedPairs = []; - const strategiesBatches = await Promise.all( - batches.map((batch) => this._fetcher.strategiesByPairs(batch)) - ); - strategiesBatches.flat().forEach((pairStrategies) => { - this._chainCache.addPair( - pairStrategies.pair[0], - pairStrategies.pair[1], - pairStrategies.strategies, - true + + try { + const strategiesBatches = await Promise.all( + batches.map((batch) => this._fetcher.strategiesByPairs(batch)) ); - }); + strategiesBatches.flat().forEach((pairStrategies) => { + this._chainCache.addPair( + pairStrategies.pair[0], + pairStrategies.pair[1], + pairStrategies.strategies, + true + ); + }); + this._uncachedPairs = []; + } catch (error) { + logger.error('Failed to fetch strategies for pairs batch:', error); + throw error; // Re-throw to be handled by caller + } } public async syncPairData(token0: string, token1: string): Promise {