[WIP] Async Checkpoint with Causality Optimization #2262
base: master
Conversation
[REVIEW NOTIFICATION] This pull request has not been approved. To complete the pull request process, please ask the reviewers in the list to review. The full list of commands accepted by this bot can be found here. Reviewers can indicate their review by submitting an approval review.
syncer/syncer.go
Outdated
s.jobWg.Wait()
s.addCount(true, adminQueueName, job.tp, 1, job.targetTable)
return s.flushCheckPoints()
s.flushCheckPointsAsync(job.wg, job.seq)
We trigger this flush whenever we call flushJobs(), and from my understanding of the code, that is the situation where we need to synchronously flush the checkpoint. There are 3 places that call this function in syncer.go; we could explore these cases.
Yes, some of the flushJobs calls should wait for the DDLs to actually finish executing.
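A minimal sketch of how the two cases could coexist, assuming the flush job carries a *sync.WaitGroup as suggested by flushCheckPointsAsync(job.wg, job.seq); the flushJob, flushAsync, and flushSync names below are illustrative, not the PR's actual code.

```go
package sketch

import "sync"

// flushJob is an illustrative stand-in for the PR's flush job; the real
// change passes job.wg and job.seq into flushCheckPointsAsync.
type flushJob struct {
	wg  *sync.WaitGroup
	seq int64
}

// flushAsync enqueues the checkpoint flush and returns immediately.
func flushAsync(job *flushJob, flush func(seq int64)) {
	job.wg.Add(1)
	go func() {
		defer job.wg.Done()
		flush(job.seq) // persist the snapshot identified by job.seq
	}()
}

// flushSync is what a DDL path could use: the same async flush,
// but the caller blocks until it has completed.
func flushSync(job *flushJob, flush func(seq int64)) {
	flushAsync(job, flush)
	job.wg.Wait()
}
```

A DDL path would then call flushSync, while ordinary row paths only need flushAsync.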
@@ -916,6 +924,7 @@ func (s *Syncer) addJob(job *job) error {
s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job))
return nil
}
job.seq = s.getSeq()
Do we need to call getSeq for every job? And do we also need to use a lock within getSeq()?
Only rows/flush events depend on this seq, but there is no harm in allocating a sequence number for every event. BTW, because this function is only called in the main thread, there is no need to synchronize.
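A minimal sketch of that reasoning, with illustrative names: since the sequence is only allocated from the main goroutine, a bare counter works without any locking.

```go
package sketch

// seqAllocator sketches the sequence allocation discussed above. Because
// addJob (and therefore getSeq) only runs on the syncer's main goroutine,
// a plain counter is enough; no mutex or atomic operation is required.
// The field and method names here are assumptions for illustration.
type seqAllocator struct {
	seq int64
}

func (a *seqAllocator) getSeq() int64 {
	a.seq++
	return a.seq
}
```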
syncer/checkpoint.go
Outdated
snapshot := &removeCheckpointSnapshot{
id: id,
globalPointSaveTime: cp.globalPointSaveTime,
cp.globalPointSaveTime is a reference here, and it's modifiable; by the time we flush/remove the checkpoint snapshot, its value might have changed. Or we can use other methods to avoid such a case.
The same question also applies to other fields.
Seems not: for an ordinary struct, Go's assignment copies the value rather than taking a reference.
My bad, cp.globalPointSaveTime is definitely not a good example here; I thought it was a pointer before. I still have concerns about a few other cases (see the sketch after this list):
- snapshot.globalPoint = &cp.globalPoint.location
- tableCpSnapshots[tbl] = point.location
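A minimal sketch of the difference being discussed, using a hypothetical type rather than the PR's actual checkpoint structs: assigning a struct value copies it, while taking its address (or storing a pointer) aliases the original, so later mutations leak into the "snapshot".

```go
package main

import "fmt"

// location stands in for the binlog location type; purely illustrative.
type location struct {
	pos int
}

func main() {
	live := location{pos: 100}

	byValue := live    // copy: later changes to live are not visible here
	byPointer := &live // alias: shares the same underlying struct

	live.pos = 200

	fmt.Println(byValue.pos)   // prints 100: the snapshot kept the old value
	fmt.Println(byPointer.pos) // prints 200: the "snapshot" saw the mutation
}
```

If location itself embedded reference types such as slices or maps, even a value copy would still share that inner storage, which is presumably the concern behind tableCpSnapshots[tbl] = point.location.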
if needFlush {
s.jobWg.Add(1)
j := newFlushJob()
needFlush is set to true when we find that globalPointSaveTime hasn't been updated for a certain interval. When that happens, we will create a flush job every time a new job is added. A control mechanism is needed here. I implemented that in my branch; we could take that as a reference.
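One possible control mechanism, sketched purely for illustration (this is not the reviewer's actual branch): track whether a flush is already in flight and when the last one finished, and skip creating another flush job until the pending one completes and a minimum interval has passed.

```go
package sketch

import (
	"sync/atomic"
	"time"
)

// flushThrottle prevents enqueueing a new checkpoint flush while one is
// already in flight and enforces a minimum gap between flushes.
// All names here are illustrative, not from the PR.
type flushThrottle struct {
	inFlight  int32
	lastFlush atomic.Value // stores time.Time
	minGap    time.Duration
}

// tryStart reports whether a new flush job may be created now.
func (t *flushThrottle) tryStart() bool {
	if last, ok := t.lastFlush.Load().(time.Time); ok && time.Since(last) < t.minGap {
		return false // flushed recently enough, skip this round
	}
	// Only one flush may be pending at a time.
	return atomic.CompareAndSwapInt32(&t.inFlight, 0, 1)
}

// done marks the pending flush as finished.
func (t *flushThrottle) done() {
	t.lastFlush.Store(time.Now())
	atomic.StoreInt32(&t.inFlight, 0)
}
```

addJob would then only create the flush job when tryStart() returns true, and the flush worker would call done() after persisting the snapshot.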
// Run read flush tasks from input and execute one by one
func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) {
for msg := range w.input {
Based on my understanding, we flush checkpoint snapshots one by one, and I wonder how we would guarantee the speed of the flush compared to running DML jobs.
Currently, we have a snapshots array field in the checkpoint struct; the initial size of that array, and how we handle the case where the snapshots array is overloaded, will need careful design.
Yes, we presume that flushing a checkpoint is not that slow. And since the length of the input chan is finite (16), even if the flush operation is too slow, once the input chan is full it will block the caller from adding more messages, and thus block the whole sync. So the async mechanism should not be worse even if the flush is unexpectedly slow.
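A minimal sketch of the backpressure argument above, using the buffer size of 16 mentioned in the comment; the worker and message types are illustrative, not the PR's exact definitions. A send on a full buffered channel blocks the producer, so a slow flush naturally throttles the sync loop instead of letting snapshots pile up without bound.

```go
package sketch

// flushMsg stands in for one checkpoint snapshot to persist.
type flushMsg struct {
	seq int64
}

// flushWorker drains snapshots from a bounded channel and flushes them
// one by one, mirroring the structure quoted from checkpointFlushWorker.Run.
type flushWorker struct {
	input chan *flushMsg
}

func newFlushWorker() *flushWorker {
	// Capacity 16: when the worker falls behind, Enqueue blocks the
	// caller, which backpressures the whole sync loop.
	return &flushWorker{input: make(chan *flushMsg, 16)}
}

func (w *flushWorker) Enqueue(m *flushMsg) {
	w.input <- m // blocks when 16 snapshots are already pending
}

func (w *flushWorker) Run(flush func(seq int64)) {
	for msg := range w.input {
		flush(msg.seq)
	}
}
```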
return fmt.Sprintf("%v(flushed %v)", b.location.location, b.flushedLocation.location)
}

//
comment on exported type SnapshotID should be of the form "SnapshotID ..." (with optional leading article)
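For reference, a doc comment that satisfies this golint rule starts with the type name; the type body and description below are illustrative only.

```go
package sketch

// SnapshotID identifies one checkpoint snapshot queued for flushing.
// (The description is illustrative; the linter only requires the comment
// to begin with "SnapshotID ...".)
type SnapshotID struct {
	id int64
}
```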
/cc @lance6716 @glorv
What problem does this PR solve?
Initial PR for implementing Async Checkpoint with Causality Optimization; this PR is created for discussion purposes.
What is changed and how it works?
Check List
Tests
Code changes
Side effects
Related changes
dm/dm-ansible