Skip to content

Commit

Permalink
txn: implement staging for PipelinedMemDB (#1230)
Browse files Browse the repository at this point in the history
* feat: implement staging for PipelinedMemDB

Signed-off-by: ekexium <[email protected]>

* comment: explain staging methods

Signed-off-by: ekexium <[email protected]>

---------

Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium authored Mar 18, 2024
1 parent 56418bf commit 73c0712
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions internal/unionstore/pipelined_memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,14 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
if p.flushFunc == nil {
return false, errors.New("flushFunc is not provided")
}

// invalidate the batch get cache whether the flush is really triggered.
p.batchGetCache = nil

if len(p.memDB.stages) > 0 {
return false, errors.New("there are stages unreleased when Flush is called")
}

if !force && !p.needFlush() {
return false, nil
}
Expand Down Expand Up @@ -446,19 +452,27 @@ func (p *PipelinedMemDB) SnapshotGetter() Getter {
panic("SnapshotGetter is not supported for PipelinedMemDB")
}

// Staging is not supported for PipelinedMemDB, it returns 0 handle.
// NOTE about the Staging()/Cleanup()/Release() methods:
// Its correctness is guaranteed by that no stage can exist when a Flush() is called.
// We guarantee in TiDB side that every stage lives inside a flush batch,
// which means the modifications all goes to the mutable memdb.
// Then the staging of the whole PipelinedMemDB can be directly implemented by its mutable memdb.
//
// Checkpoint()/RevertToCheckpoint() is not supported for PipelinedMemDB.

// Staging implements MemBuffer interface.
func (p *PipelinedMemDB) Staging() int {
panic("Staging is not supported for PipelinedMemDB")
return p.memDB.Staging()
}

// Cleanup implements MemBuffer interface.
func (p *PipelinedMemDB) Cleanup(int) {
panic("Cleanup is not supported for PipelinedMemDB")
func (p *PipelinedMemDB) Cleanup(h int) {
p.memDB.Cleanup(h)
}

// Release implements MemBuffer interface.
func (p *PipelinedMemDB) Release(int) {
panic("Release is not supported for PipelinedMemDB")
func (p *PipelinedMemDB) Release(h int) {
p.memDB.Release(h)
}

// Checkpoint implements MemBuffer interface.
Expand Down

0 comments on commit 73c0712

Please sign in to comment.