forked from ozonru/filebeat-throttle-plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucket_limiter.go
123 lines (101 loc) · 2.85 KB
/
bucket_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package throttleplugin
import (
"fmt"
"io"
"sync"
"time"
)
type BucketLimiter struct {
mu sync.Mutex
bucketInterval int64 // bucket interval in seconds (60 = 1 min)
limit int64 // maximum number of events per bucket
minBucketID int64 // minimum bucket id
buckets []int64
lastUpdate time.Time
}
func NewBucketLimiter(bucketInterval, limit, buckets int64, now time.Time) *BucketLimiter {
return &BucketLimiter{
bucketInterval: bucketInterval,
limit: limit,
minBucketID: timeToBucketID(now, bucketInterval) - buckets + 1,
buckets: make([]int64, buckets),
}
}
// Allow returns TRUE if event is allowed to be processed.
func (bl *BucketLimiter) Allow(t time.Time) bool {
index := timeToBucketID(t, bl.bucketInterval)
bl.mu.Lock()
defer bl.mu.Unlock()
bl.lastUpdate = time.Now()
max := bl.minBucketID + int64(len(bl.buckets)) - 1
if index < bl.minBucketID {
// limiter doesn't track that bucket anymore.
return false
}
if index > max {
// event from new bucket. We need to add N new buckets
n := index - max
for i := 0; int64(i) < n; i++ {
bl.buckets = append(bl.buckets, 0)
}
// remove old ones
bl.buckets = bl.buckets[n:]
// and set new min index
bl.minBucketID += n
}
return bl.increment(index)
}
// LastUpdate returns last Allow method call time.
func (bl *BucketLimiter) LastUpdate() time.Time {
bl.mu.Lock()
defer bl.mu.Unlock()
return bl.lastUpdate
}
// WriteStatus writes text based status into Writer.
func (bl *BucketLimiter) WriteStatus(w io.Writer) error {
bl.mu.Lock()
defer bl.mu.Unlock()
for i, value := range bl.buckets {
fmt.Fprintf(w, "#%s: ", bucketIDToTime(int64(i)+bl.minBucketID, bl.bucketInterval))
progress(w, value, bl.limit, 20)
fmt.Fprintf(w, " %d/%d\n", value, bl.limit)
}
return nil
}
// SetLimit updates limit value.
// Note: it's allowed only to change limit, not bucketInterval.
func (bl *BucketLimiter) SetLimit(limit int64) {
bl.mu.Lock()
bl.limit = limit
bl.mu.Unlock()
}
// increment adds 1 to specified bucket.
// Note: this func is not thread safe, so it must be guarded with lock.
func (сl *BucketLimiter) increment(index int64) bool {
i := index - сl.minBucketID
if сl.buckets[i] >= int64(сl.limit) {
return false
}
сl.buckets[i]++
return true
}
func progress(w io.Writer, current, limit, max int64) {
p := float64(current) / float64(limit) * float64(max)
fmt.Fprint(w, "[")
for i := int64(0); i < max; i++ {
if i < int64(p) {
fmt.Fprint(w, "#")
} else {
fmt.Fprint(w, "_")
}
}
fmt.Fprint(w, "]")
}
// bucketbucketIDToTime converts bucketID to time. This time is start of the bucket.
func bucketIDToTime(id int64, interval int64) time.Time {
return time.Unix(id*interval, 0)
}
// timeToBucketID converts time to bucketID.
func timeToBucketID(t time.Time, interval int64) int64 {
return t.Unix() / interval
}