Skip to content

Commit

Permalink
upgrade cache pool
Browse files Browse the repository at this point in the history
  • Loading branch information
huangjunshi committed May 21, 2021
1 parent b0a4ded commit d117f6d
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 91 deletions.
7 changes: 3 additions & 4 deletions etc/local/cache.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# pool configs
_pool: &_pool
pool_init_size: 5
pool_max_active: 10
pool_max_idle: 5
pool_min_idle: 0
pool_init_size: 10
pool_idle_min_size: 5
pool_idle_timeout_min: 5

# drivers
drivers:
Expand Down
21 changes: 7 additions & 14 deletions lib/example/gcacheclient/example_gcacheclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,14 @@ func TestDriver() {
cache.Set("test11", "test1")
cache.Close()

// gutil.Dump(cache.Set("test1", "test1"))
// gutil.Dump(cache.SetTTL("test2", "test2", 5*time.Second))
// gutil.Dump(cache.Get("test1"))
// gutil.Dump(cache.Del("test1"))
// test connection timeout
// for i := 0; i < 5; i++ {
// time.Sleep(10 * time.Second)
// cache = gcache.D("default")
// cache.Set("test11", "test1")
// cache.Close()
// }

// incr, _ := cache.Incr("incr")
// gutil.Dump("IncrD:", incr)

// incr, _ = cache.Incr("incr")
// gutil.Dump("IncrD:", incr)

// incrBy, _ := cache.IncrBy("incr", 5)
// gutil.Dump("IncrByD:", incrBy)

cache.Close()
}

// TestRegion :
Expand Down
14 changes: 6 additions & 8 deletions lib/gcache/driver/gcache_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ type Driver struct {
Algo string
Nodes []string
// pool attrs start
PoolInitSize int
PoolMaxActive int
PoolMaxIdle int
PoolMinIdle int
PoolInitSize int
PoolIdleMinSize int
PoolIdleTimeoutMin int
// pool attrs end
}

Expand Down Expand Up @@ -49,10 +48,9 @@ func Init() bool {
Algo: _cDriver["algo"].(string),
Nodes: _cNodes,
// pool attrs start
PoolInitSize: _cDriver["pool_init_size"].(int),
PoolMaxActive: _cDriver["pool_max_active"].(int),
PoolMaxIdle: _cDriver["pool_max_idle"].(int),
PoolMinIdle: _cDriver["pool_min_idle"].(int),
PoolInitSize: _cDriver["pool_init_size"].(int),
PoolIdleMinSize: _cDriver["pool_idle_min_size"].(int),
PoolIdleTimeoutMin: _cDriver["pool_idle_timeout_min"].(int),
// pool attrs end
}
// check driver
Expand Down
85 changes: 21 additions & 64 deletions lib/gcache/pool/gcache_pool.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package gcachepool

import (
"errors"
"strconv"
"strings"
"sync"
"time"

"github.com/go-redis/redis"
base "github.com/jameschz/go-base/lib/base"
Expand All @@ -16,9 +15,7 @@ import (
var (
_debugStatus bool
_cPoolInit bool
_cPoolIdle map[string]*base.Stack
_cPoolActive map[string]*base.Hmap
_cPoolLock sync.Mutex
_cPool *base.Hmap
)

// private
Expand Down Expand Up @@ -51,10 +48,21 @@ func createDataSource(driver *gcachedriver.Driver, node string) *gcachebase.Data
}
//add end
ds.RedisConn = redis.NewClient(&redis.Options{
// Addr: node,
// Basic Settings
Addr: addr,
DB: db,
// Pool Settings
PoolSize: driver.PoolIdleMinSize,
MinIdleConns: driver.PoolIdleMinSize,
IdleTimeout: time.Duration(driver.PoolIdleTimeoutMin * int(time.Minute)),
})
if ds.RedisConn == nil {
panic("gcache> new redis client error")
}
_, err := ds.RedisConn.Ping().Result()
if err != nil {
panic("gcache> ping redis error : " + err.Error())
}
}
// for debug
debugPrint("gcachepool.createDataSource", ds)
Expand All @@ -63,8 +71,7 @@ func createDataSource(driver *gcachedriver.Driver, node string) *gcachebase.Data

