diff --git a/ci/config.json.ci b/ci/config.json.ci index afc6cd564..a2dd5d87c 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -179,5 +179,10 @@ "key": "crawlIbcTao", "millisecondRepeatJob": 2000, "blocksPerCall": 100 + }, + "crawlIbcApp": { + "key": "crawlIbcApp", + "millisecondRepeatJob": 2000, + "blocksPerCall": 100 } } diff --git a/migrations/20230810094108_create_ibc_tx_model.ts b/migrations/20230810094108_create_ibc_tx_model.ts index 0bc1f0d20..3a376120f 100644 --- a/migrations/20230810094108_create_ibc_tx_model.ts +++ b/migrations/20230810094108_create_ibc_tx_model.ts @@ -4,14 +4,17 @@ export async function up(knex: Knex): Promise { await knex.schema.createTable('ibc_message', (table) => { table.increments(); table.integer('transaction_message_id').index(); - table.string('src_channel_id').notNullable(); - table.string('src_port_id').notNullable(); - table.string('dst_channel_id').notNullable(); - table.string('dst_port_id').notNullable(); - table.string('type').notNullable(); - table.integer('sequence').notNullable(); - table.string('sequence_key').notNullable(); - table.foreign('ibc_client_id').references('ibc_client.id'); + table.string('src_channel_id').notNullable().index(); + table.string('src_port_id').notNullable().index(); + table.string('dst_channel_id').notNullable().index(); + table.string('dst_port_id').notNullable().index(); + table.string('type').notNullable().index(); + table.integer('sequence').notNullable().index(); + table.string('sequence_key').notNullable().index(); + table.jsonb('data'); + table + .foreign('transaction_message_id') + .references('transaction_message.id'); }); } diff --git a/src/models/event.ts b/src/models/event.ts index da9a05b6b..13dfe8e3e 100644 --- a/src/models/event.ts +++ b/src/models/event.ts @@ -20,6 +20,8 @@ export class Event extends BaseModel { source!: string; + attributes!: EventAttribute[]; + static get tableName() { return 'event'; } diff --git a/src/models/event_attribute.ts b/src/models/event_attribute.ts index 4660efc5e..45f172712 100644 --- a/src/models/event_attribute.ts +++ b/src/models/event_attribute.ts @@ -119,5 +119,11 @@ export class EventAttribute extends BaseModel { USE_FEEGRANT_GRANTEE: 'use_feegrant.grantee', TX_FEE: 'tx.fee', TX_FEE_PAYER: 'tx.fee_payer', + DATA_HEX: 'packet_data_hex', + SEQUENCE: 'packet_sequence', + SRC_PORT: 'packet_src_port', + SRC_CHANNEL: 'packet_src_channel', + DST_PORT: 'packet_dst_port', + DST_CHANNEL: 'packet_dst_channel', }; } diff --git a/src/models/ibc_message.ts b/src/models/ibc_message.ts new file mode 100644 index 000000000..54845c24d --- /dev/null +++ b/src/models/ibc_message.ts @@ -0,0 +1,96 @@ +/* eslint-disable import/no-cycle */ +import { Model } from 'objection'; +import BaseModel from './base'; +import { IbcChannel } from './ibc_channel'; +import { TransactionMessage } from './transaction_message'; + +export class IbcMessage extends BaseModel { + [relation: string]: any | any[]; + + id!: number; + + transaction_message_id!: number; + + src_channel_id!: string; + + src_port_id!: string; + + dst_channel_id!: string; + + dst_port_id!: string; + + type!: string; + + sequence!: number; + + sequence_key!: string; + + data!: any; + + static get tableName() { + return 'ibc_message'; + } + + static get jsonSchema() { + return { + type: 'object', + required: [ + 'transaction_message_id', + 'src_channel_id', + 'src_port_id', + 'dst_channel_id', + 'dst_port_id', + 'type', + 'sequence', + 'sequence_key', + ], + properties: { + transaction_message_id: { type: 'number' }, + src_channel_id: { type: 'string' }, + src_port_id: { type: 'string' }, + dst_channel_id: { type: 'string' }, + dst_port_id: { type: 'string' }, + type: { type: 'string' }, + sequence: { type: 'number' }, + sequence_key: { type: 'string' }, + status: { type: 'boolean' }, + }, + }; + } + + static get relationMappings() { + return { + message: { + relation: Model.BelongsToOneRelation, + modelClass: TransactionMessage, + join: { + from: 'ibc_message.transaction_message_id', + to: 'transaction_message.id', + }, + }, + src_channel: { + relation: Model.BelongsToOneRelation, + modelClass: IbcChannel, + join: { + from: 'ibc_message.src_channel_id', + to: 'ibc_channel.id', + }, + }, + dst_channel: { + relation: Model.BelongsToOneRelation, + modelClass: IbcChannel, + join: { + from: 'ibc_message.dst_channel_id', + to: 'ibc_channel.id', + }, + }, + }; + } + + static EVENT_TYPE = { + SEND_PACKET: 'send_packet', + RECV_PACKET: 'recv_packet', + ACKNOWLEDGE_PACKET: 'acknowledge_packet', + TIMEOUT_PACKET: 'timeout_packet', + }; +} diff --git a/src/models/index.ts b/src/models/index.ts index 02310cefe..bddfaad36 100644 --- a/src/models/index.ts +++ b/src/models/index.ts @@ -28,3 +28,4 @@ export * from './cw20_total_holder_stats'; export * from './ibc_client'; export * from './ibc_connection'; export * from './ibc_channel'; +export * from './ibc_message'; diff --git a/src/services/ibc/crawl_ibc_app.service.ts b/src/services/ibc/crawl_ibc_app.service.ts index 36f0df7eb..5da8eaa40 100644 --- a/src/services/ibc/crawl_ibc_app.service.ts +++ b/src/services/ibc/crawl_ibc_app.service.ts @@ -1,9 +1,18 @@ +import { fromHex, fromUtf8 } from '@cosmjs/encoding'; import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { Knex } from 'knex'; import { ServiceBroker } from 'moleculer'; +import knex from '../../common/utils/db_connection'; import config from '../../../config.json' assert { type: 'json' }; import BullableService, { QueueHandler } from '../../base/bullable.service'; import { BULL_JOB_NAME, SERVICE } from '../../common'; -import { BlockCheckpoint } from '../../models'; +import { getAttributeFrom } from '../../common/utils/smart_contract'; +import { + BlockCheckpoint, + Event, + EventAttribute, + IbcMessage, +} from '../../models'; @Service({ name: SERVICE.V1.CrawlIBCAppService.key, @@ -29,6 +38,88 @@ export default class CrawlIbcAppService extends BullableService { `Handle IBC/APP, startHeight: ${startHeight}, endHeight: ${endHeight}` ); if (startHeight > endHeight) return; - this.logger.info(updateBlockCheckpoint); + const events = await Event.query() + .withGraphFetched('attributes') + .joinRelated('message') + .select('event.id', 'event.type', 'message.id as message_id') + .whereIn('event.type', [ + IbcMessage.EVENT_TYPE.ACKNOWLEDGE_PACKET, + IbcMessage.EVENT_TYPE.RECV_PACKET, + IbcMessage.EVENT_TYPE.SEND_PACKET, + IbcMessage.EVENT_TYPE.TIMEOUT_PACKET, + ]) + .andWhere('event.block_height', '>', startHeight) + .andWhere('event.block_height', '<=', endHeight) + .orderBy('event.id'); + await knex.transaction(async (trx) => { + await this.handleIbcMessage(events, trx); + updateBlockCheckpoint.height = endHeight; + await BlockCheckpoint.query() + .transacting(trx) + .insert(updateBlockCheckpoint) + .onConflict('job_name') + .merge(); + }); + } + + async handleIbcMessage(events: Event[], trx: Knex.Transaction) { + const ibcMessage = events.map((event) => { + const srcChannel = getAttributeFrom( + event.attributes, + EventAttribute.ATTRIBUTE_KEY.SRC_CHANNEL + ); + const srcPort = getAttributeFrom( + event.attributes, + EventAttribute.ATTRIBUTE_KEY.SRC_PORT + ); + const dstChannel = getAttributeFrom( + event.attributes, + EventAttribute.ATTRIBUTE_KEY.DST_CHANNEL + ); + const dstPort = getAttributeFrom( + event.attributes, + EventAttribute.ATTRIBUTE_KEY.DST_PORT + ); + const sequence = getAttributeFrom( + event.attributes, + EventAttribute.ATTRIBUTE_KEY.SEQUENCE + ); + const dataHex = getAttributeFrom( + event.attributes, + EventAttribute.ATTRIBUTE_KEY.DATA_HEX + ); + return IbcMessage.fromJson({ + transaction_message_id: event.message_id, + src_channel_id: srcChannel, + src_port_id: srcPort, + dst_channel_id: dstChannel, + dst_port_id: dstPort, + type: event.type, + sequence, + sequence_key: `${srcChannel}.${srcPort}.${dstChannel}.${dstPort}.${sequence}`, + data: fromUtf8(fromHex(dataHex)), + }); + }); + if (ibcMessage.length > 0) { + await IbcMessage.query().insert(ibcMessage).transacting(trx); + } + } + + async _start(): Promise { + await this.createJob( + BULL_JOB_NAME.CRAWL_IBC_APP, + BULL_JOB_NAME.CRAWL_IBC_APP, + {}, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.crawlIbcApp.millisecondRepeatJob, + }, + } + ); + return super._start(); } }