-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistcache.go
144 lines (123 loc) · 2.71 KB
/
distcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package distCache
import (
"errors"
"github.com/jasonzhao47/distCache/internal/lru"
"log"
"sync"
)
type Getter interface {
Get(string) ([]byte, error)
}
type GetterFunc func(key string) ([]byte, error)
func (fn GetterFunc) Get(key string) ([]byte, error) {
return fn(key)
}
type GroupCache struct {
getter Getter
// getter func - callback function for data source
// get the real data out from it
name string
cache cache[string, ByteView]
}
func (c *GroupCache) loadFromLocal(key string) (ByteView, error) {
val, err := c.getter.Get(key)
if err != nil {
return NewByteView([]byte{}), err
}
buf := NewByteView(val)
c.cache.add(key, buf)
return buf, nil
}
func (c *GroupCache) getFromPeer(peerName string, key byte) (ByteView, error) {
//TODO implement me
panic("implement me")
}
var (
ErrInternalError = errors.New("Internal error")
// global variable - shared memory!
groups map[string]*GroupCache
mu sync.RWMutex
)
// DistCache question: how to test concurrency?
type DistCache interface {
Get(name string, key string) (ByteView, error)
}
func NewGroups(getter Getter, name string, cacheBytes int) DistCache {
if getter == nil {
panic("empty data getter")
}
mu.Lock()
defer mu.Unlock()
gc := &GroupCache{
getter: getter,
name: name,
cache: cache[string, ByteView]{cacheBytes: cacheBytes},
}
groups[name] = gc
return gc
}
func GetGroup(name string) (*GroupCache, error) {
mu.RLock()
defer mu.RUnlock()
group, ok := groups[name]
if !ok {
return nil, ErrInternalError
}
return group, nil
}
func (c *GroupCache) Get(name string, key string) (ByteView, error) {
g, err := GetGroup(name)
if err != nil {
return NewByteView([]byte{}), err
}
val, ok := g.cache.get(key)
if !ok {
log.Printf("[Cache] not hit %v\n", val)
// val, peerOk := GetFromPeer()
// if !peerOk
// How to use this getter?
data, err := c.loadFromLocal(key)
if err != nil {
return nil, err
}
return data, nil
// problem: determine which node to emplace this data?
}
log.Printf("[Cache] hit %v\n", val)
return val, nil
}
// cache: wrapper for LRUCache, thread safe
type cache[T comparable, V interface{}] struct {
lru lru.LRUCache[T, V]
mu sync.Mutex
cacheBytes int
}
func (c *cache[T, V]) add(key T, value V) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
lruCache, err := lru.New[T, V](c.cacheBytes)
if err != nil {
panic(err)
}
c.lru = lruCache
}
c.lru.Add(key, value)
return
}
func (c *cache[T, V]) get(key T) (value V, evicted bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v, ok
}
return
}
func (c *cache[T, V]) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()
c.lru.RemoveOldest()
}