From 73c0712c0189d2d7ea5ff3caa0e04444e4dbedd2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 18 Mar 2024 14:25:05 +0800 Subject: [PATCH] txn: implement staging for PipelinedMemDB (#1230) * feat: implement staging for PipelinedMemDB Signed-off-by: ekexium * comment: explain staging methods Signed-off-by: ekexium --------- Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 156086a1b..cd72c8698 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -278,8 +278,14 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) { if p.flushFunc == nil { return false, errors.New("flushFunc is not provided") } + // invalidate the batch get cache whether the flush is really triggered. p.batchGetCache = nil + + if len(p.memDB.stages) > 0 { + return false, errors.New("there are stages unreleased when Flush is called") + } + if !force && !p.needFlush() { return false, nil } @@ -446,19 +452,27 @@ func (p *PipelinedMemDB) SnapshotGetter() Getter { panic("SnapshotGetter is not supported for PipelinedMemDB") } -// Staging is not supported for PipelinedMemDB, it returns 0 handle. +// NOTE about the Staging()/Cleanup()/Release() methods: +// Its correctness is guaranteed by that no stage can exist when a Flush() is called. +// We guarantee in TiDB side that every stage lives inside a flush batch, +// which means the modifications all goes to the mutable memdb. +// Then the staging of the whole PipelinedMemDB can be directly implemented by its mutable memdb. +// +// Checkpoint()/RevertToCheckpoint() is not supported for PipelinedMemDB. + +// Staging implements MemBuffer interface. func (p *PipelinedMemDB) Staging() int { - panic("Staging is not supported for PipelinedMemDB") + return p.memDB.Staging() } // Cleanup implements MemBuffer interface. -func (p *PipelinedMemDB) Cleanup(int) { - panic("Cleanup is not supported for PipelinedMemDB") +func (p *PipelinedMemDB) Cleanup(h int) { + p.memDB.Cleanup(h) } // Release implements MemBuffer interface. -func (p *PipelinedMemDB) Release(int) { - panic("Release is not supported for PipelinedMemDB") +func (p *PipelinedMemDB) Release(h int) { + p.memDB.Release(h) } // Checkpoint implements MemBuffer interface.