Skip to content

Commit

Permalink
indexer-alt: configure consistency retention in one place (#20889)
Browse files Browse the repository at this point in the history
## Description

Add back a dedicated top-level configuration for consistency, to control
the retention of the consistent range. Now that this range is controlled
by the pruner, it can be configured by a `PrunerConfig` as well, rather
than a dedicated `ConsistencyConfig`.

The motivation for this is that the E2E testing setup needs to configure
the consistent range, and it would be better to avoid having to
enumerate all the consistent pipelines in the test harness because it
would be easy to forget to update that list, whereas with this approach,
we take advantage of the fact that there is an `add_consistent` macro to
register the pipeline, and it will do the right thing.

## Test plan

```
sui$ cargo check
sui$ cargo nextest run -p sui-indexer-alt
sui$ cargo run -p sui-indexer-alt -- generate-config
```

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
amnn authored Jan 15, 2025
1 parent fefeec2 commit a46f697
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
23 changes: 17 additions & 6 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct IndexerConfig {
/// How checkpoints are read by the indexer.
pub ingestion: IngestionLayer,

/// Configuration for the retention on consistent pipelines.
pub consistency: PrunerLayer,

/// Default configuration for committers that is shared by all pipelines. Pipelines can
/// override individual settings in their own configuration sections.
pub committer: CommitterLayer,
Expand Down Expand Up @@ -109,12 +112,15 @@ pub struct PrunerLayer {
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case")]
pub struct PipelineLayer {
// Consistent pipelines
pub coin_balance_buckets: Option<CommitterLayer>,
pub obj_info: Option<CommitterLayer>,

// Sequential pipelines
pub sum_displays: Option<SequentialLayer>,
pub sum_packages: Option<SequentialLayer>,

// All concurrent pipelines
pub coin_balance_buckets: Option<ConcurrentLayer>,
pub cp_sequence_numbers: Option<ConcurrentLayer>,
pub ev_emit_mod: Option<ConcurrentLayer>,
pub ev_struct_inst: Option<ConcurrentLayer>,
Expand All @@ -125,7 +131,6 @@ pub struct PipelineLayer {
pub kv_objects: Option<ConcurrentLayer>,
pub kv_protocol_configs: Option<ConcurrentLayer>,
pub kv_transactions: Option<ConcurrentLayer>,
pub obj_info: Option<ConcurrentLayer>,
pub obj_versions: Option<ConcurrentLayer>,
pub tx_affected_addresses: Option<ConcurrentLayer>,
pub tx_affected_objects: Option<ConcurrentLayer>,
Expand All @@ -145,6 +150,11 @@ impl IndexerConfig {
let mut example: Self = Default::default();

example.ingestion = IngestionConfig::default().into();
example.consistency = PrunerLayer {
interval_ms: Some(60_000),
retention: Some(4 * 60 * 60),
..Default::default()
};
example.committer = CommitterConfig::default().into();
example.pruner = PrunerConfig::default().into();
example.pipeline = PipelineLayer::example();
Expand Down Expand Up @@ -233,9 +243,10 @@ impl PipelineLayer {
/// configure.
pub fn example() -> Self {
PipelineLayer {
coin_balance_buckets: Some(Default::default()),
obj_info: Some(Default::default()),
sum_displays: Some(Default::default()),
sum_packages: Some(Default::default()),
coin_balance_buckets: Some(Default::default()),
cp_sequence_numbers: Some(Default::default()),
ev_emit_mod: Some(Default::default()),
ev_struct_inst: Some(Default::default()),
Expand All @@ -246,7 +257,6 @@ impl PipelineLayer {
kv_objects: Some(Default::default()),
kv_protocol_configs: Some(Default::default()),
kv_transactions: Some(Default::default()),
obj_info: Some(Default::default()),
obj_versions: Some(Default::default()),
tx_affected_addresses: Some(Default::default()),
tx_affected_objects: Some(Default::default()),
Expand All @@ -270,6 +280,7 @@ impl Merge for IndexerConfig {
check_extra("top-level", other.extra);
IndexerConfig {
ingestion: self.ingestion.merge(other.ingestion),
consistency: self.consistency.merge(other.consistency),
committer: self.committer.merge(other.committer),
pruner: self.pruner.merge(other.pruner),
pipeline: self.pipeline.merge(other.pipeline),
Expand Down Expand Up @@ -353,9 +364,10 @@ impl Merge for PipelineLayer {
check_extra("pipeline", self.extra);
check_extra("pipeline", other.extra);
PipelineLayer {
coin_balance_buckets: self.coin_balance_buckets.merge(other.coin_balance_buckets),
obj_info: self.obj_info.merge(other.obj_info),
sum_displays: self.sum_displays.merge(other.sum_displays),
sum_packages: self.sum_packages.merge(other.sum_packages),
coin_balance_buckets: self.coin_balance_buckets.merge(other.coin_balance_buckets),
cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers),
ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod),
ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst),
Expand All @@ -366,7 +378,6 @@ impl Merge for PipelineLayer {
kv_objects: self.kv_objects.merge(other.kv_objects),
kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs),
kv_transactions: self.kv_transactions.merge(other.kv_transactions),
obj_info: self.obj_info.merge(other.obj_info),
obj_versions: self.obj_versions.merge(other.obj_versions),
tx_affected_addresses: self
.tx_affected_addresses
Expand Down
26 changes: 23 additions & 3 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub async fn start_indexer(
) -> anyhow::Result<()> {
let IndexerConfig {
ingestion,
consistency,
committer,
pruner,
pipeline,
Expand Down Expand Up @@ -79,6 +80,7 @@ pub async fn start_indexer(
} = pipeline.finish();

let ingestion = ingestion.finish(IngestionConfig::default());
let consistency = consistency.finish(PrunerConfig::default());
let committer = committer.finish(CommitterConfig::default());
let pruner = pruner.finish(PrunerConfig::default());

Expand Down Expand Up @@ -110,6 +112,22 @@ pub async fn start_indexer(
// `concurrent` "write-ahead log" pipeline, with their configuration based on the supplied
// ConsistencyConfig.

macro_rules! add_consistent {
($handler:expr, $config:expr) => {
if let Some(layer) = $config {
indexer
.concurrent_pipeline(
$handler,
ConcurrentConfig {
committer: layer.finish(committer.clone()),
pruner: Some(consistency.clone()),
},
)
.await?
}
};
}

macro_rules! add_concurrent {
($handler:expr, $config:expr) => {
if let Some(layer) = $config {
Expand Down Expand Up @@ -150,12 +168,15 @@ pub async fn start_indexer(
add_concurrent!(KvProtocolConfigs(genesis.clone()), kv_protocol_configs);
}

// Other summary tables (without write-ahead log)
// Consistent pipelines
add_consistent!(CoinBalanceBuckets::default(), coin_balance_buckets);
add_consistent!(ObjInfo::default(), obj_info);

// Summary tables (without write-ahead log)
add_sequential!(SumDisplays, sum_displays);
add_sequential!(SumPackages, sum_packages);

// Unpruned concurrent pipelines
add_concurrent!(CoinBalanceBuckets::default(), coin_balance_buckets);
add_concurrent!(CpSequenceNumbers, cp_sequence_numbers);
add_concurrent!(EvEmitMod, ev_emit_mod);
add_concurrent!(EvStructInst, ev_struct_inst);
Expand All @@ -164,7 +185,6 @@ pub async fn start_indexer(
add_concurrent!(KvEpochStarts, kv_epoch_starts);
add_concurrent!(KvObjects, kv_objects);
add_concurrent!(KvTransactions, kv_transactions);
add_concurrent!(ObjInfo::default(), obj_info);
add_concurrent!(ObjVersions, obj_versions);
add_concurrent!(TxAffectedAddresses, tx_affected_addresses);
add_concurrent!(TxAffectedObjects, tx_affected_objects);
Expand Down

0 comments on commit a46f697

Please sign in to comment.