-
Notifications
You must be signed in to change notification settings - Fork 11.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement consistency pruning in the builit-in pruner #20702
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
3 Skipped Deployments
|
async fn prune(&self, from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> { | ||
use sui_indexer_alt_schema::schema::obj_info::dsl; | ||
|
||
let to_prune = self.pruning_lookup_table.take(from, to)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems sensible enough for pruner to take self. The handler will know best the shape of the data it needs to do any consistent pruning, would likely be unwieldy otherwise
9196bde
to
e2b70f4
Compare
e2b70f4
to
cb2f77e
Compare
cb2f77e
to
0e652b0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it overall! The only bit that I want to think about is how it interacts with resumption and the first checkpoint logic, but I think that is more to do with me being unsatisfied with how that bit of code works already -- I think this approach is pretty nice and clean.
@@ -27,6 +27,10 @@ pub trait Processor { | |||
/// How much concurrency to use when processing checkpoint data. | |||
const FANOUT: usize = 10; | |||
|
|||
/// Whether the pruner requires processed values in order to prune. | |||
/// This will determine the first checkpoint to process when we start the pipeline. | |||
const PRUNING_REQUIRES_PROCESSED_VALUES: bool = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of an unfortunate consequence of this approach, but I'm coming around to it, with some small tweaks:
- This feature is unique to concurrent pipelines (as is pruning), so we should put it on the
concurrent::Handler
trait (although I see now above why you've put it onProcessor
based on where in the codebase you fetch the watermark). - I would name it for exactly what it controls (resuming from the pruner watermark), rather than what we might use it for (back-channeling values from the processor to the pruner). How about
RESUME_FROM_PRUNER_HI
or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me "resuming from the pruner watermark" is an implementation detail that would actually be difficult for pipeline builders to understand. What they care about is the fact that they want to use processed data to do pruning, and the need to resume ingestion from pruner hi is a side-effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh fun, looks like I left the same comment haha
When I see the current name I'm inclined to ask what the processed values are and their relationship to the watermarks that dictate the pipeline, so it feels like an unnecessary abstraction to me. To extend a pipeline so that the processor and pruner can share data, the indexer provider would need to have knowledge of pruner watermark and related mechanics anyways (for example, the query to prune in object_info, determining the exclusive upper bound checkpoint to prune up to depending on object status)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me continue thinking about this after this PR.
I also want to find a way to enforce the behavior instead of relying on the flag since it's error prone.
0e652b0
to
52ea442
Compare
52ea442
to
471c8be
Compare
471c8be
to
4904e42
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, basically pipelines should indicate whether they need to start from the pruner watermark or not. if a pipeline needs additional information to help it prune, the processor and pruner should be able to write to and read from shared data, and this is something that an indexer operator can easily build on, as demonstrated with obj_info
/// Whether the pruner requires processed values in order to prune. | ||
/// This will determine the first checkpoint to process when we start the pipeline. | ||
/// If this is true, when the pipeline starts, it will process all checkpoints from the | ||
/// pruner watermark, so that the pruner have access to the processed values for any unpruned | ||
/// checkpoints. | ||
/// If this is false, when the pipeline starts, it will process all checkpoints from the | ||
/// committer watermark. | ||
const PRUNING_REQUIRES_PROCESSED_VALUES: bool = false; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so, either start from pruner watermark, or if false, start from committer watermark? What about PRUNING_STARTS_FROM_PRUNER_WATERMARK
?
|
||
#[tokio::test] | ||
async fn test_add_new_pipeline() { | ||
let (mut indexer, _temp_db) = Indexer::new_for_testing().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if Indexer::new_for_testing is meant for just these tests, we could have a function that spawns an indexer in the test module rather than a pub fn in the source code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's just because we have not written any tests that would start the indexer so far?
.concurrent_pipeline(ConcurrentPipeline3, ConcurrentConfig::default()) | ||
.await | ||
.unwrap(); | ||
assert_eq!(indexer.first_checkpoint_from_watermark, 6); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could also assert that the other pipeline is not affected
/// For each object, whether this object was mutated or deleted in this checkpoint. | ||
/// This will determine the prune checkpoint for this object. | ||
info: BTreeMap<ObjectID, UpdateKind>, | ||
} | ||
|
||
enum UpdateKind { | ||
/// This object was mutated in this checkpoint. | ||
/// To prune, we should prune anything prior to this checkpoint. | ||
Mutate, | ||
/// This object was deleted in this checkpoint. | ||
/// To prune, we should prune anything prior to this checkpoint, | ||
/// as well as this checkpoint. | ||
Delete, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whether an object was mutated or deleted, they both share a common trait that we can prune anything prior to this checkpoint. then, we decide whether to prune /this/ checkpoint as well. What about just maintaining a map of ObjectID -> a bool indicating whether or not to also prune /this/ checkpoint?
// We do not need to prune if the object was created in this checkpoint, | ||
// because this object would not have been in the table prior to this checkpoint. | ||
if checkpoint_input_objects.contains_key(object_id) { | ||
prune_info.add_mutated_object(*object_id); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
had to reread this a few times but, i think i got it now. in other words, objects that are created in a checkpoint will not show up in checkpoint_input_objects - these objects were used as inputs to a tx in a checkpoint, so they must already exist prior to the checkpoint. So to exclude them, we only add mutated objects that were inputs to a tx
what about unwrapped objects? I think that's handled in the deletion case - when we wrap, it's considered deleted, and checkpoints including when it was wrapped will be pruned
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct!
pub(crate) fn new_for_testing(pipeline: &'p str, checkpoint_hi_inclusive: u64) -> Self { | ||
CommitterWatermark { | ||
pipeline: pipeline.into(), | ||
epoch_hi_inclusive: 0, | ||
checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64, | ||
tx_hi: 0, | ||
timestamp_ms_hi_inclusive: 0, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, we could have this in the test module instead of on the struct? idk how big of a deal/ difference it would be
Description
This PR implements consistent pruning within the built-in pruner for concurrent pipelines.
Instead of relying on a separate pipeline to do pruning, this PR adds shared data between the processor and pruner for obj_info.
Note that this PR isn't finished yet, sending our early to make sure this is not in the wrong direction.
To implement this, I also need to make pruner take
self
so that it has access to shared data. The first checkpoint of indexer also needs special care to make sure for any pipeline that requires processed data to prune, we must start processing at the pruner watermark instead of commit watermark.Test plan
How did you test the new or updated feature?
Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.