Skip to content

Commit

Permalink
Add readonly rr scheduler test
Browse files Browse the repository at this point in the history
  • Loading branch information
dispensable committed Feb 2, 2024
1 parent 7c558b3 commit c101cf9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 242 deletions.
14 changes: 9 additions & 5 deletions dstore/read_only_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
type RRReadScheduler struct {
hosts []*Host
current atomic.Int32
totalHosts int32
totalHostsI32 int32
totalHosts int
totalHostsF64 float64
quit bool
}

Expand All @@ -23,12 +25,14 @@ func NewRRReadScheduler(route *dbcfg.RouteTable) *RRReadScheduler {
host := NewHost(server.Addr)
rrsche.hosts[idx] = host
}
rrsche.totalHosts = int32(len(rrsche.hosts))
rrsche.totalHosts = len(rrsche.hosts)
rrsche.totalHostsI32 = int32(rrsche.totalHosts)
rrsche.totalHostsF64 = float64(rrsche.totalHosts)
return rrsche
}

func (sch *RRReadScheduler) GetHostsByKey(key string) (hosts []*Host) {
next := sch.current.Add(1) % sch.totalHosts
next := sch.current.Add(1) % sch.totalHostsI32
sch.current.Store(next)
rrrStoreReqs.WithLabelValues(sch.hosts[next].Addr).Inc()
return sch.hosts[next:next+1]
Expand All @@ -47,9 +51,9 @@ func (sch *RRReadScheduler) FeedbackLatency(host *Host, key string, startTime ti

// route some keys to group of hosts
func (sch *RRReadScheduler) DivideKeysByBucket(keys []string) [][]string {
numKeysPer := int(math.Round(float64(len(keys)) / float64(len(sch.hosts))))
numKeysPer := int(math.Round(float64(len(keys)) / sch.totalHostsF64))
rs := make([][]string, len(sch.hosts))
maxEndIdx := len(keys) - 1
maxEndIdx := len(sch.hosts) - 1

startIdx := 0
partIdx := 0
Expand Down
265 changes: 30 additions & 235 deletions dstore/read_only_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,248 +1,43 @@
package dstore

import (
"errors"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"testing"
"time"

dbcfg "github.com/douban/gobeansdb/gobeansdb"
mc "github.com/douban/gobeansdb/memcache"
yaml "gopkg.in/yaml.v2"

"github.com/douban/gobeansproxy/config"
"github.com/douban/gobeansproxy/dstore"
"github.com/douban/gobeansproxy/utils"
dbcfg "github.com/douban/gobeansdb/config"
"github.com/stretchr/testify/assert"
)

var testDataDir = flag.String("testDataDir", "/tmp/gobeansdbproxy/bdb/data/", "this dir will be used by gobeansdb and proxy")


func setupSuite(tb testing.TB) func(tb testing.TB) {
user, err := user.Current()
if err != nil {
tb.Fatalf("get username err: %s", err)
}
gopath := os.Getenv("GOPATH")
gobeansdbBin := filepath.Join(gopath, "bin", "gobeansdb")

if _, err := os.Stat(gobeansdbBin); errors.Is(err, os.ErrNotExist) {
tb.Fatalf("gobeansdb binary not exists, %s", gobeansdbBin)
}

projDir := utils.GetProjectHomeDir()

allGobeansdb := []*exec.Cmd{}
for _, p := range []string{"57980", "57981", "57982", "57983"} {
conn, _ := net.DialTimeout("tcp", net.JoinHostPort("localhost", p), time.Second)
if conn != nil {
conn.Close()
tb.Logf("%s port already listening ignore start ...", p)
continue
}

// we modify config when developer run test without container
gobeansdbCfg := fmt.Sprintf("%s/.doubanpde/scripts/bdb/gobeansproxy/%s/conf/", projDir, p)
cfgParsed := dbcfg.DBConfig{}
yfile, err := ioutil.ReadFile(filepath.Join(gobeansdbCfg, "global.yaml"))
if err != nil {
tb.Fatal(err)
}
err = yaml.Unmarshal(yfile, &cfgParsed)
if err != nil {
tb.Fatalf("load cfg %s err: %s", gobeansdbCfg, err)
}
dataPath := filepath.Join(*testDataDir, p, user.Username, "data")
logPath := filepath.Join(*testDataDir, p, user.Username, "log")
for _, pp := range []string{dataPath, logPath} {
err = os.MkdirAll(pp, os.ModePerm)
if err != nil {
tb.Fatalf("create dir %s err: %s", pp, err)
}
}
cfgParsed.ServerConfig.AccessLog = filepath.Join(logPath, "access.log")
cfgParsed.ServerConfig.ErrorLog = filepath.Join(logPath, "error.log")
cfgParsed.HStoreConfig.DBLocalConfig.Home = dataPath
gobeansdbTestCfg := fmt.Sprintf("%s/.doubanpde/scripts/bdb/gobeansproxy/%s/testconf/", projDir, p)
err = os.MkdirAll(gobeansdbTestCfg, os.ModePerm)
if err != nil {
tb.Fatalf("create dir %s err: %s", gobeansdbTestCfg, err)
}
c, err := yaml.Marshal(cfgParsed)
if err != nil {
tb.Fatalf("marshal cfg err: %s", err)
}
func TestDivideKeyByHosts(t *testing.T) {
route := new(dbcfg.RouteTable)
route.Main = append(
route.Main, dbcfg.Server{Addr: "127.0.0.1:7700"},
dbcfg.Server{Addr: "127.0.0.1:7701"}, dbcfg.Server{Addr: "127.0.0.1:7702"},
)
InitGlobalManualScheduler(route, 1, NoBucketsRounRobinROSchduler)

dbGlobalCfg := filepath.Join(gobeansdbTestCfg, "global.yaml")
f, err := os.OpenFile(dbGlobalCfg, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
tb.Fatal(err)
}
defer f.Close()
_, err = f.Write(c)
if err != nil {
tb.Fatal(err)
}
routeCfg := filepath.Join(gobeansdbTestCfg, "route.yaml")
rcfg, err := ioutil.ReadFile(filepath.Join(gobeansdbCfg, "route.yaml"))
if err != nil {
tb.Fatal(err)
}
err = ioutil.WriteFile(routeCfg, rcfg, 0644)
if err != nil {
tb.Fatal(err)
}

cmd := exec.Command(
gobeansdbBin,
"-confdir",
gobeansdbTestCfg,
)
if err := cmd.Start(); err != nil {
tb.Fatalf("failed to start %s gobeansdb: %s", p, err)
}
tb.Logf("start %s with pid: %d", cmd, cmd.Process.Pid)
allGobeansdb = append(allGobeansdb, cmd)
rrKeyHostCnt := map[string]int{
"127.0.0.1:7700": 0,
"127.0.0.1:7701": 0,
"127.0.0.1:7702": 0,
}
// wait some time let the server started
time.Sleep(time.Second * 5)

return func(tb testing.TB) {
for _, execCmd := range allGobeansdb {
if err := execCmd.Process.Kill(); err != nil {
tb.Fatalf("failed to kill process %s: %s", execCmd, err)
}
}
for i := 1; i < 100; i++ {
testKeys := []string{}
for j := 0; j < i; j++ {
hosts := globalScheduler.GetHostsByKey("j")
assert.True(t, len(hosts) == 1, "rrr scheduler only return one host for one key")
rrKeyHostCnt[hosts[0].Addr] += 1
testKeys = append(testKeys, "")
}
result := globalScheduler.DivideKeysByBucket(testKeys)
assert.Equal(t, len(route.Main), len(result), "keys should be split part max")
totalK := 0
for _, k := range result {
totalK += len(k)
}
assert.Equal(t, len(testKeys), totalK, "all key must parted")
assert.True(t, len(testKeys[len(testKeys)-1]) - len(testKeys[0]) < 3, "keys cap diff should less than server nums")
}
}

func testClientSet(t *testing.T, c mc.StorageClient, key string, val []byte) {
assert := assert.New(t)
flag := 2
ok, err := clientSet(c, key, val, flag)
setHosts := c.GetSuccessedTargets()
c.Clean()
assert.True(ok)
assert.Nil(err)
assert.True(len(setHosts) > 0)

v, err := c.Get(key)
getHosts := c.GetSuccessedTargets()
c.Clean()

assert.Equal(val, v.Body)
assert.Equal(flag, v.Flag)
assert.Equal(2, len(getHosts))
assert.True(hasIntersection(setHosts, getHosts))
}

func clientSet(c mc.StorageClient, key string, val []byte, flag int) (bool, error) {
item := newItem(flag, val)
defer item.Free()
noreply := false
return c.Set(key, item, noreply)
}

func hasIntersection(arr1 []string, arr2 []string) bool {
for _, i := range arr1 {
for _, j := range arr2 {
if i == j {
return true
}
}
}
return false
}

func testFailStoreClient(t *testing.T, c mc.StorageClient) {
assert := assert.New(t)
key := "/test/fail/client"

_, err := c.Get(key)
assert.NotNil(err)

_, err = c.Set("key", &mc.Item{}, false)
assert.NotNil(err)

_, err = c.GetMulti([]string{"key"})
assert.NotNil(err)
}

func testStoreClient(t *testing.T, c mc.StorageClient) {
assert := assert.New(t)
key1 := "/test/client/1"

r, _ := c.Get(key1)
assert.Nil(r)
assert.True(len(c.GetSuccessedTargets()) > 2)
c.Clean()

// set
key2 := "/test/client/2"
val2 := []byte("value 2")
testClientSet(t, c, key2, val2)

key3 := "/test/client/3"
val3 := []byte("value 3")
testClientSet(t, c, key3, val3)

// get multi
items, _ := c.GetMulti([]string{key1, key2, key3})
c.Clean()
assert.Equal(2, len(items))

keyNum := 100
keys := make([]string, keyNum)
flagm := 3
valm := []byte("value multi")
for i := 0; i < keyNum; i++ {
keys[i] = fmt.Sprintf("/test/client/multi_%d", i)
ok, _ := clientSet(c, keys[i], valm, flagm)
c.Clean()
assert.True(ok)
}
items, err := c.GetMulti(keys)
c.Clean()
assert.Nil(err)
assert.Equal(keyNum, len(items))

// large obj
key4 := "/test/client/4"
val4 := make([]byte, 1024*1000)
testClientSet(t, c, key4, val4)

// delete
key6 := "/test/client/6"
val6 := []byte("value 6")
testClientSet(t, c, key6, val6)
ok, _ := c.Delete(key6)
assert.True(ok)
v6, _ := c.Get(key6)
assert.Nil(v6)
}

func TestDStoreOnly(t *testing.T) {
teardown := setupSuite(t)
defer teardown(t)

homeDir := utils.GetProjectHomeDir()
confdir := path.Join(homeDir, ".doubanpde", "scripts", "bdb", "gobeansproxy", "dstore-only", "conf")
proxyConf := &config.Proxy
proxyConf.Load(confdir)

InitGlobalManualScheduler(config.Route, proxyConf.N, dstore.BucketsManualSchduler)
storage := new(Storage)
storage.InitStorageEngine(proxyConf)
c := NewStorageClient(proxyConf.N, proxyConf.W, proxyConf.R, storage.cstar, storage.PSwitcher, storage.dualWErrHandler)

testStoreClient(t, c)
assert.True(t, rrKeyHostCnt["127.0.0.1:7700"] - rrKeyHostCnt["127.0.0.1:7701"] < 3, "rr should be balanced")
assert.True(t, rrKeyHostCnt["127.0.0.1:7700"] - rrKeyHostCnt["127.0.0.1:7702"] < 3, "rr should be balanced")
}
3 changes: 1 addition & 2 deletions dstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
yaml "gopkg.in/yaml.v2"

"github.com/douban/gobeansproxy/config"
"github.com/douban/gobeansproxy/dstore"
"github.com/douban/gobeansproxy/utils"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -239,7 +238,7 @@ func TestDStoreOnly(t *testing.T) {
proxyConf := &config.Proxy
proxyConf.Load(confdir)

InitGlobalManualScheduler(config.Route, proxyConf.N, dstore.BucketsManualSchduler)
InitGlobalManualScheduler(config.Route, proxyConf.N, BucketsManualSchduler)
storage := new(Storage)
storage.InitStorageEngine(proxyConf)
c := NewStorageClient(proxyConf.N, proxyConf.W, proxyConf.R, storage.cstar, storage.PSwitcher, storage.dualWErrHandler)
Expand Down

0 comments on commit c101cf9

Please sign in to comment.