Skip to content

Commit

Permalink
feat: implement Mem() and SetMemoryFootprintChangeHook
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Mar 17, 2024
1 parent 98a7df8 commit c1658d0
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions internal/unionstore/pipelined_memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type PipelinedMemDB struct {
// Some([...]) -> put
// Some([]) -> delete
batchGetCache map[string]util.Option[[]byte]
memChangeHook func(uint64)
}

const (
Expand Down Expand Up @@ -237,28 +238,36 @@ func (p *PipelinedMemDB) UpdateFlags(k []byte, ops ...kv.FlagsOp) {
func (p *PipelinedMemDB) Set(key, value []byte) error {
p.Lock()
defer p.Unlock()
return p.memDB.Set(key, value)
err := p.memDB.Set(key, value)
p.onMemChange()
return err
}

// SetWithFlags sets the value for key k in the MemBuffer with flags.
func (p *PipelinedMemDB) SetWithFlags(key, value []byte, ops ...kv.FlagsOp) error {
p.Lock()
defer p.Unlock()
return p.memDB.SetWithFlags(key, value, ops...)
err := p.memDB.SetWithFlags(key, value, ops...)
p.onMemChange()
return err
}

// Delete deletes the key k in the MemBuffer.
func (p *PipelinedMemDB) Delete(key []byte) error {
p.Lock()
defer p.Unlock()
return p.memDB.Delete(key)
err := p.memDB.Delete(key)
p.onMemChange()
return err
}

// DeleteWithFlags deletes the key k in the MemBuffer with flags.
func (p *PipelinedMemDB) DeleteWithFlags(key []byte, ops ...kv.FlagsOp) error {
p.Lock()
defer p.Unlock()
return p.memDB.DeleteWithFlags(key, ops...)
err := p.memDB.DeleteWithFlags(key, ops...)
p.onMemChange()
return err
}

// Flush is called during execution of a transaction, it does flush when there are enough keys and the ongoing flushingMemDB is done.
Expand Down Expand Up @@ -303,6 +312,7 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
// this guarantees the onFlushing.Store(true) in another goroutine's Flush happens after onFlushing.Store(false) in this function.
p.errCh <- err
}(p.generation)
p.onMemChange()
return true, nil
}

Expand Down Expand Up @@ -383,13 +393,19 @@ func (p *PipelinedMemDB) OnFlushing() bool {
}

// SetMemoryFootprintChangeHook sets the hook for memory footprint change.
// TODO: implement this.
func (p *PipelinedMemDB) SetMemoryFootprintChangeHook(hook func(uint64)) {}
func (p *PipelinedMemDB) SetMemoryFootprintChangeHook(hook func(uint64)) {
p.memChangeHook = hook
}

func (p *PipelinedMemDB) onMemChange() {
if p.memChangeHook != nil {
p.memChangeHook(p.Mem())
}
}

// Mem returns the memory usage of MemBuffer.
// TODO: implement this.
func (p *PipelinedMemDB) Mem() uint64 {
return 0
return p.memDB.Mem() + p.flushingMemDB.Mem()
}

type errIterator struct {
Expand Down

0 comments on commit c1658d0

Please sign in to comment.