From 2811291d7b4b0db5faa01c808cd000a32bf52b17 Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 15 Mar 2024 20:41:30 +0800 Subject: [PATCH 1/2] feat: implement staging for PipelinedMemDB Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 0ba6479d4a..161c100071 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -214,6 +214,11 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) { if p.flushFunc == nil { return false, errors.New("flushFunc is not provided") } + + if len(p.memDB.stages) > 0 { + return false, errors.New("there are stages unreleased when Flush is called") + } + if !force && !p.needFlush() { return false, nil } @@ -375,17 +380,17 @@ func (p *PipelinedMemDB) SnapshotGetter() Getter { // Staging is not supported for PipelinedMemDB, it returns 0 handle. func (p *PipelinedMemDB) Staging() int { - panic("Staging is not supported for PipelinedMemDB") + return p.memDB.Staging() } // Cleanup implements MemBuffer interface. -func (p *PipelinedMemDB) Cleanup(int) { - panic("Cleanup is not supported for PipelinedMemDB") +func (p *PipelinedMemDB) Cleanup(h int) { + p.memDB.Cleanup(h) } // Release implements MemBuffer interface. -func (p *PipelinedMemDB) Release(int) { - panic("Release is not supported for PipelinedMemDB") +func (p *PipelinedMemDB) Release(h int) { + p.memDB.Release(h) } // Checkpoint implements MemBuffer interface. From 49b7b4b90fe8ea3764955d6a3a1aa12b7d748acc Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 15 Mar 2024 21:59:50 +0800 Subject: [PATCH 2/2] comment: explain staging methods Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 161c100071..55dda8da29 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -378,7 +378,15 @@ func (p *PipelinedMemDB) SnapshotGetter() Getter { panic("SnapshotGetter is not supported for PipelinedMemDB") } -// Staging is not supported for PipelinedMemDB, it returns 0 handle. +// NOTE about the Staging()/Cleanup()/Release() methods: +// Its correctness is guaranteed by that no stage can exist when a Flush() is called. +// We guarantee in TiDB side that every stage lives inside a flush batch, +// which means the modifications all goes to the mutable memdb. +// Then the staging of the whole PipelinedMemDB can be directly implemented by its mutable memdb. +// +// Checkpoint()/RevertToCheckpoint() is not supported for PipelinedMemDB. + +// Staging implements MemBuffer interface. func (p *PipelinedMemDB) Staging() int { return p.memDB.Staging() }