// private
func releaseDataSource(ds *gcachebase.DataSource) {
if ds.RedisConn != nil {
ds.RedisConn.Close()
if ds != nil {
ds = nil
}
// for debug
Expand All @@ -90,21 +97,16 @@ func Init() (err error) {
// init drivers
gcachedriver.Init()
// init pool by drivers
_cPool = base.NewHmap()
cDrivers := gcachedriver.GetDrivers()
_cPoolIdle = make(map[string]*base.Stack, 0)
_cPoolActive = make(map[string]*base.Hmap, 0)
for cName, cDriver := range cDrivers {
for _, cNode := range cDriver.Nodes {
cKey := getDataSourceKey(cName, cNode)
_cPoolIdle[cKey] = base.NewStack()
_cPoolActive[cKey] = base.NewHmap()
for i := 0; i < cDriver.PoolInitSize; i++ {
_cPoolIdle[cKey].Push(createDataSource(cDriver, cNode))
}
_cPool.Set(cKey, createDataSource(cDriver, cNode))
}
}
// for debug
debugPrint("gcachepool.Init", _cPoolIdle, _cPoolActive)
debugPrint("gcachepool.Init", _cPool)
// init ok status
if err == nil {
_cPoolInit = true
Expand All @@ -117,59 +119,14 @@ func Fetch(cName string, cNode string) (ds *gcachebase.DataSource, err error) {
// get data source key
cKey := getDataSourceKey(cName, cNode)
// get driver by name
cDriver := gcachedriver.GetDriver(cName)
// fetch start >>> lock
_cPoolLock.Lock()
// reach to max active size
activeSize := _cPoolActive[cKey].Len()
if cDriver.PoolMaxActive <= activeSize {
return nil, errors.New("gcachepool : max active limit")
}
// add if not enough
idleSize := _cPoolIdle[cKey].Len()
if cDriver.PoolMinIdle >= idleSize {
idleSizeAdd := cDriver.PoolMaxIdle - idleSize
for i := 0; i < idleSizeAdd; i++ {
_cPoolIdle[cKey].Push(createDataSource(cDriver, cNode))
}
// for debug
debugPrint("gcachepool.Fetch Add", _cPoolIdle[cKey].Len(), _cPoolActive[cKey].Len())
}
// fetch from front
if _cPoolIdle[cKey].Len() >= 1 {
ds = _cPoolIdle[cKey].Pop().(*gcachebase.DataSource)
_cPoolActive[cKey].Set(ds.ID, ds)
} else {
return nil, errors.New("gcachepool : no enough ds")
}
// for debug
debugPrint("gcachepool.Fetch", _cPoolIdle[cKey].Len(), _cPoolActive[cKey].Len())
// fetch end >>> unlock
_cPoolLock.Unlock()
ds = _cPool.Get(cKey).(*gcachebase.DataSource)
// return ds 0
return ds, err
}

// Return : public
func Return(ds *gcachebase.DataSource) (err error) {
// get data source key
cKey := getDataSourceKey(ds.Name, ds.Node)
// get driver by name
cDriver := gcachedriver.GetDriver(ds.Name)
// return start >>> lock
_cPoolLock.Lock()
// delete from active list
_cPoolActive[cKey].Delete(ds.ID)
// return or release
idleSize := _cPoolIdle[cKey].Len()
if cDriver.PoolMaxIdle <= idleSize {
releaseDataSource(ds)
} else {
_cPoolIdle[cKey].Push(ds)
}
// return end >>> unlock
_cPoolLock.Unlock()
// for debug
debugPrint("gcachepool.Return", _cPoolIdle[cKey].Len(), _cPoolActive[cKey].Len())
// release datasource
releaseDataSource(ds)
return err
}
175 changes: 175 additions & 0 deletions lib/gcache/pool/gcache_pool.go.old
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package gcachepool

import (
"errors"
"strconv"
"strings"
"sync"

"github.com/go-redis/redis"
base "github.com/jameschz/go-base/lib/base"
gcachebase "github.com/jameschz/go-base/lib/gcache/base"
gcachedriver "github.com/jameschz/go-base/lib/gcache/driver"
gutil "github.com/jameschz/go-base/lib/gutil"
)

var (
_debugStatus bool
_cPoolInit bool
_cPoolIdle map[string]*base.Stack
_cPoolActive map[string]*base.Hmap
_cPoolLock sync.Mutex
)

// private
func debugPrint(vals ...interface{}) {
if _debugStatus == true {
gutil.Dump(vals...)
}
}

// private
func createDataSource(driver *gcachedriver.Driver, node string) *gcachebase.DataSource {
ds := &gcachebase.DataSource{}
ds.Name = driver.Name
ds.Node = node
ds.ID = gutil.UUID()
switch driver.Type {
case "redis":
//add begin (db select)
addr := node
db := 0
nodeArr := strings.Split(node, ":")
if len(nodeArr) > 2 {
var build strings.Builder
build.WriteString(nodeArr[0])
build.WriteString(":")
build.WriteString(nodeArr[1])

addr = build.String()
db, _ = strconv.Atoi(nodeArr[2])
}
//add end
ds.RedisConn = redis.NewClient(&redis.Options{
// Addr: node,
Addr: addr,
DB: db,
})
}
// for debug
debugPrint("gcachepool.createDataSource", ds)
return ds
}

// private
func releaseDataSource(ds *gcachebase.DataSource) {
if ds.RedisConn != nil {
ds.RedisConn.Close()
ds = nil
}
// for debug
debugPrint("gcachepool.releaseDataSource", ds)
}

// private
func getDataSourceKey(name string, node string) string {
return name + ":" + node
}

// SetDebug : public
func SetDebug(status bool) {
_debugStatus = status
}

// Init : public
func Init() (err error) {
// init once
if _cPoolInit == true {
return nil
}
// init drivers
gcachedriver.Init()
// init pool by drivers
cDrivers := gcachedriver.GetDrivers()
_cPoolIdle = make(map[string]*base.Stack, 0)
_cPoolActive = make(map[string]*base.Hmap, 0)
for cName, cDriver := range cDrivers {
for _, cNode := range cDriver.Nodes {
cKey := getDataSourceKey(cName, cNode)
_cPoolIdle[cKey] = base.NewStack()
_cPoolActive[cKey] = base.NewHmap()
for i := 0; i < cDriver.PoolInitSize; i++ {
_cPoolIdle[cKey].Push(createDataSource(cDriver, cNode))
}
}
}
// for debug
debugPrint("gcachepool.Init", _cPoolIdle, _cPoolActive)
// init ok status
if err == nil {
_cPoolInit = true
}
return err
}

// Fetch : public
func Fetch(cName string, cNode string) (ds *gcachebase.DataSource, err error) {
// get data source key
cKey := getDataSourceKey(cName, cNode)
// get driver by name
cDriver := gcachedriver.GetDriver(cName)
// fetch start >>> lock
_cPoolLock.Lock()
// reach to max active size
activeSize := _cPoolActive[cKey].Len()
if cDriver.PoolMaxActive <= activeSize {
return nil, errors.New("gcachepool : max active limit")
}
// add if not enough
idleSize := _cPoolIdle[cKey].Len()
if cDriver.PoolMinIdle >= idleSize {
idleSizeAdd := cDriver.PoolMaxIdle - idleSize
for i := 0; i < idleSizeAdd; i++ {
_cPoolIdle[cKey].Push(createDataSource(cDriver, cNode))
}
// for debug
debugPrint("gcachepool.Fetch Add", _cPoolIdle[cKey].Len(), _cPoolActive[cKey].Len())
}
// fetch from front
if _cPoolIdle[cKey].Len() >= 1 {
ds = _cPoolIdle[cKey].Pop().(*gcachebase.DataSource)
_cPoolActive[cKey].Set(ds.ID, ds)
} else {
return nil, errors.New("gcachepool : no enough ds")
}
// for debug
debugPrint("gcachepool.Fetch", _cPoolIdle[cKey].Len(), _cPoolActive[cKey].Len())
// fetch end >>> unlock
_cPoolLock.Unlock()
// return ds 0
return ds, err
}

// Return : public
func Return(ds *gcachebase.DataSource) (err error) {
// get data source key
cKey := getDataSourceKey(ds.Name, ds.Node)
// get driver by name
cDriver := gcachedriver.GetDriver(ds.Name)
// return start >>> lock
_cPoolLock.Lock()
// delete from active list
_cPoolActive[cKey].Delete(ds.ID)
// return or release
idleSize := _cPoolIdle[cKey].Len()
if cDriver.PoolMaxIdle <= idleSize {
releaseDataSource(ds)
} else {
_cPoolIdle[cKey].Push(ds)
}
// return end >>> unlock
_cPoolLock.Unlock()
// for debug
debugPrint("gcachepool.Return", _cPoolIdle[cKey].Len(), _cPoolActive[cKey].Len())
return err
}
2 changes: 1 addition & 1 deletion lib/gdo/pool/gdo_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func Fetch(dbName string) (ds *gdobase.DataSource, err error) {

// Return : public
func Return(ds *gdobase.DataSource) (err error) {
// return start >>> lock
// release datasource
releaseDataSource(ds)
// return 0
return err
Expand Down

0 comments on commit d117f6d

Please sign in to comment.