Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/client-go into config-flush-…
Browse files Browse the repository at this point in the history
…concurrency
  • Loading branch information
ekexium committed Nov 13, 2024
2 parents 8de9ca0 + 70049ae commit 222c260
Show file tree
Hide file tree
Showing 24 changed files with 1,884 additions and 558 deletions.
8 changes: 8 additions & 0 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ func (e *ErrTxnTooLarge) Error() string {
return fmt.Sprintf("txn too large, size: %v.", e.Size)
}

type ErrKeyTooLarge struct {
KeySize int
}

func (e *ErrKeyTooLarge) Error() string {
return fmt.Sprintf("key size too large, size: %v.", e.KeySize)
}

// ErrEntryTooLarge is the error when a key value entry is too large.
type ErrEntryTooLarge struct {
Limit uint64
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/stretchr/testify v1.8.2
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.10
go.etcd.io/etcd/client/v3 v3.5.10
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand All @@ -111,8 +112,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56 h1:7TLLfwrKoty9UeJMsSopRTXYw8ooxcF0Z1fegXhIgks=
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56/go.mod h1:EHHidLItrJGh0jqfdfFhIHG5vwkR8+43tFnp7v7iv1Q=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.8-0.20240626064248-4a72526f6c30
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31
go.uber.org/goleak v1.3.0
)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56 h1:7TLLfwrKoty9UeJMsSopRTXYw8ooxcF0Z1fegXhIgks=
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56/go.mod h1:EHHidLItrJGh0jqfdfFhIHG5vwkR8+43tFnp7v7iv1Q=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
Expand Down
4 changes: 2 additions & 2 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ func buildResourceControlInterceptor(
resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, err = resourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
consumption, waitDuration, err = resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption, time.Duration(0))
detail.Update(consumption, waitDuration)
}
}
return resp, err
Expand Down
93 changes: 44 additions & 49 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type ART struct {
len int
size int

// The lastTraversedNode stores addr in uint64 of the last traversed node, include search and recursiveInsert.
// The lastTraversedNode stores addr in uint64 of the last traversed node, includes search and recursiveInsert.
// Compare to atomic.Pointer, atomic.Uint64 can avoid heap allocation, so it's more efficient.
lastTraversedNode atomic.Uint64
hitCount atomic.Uint64
Expand All @@ -68,8 +68,13 @@ func New() *ART {
}

func (t *ART) Get(key []byte) ([]byte, error) {
if t.vlogInvalid {
// panic for easier debugging.
panic("vlog is reset")
}

// 1. search the leaf node.
_, leaf := t.search(key)
_, leaf := t.traverse(key, false)
if leaf == nil || leaf.vAddr.IsNull() {
return nil, tikverr.ErrNotExist
}
Expand All @@ -79,7 +84,7 @@ func (t *ART) Get(key []byte) ([]byte, error) {

// GetFlags returns the latest flags associated with key.
func (t *ART) GetFlags(key []byte) (kv.KeyFlags, error) {
_, leaf := t.search(key)
_, leaf := t.traverse(key, false)
if leaf == nil {
return 0, tikverr.ErrNotExist
}
Expand All @@ -90,6 +95,17 @@ func (t *ART) GetFlags(key []byte) (kv.KeyFlags, error) {
}

func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
if t.vlogInvalid {
// panic for easier debugging.
panic("vlog is reset")
}

if len(key) > MaxKeyLen {
return &tikverr.ErrKeyTooLarge{
KeySize: len(key),
}
}

if value != nil {
if size := uint64(len(key) + len(value)); size > t.entrySizeLimit {
return &tikverr.ErrEntryTooLarge{
Expand All @@ -98,11 +114,12 @@ func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
}
}
}

if len(t.stages) == 0 {
t.dirty = true
}
// 1. create or search the existing leaf in the tree.
addr, leaf := t.recursiveInsert(key)
addr, leaf := t.traverse(key, true)
// 2. set the value and flags.
t.setValue(addr, leaf, value, ops)
if uint64(t.Size()) > t.bufferSizeLimit {
Expand All @@ -111,26 +128,30 @@ func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
return nil
}

// search wraps searchImpl with cache.
func (t *ART) search(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
// traverse wraps search and recursiveInsert with cache.
func (t *ART) traverse(key artKey, insert bool) (arena.MemdbArenaAddr, *artLeaf) {
// check cache
addr, leaf, found := t.checkKeyInCache(key)
if found {
t.hitCount.Add(1)
return addr, leaf
}
t.missCount.Add(1)
addr, leaf = t.searchImpl(key)
if insert {
addr, leaf = t.recursiveInsert(key)
} else {
addr, leaf = t.search(key)
}
if !addr.IsNull() {
t.updateLastTraversed(addr)
}
return addr, leaf
}

// searchImpl looks up the leaf with the given key.
// search looks up the leaf with the given key.
// It returns the memory arena address and leaf itself it there is a match leaf,
// returns arena.NullAddr and nil if the key is not found.
func (t *ART) searchImpl(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
func (t *ART) search(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
current := t.root
if current == nullArtNode {
return arena.NullAddr, nil
Expand Down Expand Up @@ -179,25 +200,9 @@ func (t *ART) searchImpl(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
}
}

// recursiveInsert wraps recursiveInsertImpl with cache.
func (t *ART) recursiveInsert(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
addr, leaf, found := t.checkKeyInCache(key)
if found {
t.hitCount.Add(1)
return addr, leaf
}
t.missCount.Add(1)
addr, leaf = t.recursiveInsertImpl(key)
if !addr.IsNull() {
t.updateLastTraversed(addr)
}
return addr, leaf
}

// recursiveInsertImpl returns the node address of the key.
// recursiveInsert returns the node address of the key.
// It will insert the key if not exists, returns the newly inserted or existing leaf.
func (t *ART) recursiveInsertImpl(key artKey) (arena.MemdbArenaAddr, *artLeaf) {

func (t *ART) recursiveInsert(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
// lazy init root node and allocator.
// this saves memory for read only txns.
if t.root.addr.IsNull() {
Expand Down Expand Up @@ -300,11 +305,7 @@ func (t *ART) expandLeaf(key artKey, depth uint32, prev, current artNode) (arena
newAn.addChild(&t.allocator, l2Key.charAt(int(depth)), !l2Key.valid(int(depth)), leaf2Addr)

// swap the old leaf with the new node4.
if prev == nullArtNode {
t.root = newAn
} else {
prev.replaceChild(&t.allocator, key.charAt(prevDepth), newAn)
}
prev.replaceChild(&t.allocator, key.charAt(prevDepth), newAn)
return leaf2Addr.addr, leaf2
}

Expand All @@ -328,30 +329,25 @@ func (t *ART) expandNode(key artKey, depth, mismatchIdx uint32, prev, current ar
newN4.setPrefix(key[depth:], mismatchIdx)

// update prefix for old node and move it as a child of the new node.
var prefix artKey
if currNode.prefixLen <= maxPrefixLen {
nodeKey := currNode.prefix[mismatchIdx]
currNode.prefixLen -= mismatchIdx + 1
copy(currNode.prefix[:], currNode.prefix[mismatchIdx+1:])
newAn.addChild(&t.allocator, nodeKey, false, current)
// node.prefixLen <= maxPrefixLen means all the prefix is in the prefix array.
// The char at mismatchIdx will be stored in the index of new node.
prefix = currNode.prefix[:]
} else {
currNode.prefixLen -= mismatchIdx + 1
// Unless, we need to find the prefix in the leaf.
// Any leaves in the node should have the same prefix, we use minimum node here.
leafArtNode := minimum(&t.allocator, current)
leaf := leafArtNode.asLeaf(&t.allocator)
leafKey := artKey(leaf.GetKey())
kMin := depth + mismatchIdx + 1
kMax := depth + mismatchIdx + 1 + min(currNode.prefixLen, maxPrefixLen)
copy(currNode.prefix[:], leafKey[kMin:kMax])
newAn.addChild(&t.allocator, leafKey.charAt(int(depth+mismatchIdx)), !leafKey.valid(int(depth)), current)
prefix = leafArtNode.asLeaf(&t.allocator).GetKey()[depth : depth+currNode.prefixLen]
}
nodeChar := prefix[mismatchIdx]
currNode.setPrefix(prefix[mismatchIdx+1:], currNode.prefixLen-mismatchIdx-1)
newAn.addChild(&t.allocator, nodeChar, false, current)

// insert the artLeaf into new node
newLeafAddr, newLeaf := t.newLeaf(key)
newAn.addChild(&t.allocator, key.charAt(int(depth+mismatchIdx)), !key.valid(int(depth+mismatchIdx)), newLeafAddr)
if prev == nullArtNode {
t.root = newAn
} else {
prev.replaceChild(&t.allocator, key.charAt(prevDepth), newAn)
}
prev.replaceChild(&t.allocator, key.charAt(prevDepth), newAn)
return newLeafAddr.addr, newLeaf
}

Expand Down Expand Up @@ -577,7 +573,6 @@ func (t *ART) SelectValueHistory(key []byte, predicate func(value []byte) bool)
return nil, nil
}
return t.allocator.vlogAllocator.GetValue(result), nil

}

func (t *ART) SetMemoryFootprintChangeHook(hook func(uint64)) {
Expand Down
2 changes: 2 additions & 0 deletions internal/unionstore/art/art_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// reusing blocks reduces the memory pieces.
type nodeArena struct {
arena.MemdbArena
// The ART node will expand to a higher capacity, and the address of the freed node will be stored in the free list for reuse.
// By reusing the freed node, memory usage and fragmentation can be reduced.
freeNode4 []arena.MemdbArenaAddr
freeNode16 []arena.MemdbArenaAddr
freeNode48 []arena.MemdbArenaAddr
Expand Down
Loading

0 comments on commit 222c260

Please sign in to comment.