Skip to content

Commit

Permalink
fetching strategies for batched pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
zavelevsky committed Dec 14, 2024
1 parent 692d6a3 commit b14cb50
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 94 deletions.
154 changes: 74 additions & 80 deletions src/chain-cache/ChainSync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ChainCache } from './ChainCache';
import { findAndRemoveLeading, toPairKey } from './utils';
import { toPairKey } from './utils';
import { Logger } from '../common/logger';
import {
BlockMetadata,
Expand All @@ -18,16 +18,19 @@ export class ChainSync {
private _chainCache: ChainCache;
private _syncCalled: boolean = false;
private _slowPollPairs: boolean = false;
private _pairs: TokenPair[] = [];
private _uncachedPairs: TokenPair[] = [];
// keep the time stamp of last fetch
private _lastFetch: number = Date.now();
private _initialSyncDone: boolean = false;
private _maxBlockAge?: number;
private _numOfPairsToBatch: number;

constructor(fetcher: Fetcher, chainCache: ChainCache, maxBlockAge?: number) {
constructor(
fetcher: Fetcher,
chainCache: ChainCache,
numOfPairsToBatch: number = 100
) {
this._fetcher = fetcher;
this._chainCache = chainCache;
this._maxBlockAge = maxBlockAge;
this._numOfPairsToBatch = numOfPairsToBatch;
}

public async startDataSync(): Promise<void> {
Expand All @@ -43,62 +46,46 @@ export class ChainSync {
logger.debug('startDataSync - cache is new', arguments);
// cache starts from scratch so we want to avoid getting events from the beginning of time
this._chainCache.applyBatchedUpdates(blockNumber, [], [], [], [], []);
} else if (
this._maxBlockAge !== undefined &&
blockNumber - latestBlockInCache > this._maxBlockAge
) {
logger.debug(
`startDataSync - cache is too old: current block ${blockNumber}, cache block ${latestBlockInCache}`,
arguments
);
// cache is too old so we want to clear it and avoid getting events from the beginning of time
this._chainCache.clear(true);
this._chainCache.applyBatchedUpdates(blockNumber, [], [], [], [], []);
}

// let's fetch all pairs from the chain and set them to the cache - to be used by the following syncs
await this._updatePairsFromChain();
await this._updateUncachedPairsFromChain();

// _populateFeesData() should run first, before _populatePairsData() gets to manipulate the pairs list
await Promise.all([
this._populateFeesData(this._pairs),
this._populateFeesData(this._uncachedPairs),
this._populatePairsData(),
this._syncEvents(),
]);
}

// reads all pairs from chain and sets to private field
private async _updatePairsFromChain() {
logger.debug('_updatePairsFromChain fetches pairs');
this._pairs = [...(await this._fetcher.pairs())];
logger.debug('_updatePairsFromChain fetched pairs', this._pairs);
private async _updateUncachedPairsFromChain() {
logger.debug('_updateUncachedPairsFromChain fetches pairs');
const pairs = await this._fetcher.pairs();
logger.debug('_updateUncachedPairsFromChain fetched pairs', pairs);
this._lastFetch = Date.now();
if (this._pairs.length === 0) {
if (pairs.length === 0) {
logger.error(
'_updatePairsFromChain fetched no pairs - this indicates a problem'
'_updateUncachedPairsFromChain fetched no pairs - this indicates a problem'
);
}

// let's filter the uncached pairs
this._uncachedPairs = pairs.filter(
(pair) => !this._chainCache.hasCachedPair(pair[0], pair[1])
);
}

private async _populateFeesData(
pairs: TokenPair[],
skipCache = false
): Promise<void> {
private async _populateFeesData(pairs: TokenPair[]): Promise<void> {
logger.debug('populateFeesData called');
if (pairs.length === 0) {
logger.error('populateFeesData called with no pairs - skipping');
logger.log('populateFeesData called with no pairs - skipping');
return;
}
const uncachedPairs = skipCache
? pairs
: pairs.filter(
(pair) => !this._chainCache.hasCachedPair(pair[0], pair[1])
);

if (uncachedPairs.length === 0) return;

const feeUpdates: [string, string, number][] =
await this._fetcher.pairsTradingFeePPM(uncachedPairs);
await this._fetcher.pairsTradingFeePPM(pairs);

logger.debug('populateFeesData fetched fee updates', feeUpdates);

Expand All @@ -109,11 +96,9 @@ export class ChainSync {

// `_populatePairsData` sets timeout and returns immediately. It does the following:
// 1. Fetches all token pairs from the fetcher
// 2. selects a pair that's not in the cache
// 3. fetches strategies for the pair
// 4. adds the pair to the cache
// 5. sets short timeout to continue with the next pair
// 6. if there are no more pairs, it sets a timeout to call itself again
// 2. fetches strategies for all uncached pairs
// 3. adds the pairs strategies to the cache
// 4. sets a timeout to call itself again
private async _populatePairsData(): Promise<void> {
logger.debug('_populatePairsData called');
// this indicates we want to poll for pairs only once a minute.
Expand All @@ -122,40 +107,33 @@ export class ChainSync {

const processPairs = async () => {
try {
if (this._pairs.length === 0) {
if (this._uncachedPairs.length === 0) {
// if we have no pairs we need to fetch - unless we're in slow poll mode and less than a minute has passed since last fetch
if (this._slowPollPairs && Date.now() - this._lastFetch < 60000) {
// go back to sleep
setTimeout(processPairs, 1000);
return;
}
await this._updatePairsFromChain();
await this._updateUncachedPairsFromChain();
}
// let's find the first pair that's not in the cache and clear it from the list along with all the items before it
const nextPairToSync = findAndRemoveLeading<TokenPair>(
this._pairs,
(pair) => !this._chainCache.hasCachedPair(pair[0], pair[1])
);
if (nextPairToSync) {
logger.debug('_populatePairsData adds pair to cache', nextPairToSync);
// we have a pair to sync - let's do it - add its strategies to the cache and then to minimal timeout to process the next pair
await this.syncPairData(
nextPairToSync[0],
nextPairToSync[1],
!this._initialSyncDone
);
setTimeout(processPairs, 1);
} else {
// list is now empty and there are no more pairs to sync - we can poll them less frequently
// we will wake up once a second just to check if we're still in slow poll mode,
// but if not - we will actually poll once a minute

if (this._uncachedPairs.length > 0) {
logger.debug(
'_populatePairsData handled all pairs and goes to slow poll mode'
'_populatePairsData will now sync data for',
this._uncachedPairs
);
this._slowPollPairs = true;
this._initialSyncDone = true;
setTimeout(processPairs, 1000);
// we have pairs to sync - let's split them into batches - add their strategies to the cache and go into slow poll mode
await this._syncPairDataBatch();
}
// list is now empty and there are no more pairs to sync - we can poll them less frequently
// we will wake up once a second just to check if we're still in slow poll mode,
// but if not - we will actually poll once a minute
logger.debug(
'_populatePairsData handled all pairs and goes to slow poll mode'
);
this._slowPollPairs = true;
setTimeout(processPairs, 1000);
return;
} catch (e) {
logger.error('Error while syncing pairs data', e);
setTimeout(processPairs, 60000);
Expand All @@ -164,19 +142,39 @@ export class ChainSync {
setTimeout(processPairs, 1);
}

public async syncPairData(
token0: string,
token1: string,
noPairAddedEvent: boolean = false
): Promise<void> {
private async _syncPairDataBatch(): Promise<void> {
// Split all uncached pairs into batches
const batches: TokenPair[][] = [];
for (
let i = 0;
i < this._uncachedPairs.length;
i += this._numOfPairsToBatch
) {
batches.push(this._uncachedPairs.slice(i, i + this._numOfPairsToBatch));
}
this._uncachedPairs = [];
const strategiesBatches = await Promise.all(
batches.map((batch) => this._fetcher.strategiesByPairs(batch))
);
strategiesBatches.flat().forEach((pairStrategies) => {
this._chainCache.addPair(
pairStrategies.pair[0],
pairStrategies.pair[1],
pairStrategies.strategies,
true
);
});
}

public async syncPairData(token0: string, token1: string): Promise<void> {
if (!this._syncCalled) {
throw new Error(
'ChainSync.startDataSync() must be called before syncPairData()'
);
}
const strategies = await this._fetcher.strategiesByPair(token0, token1);
if (this._chainCache.hasCachedPair(token0, token1)) return;
this._chainCache.addPair(token0, token1, strategies, noPairAddedEvent);
this._chainCache.addPair(token0, token1, strategies, false);
}

// used to break the blocks between latestBlock + 1 and currentBlock to chunks of 1000 blocks
Expand Down Expand Up @@ -348,18 +346,15 @@ export class ChainSync {
logger.debug(
'_syncEvents noticed at least one default fee update - refetching pair fees for all pairs'
);
await this._populateFeesData(
[...(await this._fetcher.pairs())],
true
);
await this._populateFeesData([...(await this._fetcher.pairs())]);
}
if (newlyCreatedPairs.length > 0) {
logger.debug(
'_syncEvents noticed at least one new pair created - setting slow poll mode to false'
);
this._slowPollPairs = false;
logger.debug('_syncEvents fetching fees for the new pairs');
await this._populateFeesData(newlyCreatedPairs, true);
await this._populateFeesData(newlyCreatedPairs);
}
}
} catch (err) {
Expand All @@ -371,9 +366,8 @@ export class ChainSync {
setTimeout(processEvents, 1);
}
private _resetPairsFetching() {
this._pairs = [];
this._uncachedPairs = [];
this._slowPollPairs = false;
this._initialSyncDone = false;
}

private async _detectReorg(currentBlock: number): Promise<boolean> {
Expand Down
5 changes: 3 additions & 2 deletions src/chain-cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './types';
* use the ChainCache and ChainSync classes directly.
* @param {Fetcher} fetcher - fetcher to use for syncing the cache
* @param {string} cachedData - serialized cache data to initialize the cache with
* @param {number} numOfPairsToBatch - number of pairs to fetch in a single batch - adapt this value based on the RPC limits and testing
* @returns an object with the initialized cache and a function to start syncing the cache
* @example
* const { cache, startDataSync } = initSyncedCache(fetcher, cachedData);
Expand All @@ -20,7 +21,7 @@ export * from './types';
export const initSyncedCache = (
fetcher: Fetcher,
cachedData?: string,
maxBlockAge?: number
numOfPairsToBatch?: number
): { cache: ChainCache; startDataSync: () => Promise<void> } => {
let cache: ChainCache | undefined;
if (cachedData) {
Expand All @@ -31,7 +32,7 @@ export const initSyncedCache = (
cache = new ChainCache();
}

const syncer = new ChainSync(fetcher, cache, maxBlockAge);
const syncer = new ChainSync(fetcher, cache, numOfPairsToBatch);
cache.setCacheMissHandler(syncer.syncPairData.bind(syncer));
return { cache, startDataSync: syncer.startDataSync.bind(syncer) };
};
12 changes: 0 additions & 12 deletions src/chain-cache/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ export const toDirectionKey = (token0: string, token1: string): string => {
return toKey([token0, token1]);
};

// find and return an element in an array, and remove it and all elements before it. If not found, remove all elements.
export const findAndRemoveLeading = <T>(
arr: T[],
predicate: (value: T) => boolean
): T | undefined => {
let element = undefined;
do {
element = arr.shift();
} while (element && !predicate(element));
return element;
};

export function isOrderTradable(order: EncodedOrder): boolean {
return order.y.gt(0) && (order.A.gt(0) || order.B.gt(0));
}
6 changes: 6 additions & 0 deletions src/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ export type BlockMetadata = {
export interface Fetcher {
pairs(): Promise<TokenPair[]>;
strategiesByPair(token0: string, token1: string): Promise<EncodedStrategy[]>;
strategiesByPairs(pairs: TokenPair[]): Promise<
{
pair: TokenPair;
strategies: EncodedStrategy[];
}[]
>;
pairTradingFeePPM(token0: string, token1: string): Promise<number>;
pairsTradingFeePPM(pairs: TokenPair[]): Promise<[string, string, number][]>;
tradingFeePPM(): Promise<number>;
Expand Down
26 changes: 26 additions & 0 deletions src/contracts-api/Reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,32 @@ export default class Reader implements Fetcher {
return res.map((r) => toStrategy(r));
}

// TODO: add a method to get all strategies by a list of pairs. Returns a collection of pairs and their strategies. It will use multicall to call strategiesByPair method from the contracts.
public async strategiesByPairs(pairs: TokenPair[]): Promise<
{
pair: TokenPair;
strategies: EncodedStrategy[];
}[]
> {
const results = await this._multicall(
pairs.map((pair) => ({
contractAddress: this._contracts.carbonController.address,
interface: this._contracts.carbonController.interface,
methodName: 'strategiesByPair',
methodParameters: [pair[0], pair[1], 0, 0],
}))
);
if (!results || results.length === 0) return [];
console.debug('results', results);
return results.map((result, i) => {
const strategiesResult = result[0] as StrategyStructOutput[];
return {
pair: pairs[i],
strategies: strategiesResult.map((r) => toStrategy(r)),
};
});
}

public async tokensByOwner(owner: string) {
if (!owner) return [];

Expand Down

0 comments on commit b14cb50

Please sign in to comment.