Skip to content

Commit

Permalink
feat: ibc app
Browse files Browse the repository at this point in the history
  • Loading branch information
phamphong9981 committed Aug 15, 2023
1 parent f1344e0 commit 89c4bd0
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 10 deletions.
5 changes: 5 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,10 @@
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
"blocksPerCall": 100
},
"crawlIbcApp": {
"key": "crawlIbcApp",
"millisecondRepeatJob": 2000,
"blocksPerCall": 100
}
}
19 changes: 11 additions & 8 deletions migrations/20230810094108_create_ibc_tx_model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable('ibc_message', (table) => {
table.increments();
table.integer('transaction_message_id').index();
table.string('src_channel_id').notNullable();
table.string('src_port_id').notNullable();
table.string('dst_channel_id').notNullable();
table.string('dst_port_id').notNullable();
table.string('type').notNullable();
table.integer('sequence').notNullable();
table.string('sequence_key').notNullable();
table.foreign('ibc_client_id').references('ibc_client.id');
table.string('src_channel_id').notNullable().index();
table.string('src_port_id').notNullable().index();
table.string('dst_channel_id').notNullable().index();
table.string('dst_port_id').notNullable().index();
table.string('type').notNullable().index();
table.integer('sequence').notNullable().index();
table.string('sequence_key').notNullable().index();
table.jsonb('data');
table
.foreign('transaction_message_id')
.references('transaction_message.id');
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export class Event extends BaseModel {

source!: string;

attributes!: EventAttribute[];

static get tableName() {
return 'event';
}
Expand Down
6 changes: 6 additions & 0 deletions src/models/event_attribute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,11 @@ export class EventAttribute extends BaseModel {
USE_FEEGRANT_GRANTEE: 'use_feegrant.grantee',
TX_FEE: 'tx.fee',
TX_FEE_PAYER: 'tx.fee_payer',
DATA_HEX: 'packet_data_hex',
SEQUENCE: 'packet_sequence',
SRC_PORT: 'packet_src_port',
SRC_CHANNEL: 'packet_src_channel',
DST_PORT: 'packet_dst_port',
DST_CHANNEL: 'packet_dst_channel',
};
}
96 changes: 96 additions & 0 deletions src/models/ibc_message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* eslint-disable import/no-cycle */
import { Model } from 'objection';
import BaseModel from './base';
import { IbcChannel } from './ibc_channel';
import { TransactionMessage } from './transaction_message';

export class IbcMessage extends BaseModel {
[relation: string]: any | any[];

id!: number;

transaction_message_id!: number;

src_channel_id!: string;

src_port_id!: string;

dst_channel_id!: string;

dst_port_id!: string;

type!: string;

sequence!: number;

sequence_key!: string;

data!: any;

static get tableName() {
return 'ibc_message';
}

static get jsonSchema() {
return {
type: 'object',
required: [
'transaction_message_id',
'src_channel_id',
'src_port_id',
'dst_channel_id',
'dst_port_id',
'type',
'sequence',
'sequence_key',
],
properties: {
transaction_message_id: { type: 'number' },
src_channel_id: { type: 'string' },
src_port_id: { type: 'string' },
dst_channel_id: { type: 'string' },
dst_port_id: { type: 'string' },
type: { type: 'string' },
sequence: { type: 'number' },
sequence_key: { type: 'string' },
status: { type: 'boolean' },
},
};
}

static get relationMappings() {
return {
message: {
relation: Model.BelongsToOneRelation,
modelClass: TransactionMessage,
join: {
from: 'ibc_message.transaction_message_id',
to: 'transaction_message.id',
},
},
src_channel: {
relation: Model.BelongsToOneRelation,
modelClass: IbcChannel,
join: {
from: 'ibc_message.src_channel_id',
to: 'ibc_channel.id',
},
},
dst_channel: {
relation: Model.BelongsToOneRelation,
modelClass: IbcChannel,
join: {
from: 'ibc_message.dst_channel_id',
to: 'ibc_channel.id',
},
},
};
}

static EVENT_TYPE = {
SEND_PACKET: 'send_packet',
RECV_PACKET: 'recv_packet',
ACKNOWLEDGE_PACKET: 'acknowledge_packet',
TIMEOUT_PACKET: 'timeout_packet',
};
}
1 change: 1 addition & 0 deletions src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ export * from './cw20_total_holder_stats';
export * from './ibc_client';
export * from './ibc_connection';
export * from './ibc_channel';
export * from './ibc_message';
95 changes: 93 additions & 2 deletions src/services/ibc/crawl_ibc_app.service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import { fromHex, fromUtf8 } from '@cosmjs/encoding';
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { Knex } from 'knex';
import { ServiceBroker } from 'moleculer';
import knex from '../../common/utils/db_connection';
import config from '../../../config.json' assert { type: 'json' };
import BullableService, { QueueHandler } from '../../base/bullable.service';
import { BULL_JOB_NAME, SERVICE } from '../../common';
import { BlockCheckpoint } from '../../models';
import { getAttributeFrom } from '../../common/utils/smart_contract';
import {
BlockCheckpoint,
Event,
EventAttribute,
IbcMessage,
} from '../../models';

@Service({
name: SERVICE.V1.CrawlIBCAppService.key,
Expand All @@ -29,6 +38,88 @@ export default class CrawlIbcAppService extends BullableService {
`Handle IBC/APP, startHeight: ${startHeight}, endHeight: ${endHeight}`
);
if (startHeight > endHeight) return;
this.logger.info(updateBlockCheckpoint);
const events = await Event.query()
.withGraphFetched('attributes')
.joinRelated('message')
.select('event.id', 'event.type', 'message.id as message_id')
.whereIn('event.type', [
IbcMessage.EVENT_TYPE.ACKNOWLEDGE_PACKET,
IbcMessage.EVENT_TYPE.RECV_PACKET,
IbcMessage.EVENT_TYPE.SEND_PACKET,
IbcMessage.EVENT_TYPE.TIMEOUT_PACKET,
])
.andWhere('event.block_height', '>', startHeight)
.andWhere('event.block_height', '<=', endHeight)
.orderBy('event.id');
await knex.transaction(async (trx) => {
await this.handleIbcMessage(events, trx);
updateBlockCheckpoint.height = endHeight;
await BlockCheckpoint.query()
.transacting(trx)
.insert(updateBlockCheckpoint)
.onConflict('job_name')
.merge();
});
}

async handleIbcMessage(events: Event[], trx: Knex.Transaction) {
const ibcMessage = events.map((event) => {
const srcChannel = getAttributeFrom(
event.attributes,
EventAttribute.ATTRIBUTE_KEY.SRC_CHANNEL
);
const srcPort = getAttributeFrom(
event.attributes,
EventAttribute.ATTRIBUTE_KEY.SRC_PORT
);
const dstChannel = getAttributeFrom(
event.attributes,
EventAttribute.ATTRIBUTE_KEY.DST_CHANNEL
);
const dstPort = getAttributeFrom(
event.attributes,
EventAttribute.ATTRIBUTE_KEY.DST_PORT
);
const sequence = getAttributeFrom(
event.attributes,
EventAttribute.ATTRIBUTE_KEY.SEQUENCE
);
const dataHex = getAttributeFrom(
event.attributes,
EventAttribute.ATTRIBUTE_KEY.DATA_HEX
);
return IbcMessage.fromJson({
transaction_message_id: event.message_id,
src_channel_id: srcChannel,
src_port_id: srcPort,
dst_channel_id: dstChannel,
dst_port_id: dstPort,
type: event.type,
sequence,
sequence_key: `${srcChannel}.${srcPort}.${dstChannel}.${dstPort}.${sequence}`,
data: fromUtf8(fromHex(dataHex)),
});
});
if (ibcMessage.length > 0) {
await IbcMessage.query().insert(ibcMessage).transacting(trx);
}
}

async _start(): Promise<void> {
await this.createJob(
BULL_JOB_NAME.CRAWL_IBC_APP,
BULL_JOB_NAME.CRAWL_IBC_APP,
{},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.crawlIbcApp.millisecondRepeatJob,
},
}
);
return super._start();
}
}

0 comments on commit 89c4bd0

Please sign in to comment.