Skip to content

Commit

Permalink
Merge pull request #255 from lidofinance/chore/check-slot-finality
Browse files Browse the repository at this point in the history
chore: add slot finality checking
  • Loading branch information
AlexanderLukin authored Aug 26, 2024
2 parents 826d40a + d92713f commit 3b1b9ef
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 88 deletions.
2 changes: 1 addition & 1 deletion src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { LoggerModule } from 'common/logger';
import { PrometheusModule } from 'common/prometheus';
import { ClickhouseModule } from 'storage/clickhouse';

import { InspectorModule } from '../inspector';
import { AppService } from './app.service';
import { InspectorModule } from '../inspector';

@Module({
imports: [LoggerModule, HealthModule, ConfigModule, PrometheusModule, ClickhouseModule, InspectorModule],
Expand Down
2 changes: 1 addition & 1 deletion src/app/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import * as buildInfo from 'build-info';
import { ConfigService } from 'common/config';
import { PrometheusService } from 'common/prometheus';

import { InspectorService } from '../inspector';
import { APP_NAME } from './app.constants';
import { InspectorService } from '../inspector';

@Injectable()
export class AppService implements OnModuleInit, OnApplicationBootstrap {
Expand Down
93 changes: 27 additions & 66 deletions src/common/consensus-provider/consensus-provider.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import { request } from 'undici';
import { IncomingHttpHeaders } from 'undici/types/header';
import BodyReadable from 'undici/types/readable';

import { ConfigService } from 'common/config';
import { range } from 'common/functions/range';
import { ConfigService, WorkingMode } from 'common/config';
import { rejectDelay } from 'common/functions/rejectDelay';
import { retrier } from 'common/functions/retrier';
import { urljoin } from 'common/functions/urljoin';
Expand All @@ -16,15 +15,7 @@ import { EpochProcessingState } from 'storage/clickhouse';

import { BlockCache, BlockCacheService } from './block-cache';
import { MaxDeepError, ResponseError, errCommon, errRequest } from './errors';
import {
BlockHeaderResponse,
BlockInfoResponse,
FinalityCheckpointsResponse,
GenesisResponse,
ProposerDutyInfo,
SyncCommitteeInfo,
VersionResponse,
} from './intefaces';
import { BlockHeaderResponse, BlockInfoResponse, GenesisResponse, ProposerDutyInfo, SyncCommitteeInfo, VersionResponse } from './intefaces';
import { BlockId, Epoch, Slot, StateId } from './types';

let ssz: typeof import('@lodestar/types').ssz;
Expand All @@ -41,6 +32,7 @@ interface RequestRetryOptions {
@Injectable()
export class ConsensusProviderService {
protected apiUrls: string[];
protected workingMode: string;
protected version = '';
protected genesisTime = 0;
protected defaultMaxSlotDeepCount = 32;
Expand All @@ -49,7 +41,6 @@ export class ConsensusProviderService {
protected endpoints = {
version: 'eth/v1/node/version',
genesis: 'eth/v1/beacon/genesis',
beaconHeadFinalityCheckpoints: 'eth/v1/beacon/states/head/finality_checkpoints',
blockInfo: (blockId: BlockId): string => `eth/v2/beacon/blocks/${blockId}`,
beaconHeaders: (blockId: BlockId): string => `eth/v1/beacon/headers/${blockId}`,
attestationCommittees: (stateId: StateId, epoch: Epoch): string => `eth/v1/beacon/states/${stateId}/committees?epoch=${epoch}`,
Expand All @@ -65,6 +56,7 @@ export class ConsensusProviderService {
protected readonly cache: BlockCacheService,
) {
this.apiUrls = config.get('CL_API_URLS') as NonEmptyArray<string>;
this.workingMode = config.get('WORKING_MODE');
}

public async getVersion(): Promise<string> {
Expand All @@ -88,29 +80,23 @@ export class ConsensusProviderService {
return (this.genesisTime = genesisTime);
}

public async getFinalizedEpoch(): Promise<Epoch> {
return Number(
(
await this.retryRequest<FinalityCheckpointsResponse>(async (apiURL: string) =>
this.apiGet(apiURL, this.endpoints.beaconHeadFinalityCheckpoints),
)
).finalized.epoch,
);
}

public async getLatestBlockHeader(processingState: EpochProcessingState): Promise<BlockHeaderResponse | void> {
const latestFrom = this.config.get('WORKING_MODE');
return await this.retryRequest<BlockHeaderResponse>(
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.beaconHeaders(latestFrom)),
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.beaconHeaders(this.workingMode)),
{
maxRetries: this.config.get('CL_API_GET_BLOCK_INFO_MAX_RETRIES'),
useFallbackOnResolved: (r) => {
if (this.workingMode === WorkingMode.Finalized && r.hasOwnProperty('finalized') && !r.finalized) {
this.logger.error(`getLatestBlockHeader: slot [${r.data.header.message.slot}] is not finalized`);
return true;
}

const nodeLatestSlot = Number(r.data.header.message.slot);

if (nodeLatestSlot < this.latestSlot.slot) {
// we assume that the node must never return a slot less than the last saved slot
this.logger.error(
`Received ${latestFrom} slot [${nodeLatestSlot}] is less than last [${this.latestSlot.slot}] slot received before, but shouldn't`,
`Received ${this.workingMode} slot [${nodeLatestSlot}] is less than last [${this.latestSlot.slot}] slot received before, but shouldn't`,
);
return true;
}
Expand Down Expand Up @@ -229,46 +215,6 @@ export class ConsensusProviderService {
return (await this.getPreviousNotMissedBlockHeader(dutyRootSlot, this.defaultMaxSlotDeepCount, ignoreCache)).root;
}

/**
* Trying to get nearest block with slot attestation info.
* Assumed that the ideal attestation is included in the next non-missed block
*/
public async getBlockInfoWithSlotAttestations(
slot: Slot,
maxDeep = this.defaultMaxSlotDeepCount,
): Promise<[BlockInfoResponse | undefined, Array<number>]> {
const nearestBlockIncludedAttestations = slot + 1; // good attestation should be included to the next block
let blockInfo;
let missedSlots: number[] = [];
try {
blockInfo = await this.getNextNotMissedBlockInfo(nearestBlockIncludedAttestations, maxDeep);
} catch (e) {
if (e instanceof MaxDeepError) {
this.logger.error(`Error when trying to get nearest block with attestations for slot ${slot}: from ${slot} to ${slot + maxDeep}`);
missedSlots = range(nearestBlockIncludedAttestations, nearestBlockIncludedAttestations + maxDeep + 1);
} else {
throw e;
}
}

if (blockInfo && nearestBlockIncludedAttestations != Number(blockInfo.message.slot)) {
missedSlots = range(nearestBlockIncludedAttestations, Number(blockInfo.message.slot));
}
return [blockInfo, missedSlots];
}

public async getNextNotMissedBlockInfo(slot: Slot, maxDeep = this.defaultMaxSlotDeepCount): Promise<BlockInfoResponse | undefined> {
const blockInfo = await this.getBlockInfo(slot);
if (!blockInfo) {
if (maxDeep < 1) {
throw new MaxDeepError(`Error when trying to get next not missed block info. From ${slot} to ${slot + maxDeep}`);
}
this.logger.log(`Try to get next info from ${slot + 1} slot because ${slot} is missing`);
return await this.getNextNotMissedBlockInfo(slot + 1, maxDeep - 1);
}
return blockInfo;
}

public async getState(stateId: StateId): Promise<ContainerTreeViewType<typeof anySsz.BeaconState.fields>> {
const { body, headers } = await this.retryRequest<{ body: BodyReadable; headers: IncomingHttpHeaders }>(
async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.state(stateId), { accept: 'application/octet-stream' }),
Expand All @@ -294,6 +240,13 @@ export class ConsensusProviderService {
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.blockInfo(blockId)),
{
maxRetries: this.config.get('CL_API_GET_BLOCK_INFO_MAX_RETRIES'),
useFallbackOnResolved: (r) => {
if (this.workingMode === WorkingMode.Finalized && blockId !== 'head' && r.hasOwnProperty('finalized') && !r.finalized) {
this.logger.error(`getBlockInfo: slot [${r.data.message.slot}] is not finalized`);
return true;
}
return false;
},
useFallbackOnRejected: (last_fallback_err, curr_fallback_error) => {
if (last_fallback_err && last_fallback_err.$httpCode == 404 && curr_fallback_error.$httpCode != 404) {
this.logger.debug('Request error from last fallback was 404, but current is not. Will be used previous error');
Expand Down Expand Up @@ -325,7 +278,15 @@ export class ConsensusProviderService {
}

public async getSyncCommitteeInfo(stateId: StateId, epoch: Epoch): Promise<SyncCommitteeInfo> {
return await this.retryRequest(async (apiURL: string) => this.apiGet(apiURL, this.endpoints.syncCommittee(stateId, epoch)));
return await this.retryRequest(async (apiURL: string) => this.apiGet(apiURL, this.endpoints.syncCommittee(stateId, epoch)), {
useFallbackOnResolved: (r) => {
if (this.workingMode === WorkingMode.Finalized && stateId !== 'head' && r.hasOwnProperty('finalized') && !r.finalized) {
this.logger.error(`getSyncCommitteeInfo: state ${stateId} for epoch ${epoch} is not finalized`);
return true;
}
return false;
},
});
}

public async getCanonicalProposerDuties(epoch: Epoch, maxRetriesForGetCanonical = 3, ignoreCache = false): Promise<ProposerDutyInfo[]> {
Expand Down
17 changes: 1 addition & 16 deletions src/common/consensus-provider/intefaces/response.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,6 @@ export interface Withdrawal {
amount: string;
}

export interface FinalityCheckpointsResponse {
previous_justified: {
epoch: string;
root: RootHex;
};
current_justified: {
epoch: string;
root: RootHex;
};
finalized: {
epoch: string;
root: RootHex;
};
}

export interface GenesisResponse {
/**
* example: 1590832934
Expand Down Expand Up @@ -132,7 +117,7 @@ export interface BeaconBlockAttestation {
export interface StateValidatorResponse {
index: string;
balance: string;
status: typeof ValStatus[keyof typeof ValStatus];
status: (typeof ValStatus)[keyof typeof ValStatus];
validator: {
pubkey: string;
withdrawal_credentials: string;
Expand Down
4 changes: 2 additions & 2 deletions src/duty/duty.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import { ConfigService } from 'common/config';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask } from 'common/prometheus';

import { Epoch } from '../common/consensus-provider/types';
import { ClickhouseService } from '../storage';
import { AttestationMetrics } from './attestation';
import { ProposeMetrics } from './propose';
import { StateMetrics } from './state';
import { SummaryMetrics } from './summary';
import { SyncMetrics } from './sync';
import { WithdrawalsMetrics } from './withdrawal';
import { Epoch } from '../common/consensus-provider/types';
import { ClickhouseService } from '../storage';

@Injectable()
export class DutyMetrics {
Expand Down
2 changes: 1 addition & 1 deletion src/duty/duty.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { ConfigService } from 'common/config';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask } from 'common/prometheus';

import { Epoch } from '../common/consensus-provider/types';
import { AttestationRewards } from './attestation';
import { ProposeRewards } from './propose';
import { SyncRewards } from './sync';
import { Epoch } from '../common/consensus-provider/types';

@Injectable()
export class DutyRewards {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { batch } from 'stream-json/utils/Batch';

import { unblock } from 'common/functions/unblock';

import { RegistrySource, RegistrySourceKey, RegistrySourceOperator } from '../registry-source.interface';
import { KeysapiSourceClient } from './keysapi-source.client';
import { RegistrySource, RegistrySourceKey, RegistrySourceOperator } from '../registry-source.interface';

@Injectable()
export class KeysapiSourceService implements RegistrySource {
Expand Down

0 comments on commit 3b1b9ef

Please sign in to comment.