Skip to content

Commit

Permalink
Re add Kafka support
Browse files Browse the repository at this point in the history
  • Loading branch information
Ktl-XV committed Mar 26, 2024
1 parent 32357fc commit 8456897
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions src/scripts/pull_and_save_block_events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BLOCKS_REORG_CHECK_INCREMENT,
CHAIN_NAME,
CHAIN_NAME_LOWER,
EVM_RPC_URL,
MAX_BLOCKS_REORG,
MAX_BLOCKS_TO_PULL,
Expand All @@ -16,7 +17,7 @@ import {
import { Block, Transaction, TransactionReceipt } from '../entities';
import { eventScrperProps, EventScraperProps } from '../events';
import { parseBlock, parseTransaction, parseTransactionReceipt } from '../parsers/web3/parse_web3_objects';
import { chunk, logger } from '../utils';
import { chunk, kafkaSendRawAsync, logger } from '../utils';
import {
CURRENT_BLOCK,
LATEST_SCRAPED_BLOCK,
Expand Down Expand Up @@ -167,7 +168,12 @@ function findRanges(nums: number[]): BlockRange[] {
return ranges;
}

async function saveFullBlocks(connection: Connection, eventTables: string[], parsedFullBlocks: ParsedFullBlock[]) {
async function saveFullBlocks(
connection: Connection,
producer: Producer | null,
eventTables: string[],
parsedFullBlocks: ParsedFullBlock[],
) {
const parsedBlocks = parsedFullBlocks.map((block) => block.parsedBlock);
const parsedTransactions = parsedFullBlocks.map((block) => block.parsedTransactions).flat();
const parsedTransactionReceipts = parsedFullBlocks.map((block) => block.parsedTransactionReceipts).flat();
Expand Down Expand Up @@ -215,37 +221,53 @@ async function saveFullBlocks(connection: Connection, eventTables: string[], par

// Insert

const promises: Promise<InsertResult>[] = [];
const promises: Promise<InsertResult | void>[] = [];
/// Blocks
SAVED_RESULTS.labels({ type: 'block' }).inc(parsedBlocks.length);
for (const chunkItems of chunk(parsedBlocks, 300)) {
promises.push(queryRunner.manager.insert(Block, chunkItems));
}
const blocksTopic = `event-scraper.${CHAIN_NAME_LOWER}.blocks.v0`;
promises.push(kafkaSendRawAsync(producer, blocksTopic, ['blockNumber'], parsedBlocks));

/// Transactions
SAVED_RESULTS.labels({ type: 'transactions' }).inc(parsedTransactions.length);
for (const chunkItems of chunk(parsedTransactions, 300)) {
promises.push(queryRunner.manager.insert(Transaction, chunkItems));
}
const transactionsTopic = `event-scraper.${CHAIN_NAME_LOWER}.transactions.v0`;
promises.push(
kafkaSendRawAsync(producer, transactionsTopic, ['blockNumber', 'transactionHash'], parsedTransactions),
);

/// TransactionReceipts
SAVED_RESULTS.labels({ type: 'transactionReceipts' }).inc(parsedTransactionReceipts.length);
for (const chunkItems of chunk(parsedTransactionReceipts, 300)) {
promises.push(queryRunner.manager.insert(TransactionReceipt, chunkItems));
}
const transactionReceiptsTopic = `event-scraper.${CHAIN_NAME_LOWER}.transaction-receipts.v0`;
promises.push(
kafkaSendRawAsync(
producer,
transactionReceiptsTopic,
['blockNumber', 'transactionHash'],
parsedTransactionReceipts,
),
);

/// Events
parsedEventsByType.forEach(async (typedEvents: TypedEvents) => {
SAVED_RESULTS.labels({ type: 'event', event: typedEvents.eventName }).inc(typedEvents.events.length);
for (const chunkItems of chunk(typedEvents.events, 300)) {
promises.push(queryRunner.manager.insert(typedEvents.eventType, chunkItems as any[]));
}

const topic = `event-scraper.${CHAIN_NAME_LOWER}.events.${pascalToSnake(typedEvents.eventName)}.v0`;
promises.push(kafkaSendRawAsync(producer, topic, ['blockHash', 'logIndex'], typedEvents.events));
});

await Promise.all(promises);
await queryRunner.commitTransaction();

// TODO: Add Kafka support if we need it again
} catch (err) {
if (err instanceof QueryFailedError && err.message === 'could not serialize access due to concurrent update') {
logger.warn('Simultaneous write attempt, will retry on the next run');
Expand All @@ -258,6 +280,11 @@ async function saveFullBlocks(connection: Connection, eventTables: string[], par
await queryRunner.release();
}
}

function pascalToSnake(original: string): string {
return original.replace(/\.?([A-Z]+)/g, (_, cap) => '-' + cap.toLowerCase()).replace(/^-/, '');
}

async function getParseSaveBlocksTransactionsEvents(
connection: Connection,
producer: Producer | null,
Expand Down Expand Up @@ -326,7 +353,7 @@ async function getParseSaveBlocksTransactionsEvents(
.filter((props) => props.enabled)
.map((props: EventScraperProps) => props.table);

await saveFullBlocks(connection, eventTables, parsedFullBlocks);
await saveFullBlocks(connection, producer, eventTables, parsedFullBlocks);

if (FEAT_TOKENS_FROM_TRANSFERS) {
const tokensFromTransfers = [
Expand Down

0 comments on commit 8456897

Please sign in to comment.