Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer-alt] add prune impls for each pipeline #20635

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeSet, sync::Arc};

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::models::cp_sequence_numbers::tx_interval;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod};
use sui_pg_db as db;
Expand Down Expand Up @@ -57,4 +60,21 @@ impl Handler for EvEmitMod {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;

let filter = ev_emit_mod::table
.filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why - 1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The to_tx is the first tx of the to_exclusive checkpoint and should not be pruned


Ok(diesel::delete(filter).execute(conn).await?)
}
}
25 changes: 23 additions & 2 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, sync::Arc};
use std::{collections::BTreeSet, ops::Range, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -60,4 +64,21 @@ impl Handler for EvStructInst {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;

let filter = ev_struct_inst::table
.filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
13 changes: 13 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
Expand Down Expand Up @@ -38,4 +39,16 @@ impl Handler for KvCheckpoints {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let filter = kv_checkpoints::table
.filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
26 changes: 25 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -125,4 +130,23 @@ impl Handler for KvEpochEnds {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to_exclusive).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_ends::table
.filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1));
Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
27 changes: 26 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -72,4 +77,24 @@ impl Handler for KvEpochStarts {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to_exclusive).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_starts::table
.filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
16 changes: 16 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
Expand Down Expand Up @@ -66,4 +67,19 @@ impl Handler for KvTransactions {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
// TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index
// created as its primary key is on `tx_digest`.
let filter = kv_transactions::table.filter(
kv_transactions::cp_sequence_number.between(from as i64, to_exclusive as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use itertools::Itertools;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_affected_addresses, transactions::StoredTxAffectedAddress,
};
Expand Down Expand Up @@ -69,4 +74,21 @@ impl Handler for TxAffectedAddresses {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_affected_addresses::table.filter(
tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject};
use sui_pg_db as db;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};
Expand Down Expand Up @@ -59,4 +64,21 @@ impl Handler for TxAffectedObjects {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_affected_objects::table.filter(
tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_balance_changes,
transactions::{BalanceChange, StoredTxBalanceChange},
Expand Down Expand Up @@ -65,6 +70,23 @@ impl Handler for TxBalanceChanges {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_balance_changes::table.filter(
tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}

/// Calculate balance changes based on the object's input and output objects.
Expand Down
23 changes: 22 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{Ok, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,4 +67,20 @@ impl Handler for TxCalls {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_calls::table
.filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
Loading
Loading