diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts
index 11d716ce..34efc4f5 100644
--- a/src/scripts/pull_and_save_block_events.ts
+++ b/src/scripts/pull_and_save_block_events.ts
@@ -1,6 +1,7 @@
 import {
     BLOCKS_REORG_CHECK_INCREMENT,
     CHAIN_NAME,
+    CHAIN_NAME_LOWER,
     EVM_RPC_URL,
     MAX_BLOCKS_REORG,
     MAX_BLOCKS_TO_PULL,
@@ -16,7 +17,7 @@ import {
 import { Block, Transaction, TransactionReceipt } from '../entities';
 import { eventScrperProps, EventScraperProps } from '../events';
 import { parseBlock, parseTransaction, parseTransactionReceipt } from '../parsers/web3/parse_web3_objects';
-import { chunk, logger } from '../utils';
+import { chunk, kafkaSendRawAsync, logger } from '../utils';
 import {
     CURRENT_BLOCK,
     LATEST_SCRAPED_BLOCK,
@@ -167,7 +168,12 @@ function findRanges(nums: number[]): BlockRange[] {
     return ranges;
 }
 
-async function saveFullBlocks(connection: Connection, eventTables: string[], parsedFullBlocks: ParsedFullBlock[]) {
+async function saveFullBlocks(
+    connection: Connection,
+    producer: Producer | null,
+    eventTables: string[],
+    parsedFullBlocks: ParsedFullBlock[],
+) {
     const parsedBlocks = parsedFullBlocks.map((block) => block.parsedBlock);
     const parsedTransactions = parsedFullBlocks.map((block) => block.parsedTransactions).flat();
     const parsedTransactionReceipts = parsedFullBlocks.map((block) => block.parsedTransactionReceipts).flat();
@@ -215,24 +221,39 @@ async function saveFullBlocks(connection: Connection, eventTables: string[], par
 
         // Insert
-        const promises: Promise[] = [];
+        const promises: Promise[] = [];
 
         /// Blocks
         SAVED_RESULTS.labels({ type: 'block' }).inc(parsedBlocks.length);
         for (const chunkItems of chunk(parsedBlocks, 300)) {
             promises.push(queryRunner.manager.insert(Block, chunkItems));
         }
+        const blocksTopic = `event-scraper.${CHAIN_NAME_LOWER}.blocks.v0`;
+        promises.push(kafkaSendRawAsync(producer, blocksTopic, ['blockNumber'], parsedBlocks));
 
         /// Transactions
         SAVED_RESULTS.labels({ type: 'transactions' }).inc(parsedTransactions.length);
         for (const chunkItems of chunk(parsedTransactions, 300)) {
             promises.push(queryRunner.manager.insert(Transaction, chunkItems));
         }
+        const transactionsTopic = `event-scraper.${CHAIN_NAME_LOWER}.transactions.v0`;
+        promises.push(
+            kafkaSendRawAsync(producer, transactionsTopic, ['blockNumber', 'transactionHash'], parsedTransactions),
+        );
 
         /// TransactionReceipts
         SAVED_RESULTS.labels({ type: 'transactionReceipts' }).inc(parsedTransactionReceipts.length);
         for (const chunkItems of chunk(parsedTransactionReceipts, 300)) {
             promises.push(queryRunner.manager.insert(TransactionReceipt, chunkItems));
         }
+        const transactionReceiptsTopic = `event-scraper.${CHAIN_NAME_LOWER}.transaction-receipts.v0`;
+        promises.push(
+            kafkaSendRawAsync(
+                producer,
+                transactionReceiptsTopic,
+                ['blockNumber', 'transactionHash'],
+                parsedTransactionReceipts,
+            ),
+        );
 
         /// Events
         parsedEventsByType.forEach(async (typedEvents: TypedEvents) => {
@@ -240,12 +261,13 @@ async function saveFullBlocks(connection: Connection, eventTables: string[], par
             for (const chunkItems of chunk(typedEvents.events, 300)) {
                 promises.push(queryRunner.manager.insert(typedEvents.eventType, chunkItems as any[]));
             }
+
+            const topic = `event-scraper.${CHAIN_NAME_LOWER}.events.${pascalToSnake(typedEvents.eventName)}.v0`;
+            promises.push(kafkaSendRawAsync(producer, topic, ['blockHash', 'logIndex'], typedEvents.events));
         });
 
         await Promise.all(promises);
         await queryRunner.commitTransaction();
-
-        // TODO: Add Kafka support if we need it again
     } catch (err) {
         if (err instanceof QueryFailedError && err.message === 'could not serialize access due to concurrent update') {
             logger.warn('Simultaneous write attempt, will retry on the next run');
@@ -258,6 +280,11 @@ async function saveFullBlocks(connection: Connection, eventTables: string[], par
         await queryRunner.release();
     }
 }
+
+function pascalToSnake(original: string): string {
+    return original.replace(/\.?([A-Z]+)/g, (_, cap) => '-' + cap.toLowerCase()).replace(/^-/, '');
+}
+
 async function getParseSaveBlocksTransactionsEvents(
     connection: Connection,
     producer: Producer | null,
@@ -326,7 +353,7 @@ async function getParseSaveBlocksTransactionsEvents(
         .filter((props) => props.enabled)
         .map((props: EventScraperProps) => props.table);
 
-    await saveFullBlocks(connection, eventTables, parsedFullBlocks);
+    await saveFullBlocks(connection, producer, eventTables, parsedFullBlocks);
 
     if (FEAT_TOKENS_FROM_TRANSFERS) {
         const tokensFromTransfers = [
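Note: a minimal sketch of how the new per-event topic names are derived by the pascalToSnake helper added in this patch (which, despite its name, emits kebab-case segments). The chain name 'polygon' and the event name 'TransformedERC20Event' below are hypothetical values used purely for illustration; in the scraper the chain comes from the CHAIN_NAME_LOWER config and the event name from typedEvents.eventName.

// Same regex as the pascalToSnake helper introduced above.
function pascalToSnake(original: string): string {
    return original.replace(/\.?([A-Z]+)/g, (_, cap) => '-' + cap.toLowerCase()).replace(/^-/, '');
}

// Hypothetical values, for illustration only.
const chainNameLower = 'polygon';
const eventName = 'TransformedERC20Event';
const topic = `event-scraper.${chainNameLower}.events.${pascalToSnake(eventName)}.v0`;
console.log(topic); // event-scraper.polygon.events.transformed-erc20-event.v0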