Caplin: make archive states downloadable (#12713)
Giulio2002 authored Nov 15, 2024
1 parent 7458bfc commit 5540117
Showing 11 changed files with 48 additions and 16 deletions.
6 changes: 6 additions & 0 deletions cl/antiquary/antiquary.go
@@ -163,6 +163,12 @@ func (a *Antiquary) Loop() error {
if err := a.sn.OpenFolder(); err != nil {
return err
}
if a.stateSn != nil {
if err := a.stateSn.OpenFolder(); err != nil {
return err
}
}

defer logInterval.Stop()
if from != a.sn.BlocksAvailable() && a.sn.BlocksAvailable() != 0 {
a.logger.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable())
6 changes: 5 additions & 1 deletion cl/antiquary/state_antiquary.go
@@ -488,11 +488,15 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err != nil {
return err
}

log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
if s.snapgen {
if s.stateSn != nil {
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
}

if s.snapgen {

// Keep gnosis out for a bit
if s.currentState.BeaconConfig().ConfigName == "gnosis" {
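The two hunks above (antiquary.go and state_antiquary.go) introduce the same pattern: after antiquation the block snapshot folder is reopened, and the Caplin state snapshot store, which is optional, is reopened only when it is configured. A minimal, self-contained sketch of that nil-guarded reopen, using placeholder types rather than Erigon's real ones:

```go
package caplinsketch

// folderOpener stands in for Erigon's snapshot stores; the real types differ.
type folderOpener interface {
	OpenFolder() error
}

// reopenSnapshots mirrors the nil-guarded pattern added above: block snapshots
// are always reopened, the state snapshot store only when it exists.
func reopenSnapshots(blocks, state folderOpener) error {
	if err := blocks.OpenFolder(); err != nil {
		return err
	}
	if state != nil {
		if err := state.OpenFolder(); err != nil {
			return err
		}
	}
	return nil
}
```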
2 changes: 0 additions & 2 deletions cl/validator/committee_subscription/committee_subscription.go
@@ -69,7 +69,6 @@ func NewCommitteeSubscribeManagement(
netConfig *clparams.NetworkConfig,
ethClock eth_clock.EthereumClock,
sentinel sentinel.SentinelClient,
state *state.CachingBeaconState,
aggregationPool aggregation.AggregationPool,
syncedData *synced_data.SyncedDataManager,
) *CommitteeSubscribeMgmt {
@@ -79,7 +78,6 @@ func NewCommitteeSubscribeManagement(
netConfig: netConfig,
ethClock: ethClock,
sentinel: sentinel,
state: state,
aggregationPool: aggregationPool,
syncedData: syncedData,
validatorSubs: make(map[uint64]*validatorSub),
2 changes: 1 addition & 1 deletion cmd/caplin/caplin1/run.go
@@ -303,7 +303,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi
return err
}
beaconRpc := rpc.NewBeaconRpcP2P(ctx, sentinel, beaconConfig, ethClock)
committeeSub := committee_subscription.NewCommitteeSubscribeManagement(ctx, indexDB, beaconConfig, networkConfig, ethClock, sentinel, state, aggregationPool, syncedDataManager)
committeeSub := committee_subscription.NewCommitteeSubscribeManagement(ctx, indexDB, beaconConfig, networkConfig, ethClock, sentinel, aggregationPool, syncedDataManager)
batchSignatureVerifier := services.NewBatchSignatureVerifier(ctx, sentinel)
// Define gossip services
blockService := services.NewBlockService(ctx, indexDB, forkChoice, syncedDataManager, ethClock, beaconConfig, emitters)
8 changes: 8 additions & 0 deletions erigon-lib/chain/snapcfg/util.go
@@ -94,6 +94,11 @@ func (p Preverified) Typed(types []snaptype.Type) Preverified {
continue
}

if strings.HasPrefix(p.Name, "caplin") {
bestVersions.Set(p.Name, p)
continue
}

var preferredVersion, minVersion snaptype.Version

countSep := 0
@@ -393,6 +398,9 @@ func (c Cfg) MergeLimit(t snaptype.Enum, fromBlock uint64) uint64 {
if !ok {
continue
}
if strings.Contains(p.Name, "caplin") {
continue
}

if info.Ext != ".seg" || (t != snaptype.Unknown && t != info.Type.Enum()) {
continue
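Both hunks in this file key off the snapshot name rather than a parsed type: Typed() keeps caplin-prefixed preverified entries without running them through version selection, and MergeLimit() ignores caplin files when deriving the .seg merge limit. A self-contained sketch of those two rules; the item type is simplified and only the string checks match the diff:

```go
package snapcfgsketch

import "strings"

// item is a stand-in for a preverified snapshot entry; only Name matters here.
type item struct{ Name string }

// keepAsIs reflects the Typed() change: caplin-prefixed entries skip the usual
// version parsing and are carried over unchanged.
func keepAsIs(p item) bool {
	return strings.HasPrefix(p.Name, "caplin")
}

// excludeFromMergeLimit reflects the MergeLimit() change: caplin files are not
// considered when computing the merge limit for block segments.
func excludeFromMergeLimit(p item) bool {
	return strings.Contains(p.Name, "caplin")
}
```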
6 changes: 4 additions & 2 deletions erigon-lib/downloader/snaptype/files.go
@@ -124,6 +124,9 @@ func ParseFileName(dir, fileName string) (res FileInfo, isE3Seedable bool, ok bo
}
}
}
if strings.Contains(fileName, "caplin/") {
return res, isStateFile, true
}
return res, isStateFile, isStateFile
}

@@ -266,8 +269,7 @@ func (f FileInfo) CompareTo(o FileInfo) int {
return res
}

// this is a lexical comparison (don't use enum)
return strings.Compare(f.Type.Name(), o.Type.Name())
return strings.Compare(f.name, o.name)
}

func (f FileInfo) As(t Type) FileInfo {
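Two behavioural notes on this file: ParseFileName now reports any path under a caplin/ sub-directory as downloadable even though it does not parse into a block-snapshot type, and CompareTo falls back to the stored file name rather than the parsed type name, presumably because caplin state files carry no such type. A toy demonstration of the resulting ordering; the file names are made up:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// With the change above, ties are broken by the raw file name (always
	// populated) instead of Type.Name(), which caplin state files may lack.
	a := "caplin/0000000-0000500-beaconstate.seg"
	b := "caplin/0000500-0001000-beaconstate.seg"
	fmt.Println(strings.Compare(a, b)) // -1: a sorts before b
}
```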
2 changes: 1 addition & 1 deletion eth/backend.go
@@ -1017,7 +1017,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.polygonDownloadSync = stagedsync.New(backend.config.Sync, stagedsync.DownloadSyncStages(
backend.sentryCtx, stagedsync.StageSnapshotsCfg(
backend.chainDB, *backend.sentriesClient.ChainConfig, config.Sync, dirs, blockRetire, backend.downloaderClient,
blockReader, backend.notifications, backend.agg, false, false, backend.silkworm, config.Prune,
blockReader, backend.notifications, backend.agg, false, false, false, backend.silkworm, config.Prune,
)), nil, nil, backend.logger)

// these range extractors set the db to the local db instead of the chain db
7 changes: 5 additions & 2 deletions eth/stagedsync/stage_snapshots.go
@@ -95,6 +95,7 @@ type SnapshotsCfg struct {

caplin bool
blobs bool
caplinState bool
agg *state.Aggregator
silkworm *silkworm.Silkworm
snapshotUploader *snapshotUploader
@@ -113,6 +114,7 @@ func StageSnapshotsCfg(db kv.RwDB,
agg *state.Aggregator,
caplin bool,
blobs bool,
caplinState bool,
silkworm *silkworm.Silkworm,
prune prune.Mode,
) SnapshotsCfg {
@@ -130,6 +132,7 @@ func StageSnapshotsCfg(db kv.RwDB,
syncConfig: syncConfig,
blobs: blobs,
prune: prune,
caplinState: caplinState,
}

if uploadFs := cfg.syncConfig.UploadLocation; len(uploadFs) > 0 {
@@ -277,7 +280,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R

diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download header-chain"})
// Download only the snapshots that are for the header chain.
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, true /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, true /*headerChain=*/, cfg.blobs, cfg.caplinState, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
return err
}

@@ -286,7 +289,7 @@ }
}

diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download snapshots"})
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, false /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, false /*headerChain=*/, cfg.blobs, cfg.caplinState, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
return err
}
if cfg.notifier.Events != nil {
Expand Down
16 changes: 13 additions & 3 deletions turbo/snapshotsync/snapshotsync.go
@@ -283,7 +283,7 @@ func computeBlocksToPrune(blockReader blockReader, p prune.Mode) (blocksToPrune

// WaitForDownloader - wait for Downloader service to download all expected snapshots
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs, headerchain, blobs bool, prune prune.Mode, caplin CaplinMode, agg *state.Aggregator, tx kv.RwTx, blockReader blockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs, headerchain, blobs, caplinState bool, prune prune.Mode, caplin CaplinMode, agg *state.Aggregator, tx kv.RwTx, blockReader blockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
snapshots := blockReader.Snapshots()
borSnapshots := blockReader.BorSnapshots()

@@ -331,15 +331,18 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,

// build all download requests
for _, p := range preverifiedBlockSnapshots {
if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) {
if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars") || strings.Contains(p.Name, "caplin")) {
continue
}
if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") {
if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") && !strings.Contains(p.Name, "caplin") {
continue
}
if !blobs && strings.Contains(p.Name, "blobsidecars") {
continue
}
if !caplinState && strings.Contains(p.Name, "caplin/") {
continue
}
if headerchain && !strings.Contains(p.Name, "headers") && !strings.Contains(p.Name, "bodies") {
continue
}
@@ -466,6 +469,13 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
return err
}
}
if caplinState {
if _, err := snapshotDownloader.ProhibitNewDownloads(ctx, &proto_downloader.ProhibitNewDownloadsRequest{
Type: "caplin",
}); err != nil {
return err
}
}
}

firstNonGenesis, err := rawdbv3.SecondKey(tx, kv.Headers)
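The core of the commit is this request filter in WaitForDownloader: archive-state files live under caplin/ and are only requested when the new caplinState flag is set, while caplin files as a whole follow the same NoCaplin/OnlyCaplin rules as beacon blocks and blob sidecars; once syncing is done, new downloads of the "caplin" type are prohibited, just as for blobs. A self-contained sketch of the filter; the mode constants and the file name are illustrative stand-ins, only the string checks match the diff:

```go
package main

import (
	"fmt"
	"strings"
)

// caplinMode stands in for the real CaplinMode values (NoCaplin, OnlyCaplin, ...).
type caplinMode int

const (
	noCaplin caplinMode = iota
	alsoCaplin
	onlyCaplin
)

// wantSnapshot reproduces, in isolation, the request filter above.
func wantSnapshot(name string, mode caplinMode, blobs, caplinState, headerchain bool) bool {
	isCL := strings.Contains(name, "beaconblocks") ||
		strings.Contains(name, "blobsidecars") ||
		strings.Contains(name, "caplin")
	switch {
	case mode == noCaplin && isCL:
		return false
	case mode == onlyCaplin && !isCL:
		return false
	case !blobs && strings.Contains(name, "blobsidecars"):
		return false
	case !caplinState && strings.Contains(name, "caplin/"): // archive states stay opt-in
		return false
	case headerchain && !strings.Contains(name, "headers") && !strings.Contains(name, "bodies"):
		return false
	}
	return true
}

func main() {
	name := "caplin/0000000-0000500-beaconstate.seg" // illustrative name
	fmt.Println(wantSnapshot(name, alsoCaplin, true, false, false)) // false: archive states not requested
	fmt.Println(wantSnapshot(name, alsoCaplin, true, true, false))  // true: downloadable after this commit
}
```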
2 changes: 1 addition & 1 deletion turbo/stages/mock/mock_sentry.go
@@ -525,7 +525,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
mock.agg.SetProduceMod(mock.BlockReader.FreezingCfg().ProduceE3)
mock.Sync = stagedsync.New(
cfg.Sync,
stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, mock.BlockReader, mock.Notifications, mock.agg, false, false, nil, prune), stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications), stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil, nil, nil, mock.BlockReader, nil, nil, recents, signatures, false, nil), stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, blockWriter), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg(
stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, mock.BlockReader, mock.Notifications, mock.agg, false, false, false, nil, prune), stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications), stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil, nil, nil, mock.BlockReader, nil, nil, recents, signatures, false, nil), stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, blockWriter), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg(
mock.DB,
prune,
cfg.BatchSize,
7 changes: 4 additions & 3 deletions turbo/stages/stageloop.go
@@ -693,7 +693,7 @@ func NewDefaultStages(ctx context.Context,
runInTestMode := cfg.ImportMode

return stagedsync.DefaultStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, cfg.CaplinConfig.Archive, silkworm, cfg.Prune),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications),
stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, controlServer.Hd, controlServer.Penalize, recents, signatures, cfg.WithHeimdallWaypointRecording, nil),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
@@ -734,7 +734,7 @@ func NewPipelineStages(ctx context.Context,

if len(cfg.Sync.UploadLocation) == 0 {
return stagedsync.PipelineStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, cfg.CaplinConfig.Archive, silkworm, cfg.Prune),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, false, false, cfg.ChaosMonkey, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)),
Expand All @@ -743,7 +743,7 @@ func NewPipelineStages(ctx context.Context,
}

return stagedsync.UploaderPipelineStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, cfg.CaplinConfig.Archive, silkworm, cfg.Prune),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),
@@ -802,6 +802,7 @@ func NewPolygonSyncStages(
agg,
config.InternalCL && config.CaplinConfig.Backfilling,
config.CaplinConfig.BlobBackfilling,
config.CaplinConfig.Archive,
silkworm,
config.Prune,
),
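The new boolean is sourced from the Caplin archive setting wherever the snapshot stage is built (NewDefaultStages, NewPipelineStages, NewPolygonSyncStages), while eth/backend.go and the mock sentry simply pass false. A reduced sketch of that mapping; caplinConfig below is a stand-in for the relevant fields of Erigon's CaplinConfig:

```go
package main

import "fmt"

// caplinConfig mimics only the fields of CaplinConfig used by the snapshot stage.
type caplinConfig struct {
	Backfilling     bool
	BlobBackfilling bool
	Archive         bool
}

// snapshotStageFlags mirrors how the three booleans handed to StageSnapshotsCfg
// are derived in stageloop.go after this commit.
func snapshotStageFlags(internalCL bool, c caplinConfig) (caplin, blobs, caplinState bool) {
	return internalCL && c.Backfilling, c.BlobBackfilling, c.Archive
}

func main() {
	fmt.Println(snapshotStageFlags(true, caplinConfig{Backfilling: true, Archive: true}))
	// Prints: true false true; archive states will be requested by the downloader.
}
```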
