From 56418bf28fe6e9e9026cb3825304b8207077e639 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 18 Mar 2024 11:02:04 +0800 Subject: [PATCH] feat: implement Mem() and SetMemoryFootprintChangeHook (#1233) Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 32 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 53e21554e..156086a1b 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -54,6 +54,7 @@ type PipelinedMemDB struct { // Some([...]) -> put // Some([]) -> delete batchGetCache map[string]util.Option[[]byte] + memChangeHook func(uint64) } const ( @@ -237,28 +238,36 @@ func (p *PipelinedMemDB) UpdateFlags(k []byte, ops ...kv.FlagsOp) { func (p *PipelinedMemDB) Set(key, value []byte) error { p.Lock() defer p.Unlock() - return p.memDB.Set(key, value) + err := p.memDB.Set(key, value) + p.onMemChange() + return err } // SetWithFlags sets the value for key k in the MemBuffer with flags. func (p *PipelinedMemDB) SetWithFlags(key, value []byte, ops ...kv.FlagsOp) error { p.Lock() defer p.Unlock() - return p.memDB.SetWithFlags(key, value, ops...) + err := p.memDB.SetWithFlags(key, value, ops...) + p.onMemChange() + return err } // Delete deletes the key k in the MemBuffer. func (p *PipelinedMemDB) Delete(key []byte) error { p.Lock() defer p.Unlock() - return p.memDB.Delete(key) + err := p.memDB.Delete(key) + p.onMemChange() + return err } // DeleteWithFlags deletes the key k in the MemBuffer with flags. func (p *PipelinedMemDB) DeleteWithFlags(key []byte, ops ...kv.FlagsOp) error { p.Lock() defer p.Unlock() - return p.memDB.DeleteWithFlags(key, ops...) + err := p.memDB.DeleteWithFlags(key, ops...) + p.onMemChange() + return err } // Flush is called during execution of a transaction, it does flush when there are enough keys and the ongoing flushingMemDB is done. @@ -303,6 +312,7 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) { // this guarantees the onFlushing.Store(true) in another goroutine's Flush happens after onFlushing.Store(false) in this function. p.errCh <- err }(p.generation) + p.onMemChange() return true, nil } @@ -383,13 +393,19 @@ func (p *PipelinedMemDB) OnFlushing() bool { } // SetMemoryFootprintChangeHook sets the hook for memory footprint change. -// TODO: implement this. -func (p *PipelinedMemDB) SetMemoryFootprintChangeHook(hook func(uint64)) {} +func (p *PipelinedMemDB) SetMemoryFootprintChangeHook(hook func(uint64)) { + p.memChangeHook = hook +} + +func (p *PipelinedMemDB) onMemChange() { + if p.memChangeHook != nil { + p.memChangeHook(p.Mem()) + } +} // Mem returns the memory usage of MemBuffer. -// TODO: implement this. func (p *PipelinedMemDB) Mem() uint64 { - return 0 + return p.memDB.Mem() + p.flushingMemDB.Mem() } type errIterator struct {