Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak and avoid lock when dstore disabled #21

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions cassandra/prefix_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type PrefixSwitcher struct {
lock sync.RWMutex
currentTrieMap map[string]string
cstarEnabled bool
bdbEnabled bool
}

func (s PrefixSwitchStatus) IsReadOnBeansdb() bool {
Expand Down Expand Up @@ -157,30 +158,31 @@ func GetPrefixSwitchTrieFromCfg(
}
}

func NewPrefixSwitcher(config *config.CassandraStoreCfg, cqlStore *CassandraStore) (*PrefixSwitcher, error) {
func NewPrefixSwitcher(cfg *config.ProxyConfig, cqlStore *CassandraStore) (*PrefixSwitcher, error) {
f := new(PrefixSwitcher)

if !config.Enable {
if !cfg.CassandraStoreCfg.Enable {
f.defaultT = PrefixSwitchBrw
f.cstarEnabled = false
return f, nil
}

prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(config, cqlStore)
prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(&cfg.CassandraStoreCfg, cqlStore)
if err != nil {
return nil, err
}

f.trie = prefixTrie
f.cstarEnabled = true

defaultS, err := strToSwitchStatus(config.SwitchToKeyDefault)
defaultS, err := strToSwitchStatus(cfg.SwitchToKeyDefault)
if err != nil {
return nil, err
}

f.defaultT = defaultS
f.currentTrieMap = nowMap
f.bdbEnabled = cfg.DStoreConfig.Enable
return f, nil
}

Expand Down Expand Up @@ -215,6 +217,10 @@ func (s *PrefixSwitcher) matchStatus(key string) PrefixSwitchStatus {
}

func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus {
if !s.bdbEnabled && s.cstarEnabled {
return PrefixSwitchCrw
}

if !s.cstarEnabled {
return PrefixSwitchBrw
}
Expand All @@ -226,15 +232,25 @@ func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus {

// check key prefix and return bdb read enable c* read enable
func (s *PrefixSwitcher) ReadEnabledOn(key string) (bool, bool) {
if !s.bdbEnabled && s.cstarEnabled {
return false, true
}

if !s.cstarEnabled {
return true, false
}

status := s.GetStatus(key)
return status.IsReadOnBeansdb(), status.IsReadOnCstar()
}

// check keys prefix list and return bdb read keys and c* read keys
func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys []string) {
if !s.bdbEnabled && s.cstarEnabled {
ckeys = keys
return
}

if !s.cstarEnabled {
bkeys = keys
return
Expand All @@ -261,6 +277,10 @@ func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys

// check key prefix and return bdb write enable c* write enable
func (s *PrefixSwitcher) WriteEnabledOn(key string) (bool, bool) {
if !s.bdbEnabled && s.cstarEnabled {
return false, true
}

if !s.cstarEnabled {
return true, false
}
Expand Down
10 changes: 7 additions & 3 deletions dstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error {

s.cstar = cstar

switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, cstar)
switcher, err := cassandra.NewPrefixSwitcher(proxyConf, cstar)
if err != nil {
return err
}
Expand All @@ -63,7 +63,7 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error {
s.dualWErrHandler = dualWErrHandler
logger.Infof("dual write log send to: %s", s.dualWErrHandler.EFile)
} else {
switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, nil)
switcher, err := cassandra.NewPrefixSwitcher(proxyConf, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -269,14 +269,18 @@ func (c *StorageClient) getMulti(keys []string) (rs map[string]*mc.Item, targets
}

func (c *StorageClient) GetMulti(keys []string) (rs map[string]*mc.Item, err error) {
// The keys args MUST BE DEDUPLICATED, if not, there will be MEMORY LEAK
keys = deduplicateKeys(keys)

timer := prometheus.NewTimer(
cmdE2EDurationSeconds.WithLabelValues("getm"),
)
defer timer.ObserveDuration()

bkeys, ckeys := c.pswitcher.ReadEnableOnKeys(keys)

rs = make(map[string]*mc.Item, len(keys))

if len(bkeys) > 0 {
totalReqs.WithLabelValues("getm", "beansdb").Inc()
c.sched = GetScheduler()
Expand Down
21 changes: 21 additions & 0 deletions dstore/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dstore

func deduplicateKeys(keys []string) []string {
dedup := make(map[string]struct{}, len(keys))

for _, k := range keys {
if _, ok := dedup[k]; ok {
continue
} else {
dedup[k] = struct{}{}
}
}

dedupKs := make([]string, len(dedup))
i := 0
for k := range dedup {
dedupKs[i] = k
i++
}
return dedupKs
}
38 changes: 38 additions & 0 deletions dstore/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dstore

import (
"fmt"
"testing"
)

func TestDeduplicateKeys(t *testing.T) {
test := []string{"a", "b", "c", "d", "a"}
dtk := deduplicateKeys(test)
if (len(dtk) != 4) {
t.Errorf("string slice should be deduplicated: %s", dtk)
}
test2 := []string{"a", "n"}
dtk2 := deduplicateKeys(test2)
if (len(dtk2) != 2) {
t.Errorf("string slice %s has no duplications", test2)
}
t.Logf("after dedup: %s | %s", dtk, dtk2)
}

func BenchmarkDeduplicateKeys(b *testing.B) {
test := []string{
"/frodo_feed/title_vecs/3055:4601087161",
"/frodo_feed/title_vecs/3055:4601087162",
"/frodo_feed/title_vecs/3055:4601087161",
"/frodo_feed/title_vecs/3055:4601087165",
"/frodo_feed/title_vecs/3055:4601087161",
}

for j := 0; j < 200; j++ {
test = append(test, fmt.Sprintf("/frodo_feed/title_vecs/3055:460108716%d", j))
}

for i := 0; i < b.N; i++ {
deduplicateKeys(test)
}
}
Loading