Skip to content

Commit

Permalink
Core (cache) optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
lomik committed Oct 28, 2015
1 parent ee11646 commit d92a5de
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func (c *Cache) SetInputCapacity(size int) {
c.inputCapacity = size
}

// Get any key/values pair from Cache
func (c *Cache) Get() *points.Points {
func (c *Cache) getNext() *points.Points {
for {
size := len(c.queue)
if size == 0 {
Expand All @@ -68,12 +67,31 @@ func (c *Cache) Get() *points.Points {
return values
}
}
return nil
}

func (c *Cache) getAny() *points.Points {
for _, values := range c.data {
return values
}
return nil
}

// Get any key/values pair from Cache
func (c *Cache) Get() *points.Points {
if values := c.getNext(); values != nil {
return values
}

c.updateQueue()

if values := c.getNext(); values != nil {
return values
}

return c.getAny()
}

// Remove key from cache
func (c *Cache) Remove(key string) {
if value, exists := c.data[key]; exists {
Expand All @@ -84,6 +102,9 @@ func (c *Cache) Remove(key string) {

// Pop return and remove next for save point from cache
func (c *Cache) Pop() *points.Points {
if c.size == 0 {
return nil
}
v := c.Get()
if v != nil {
c.Remove(v.Metric)
Expand Down Expand Up @@ -128,12 +149,7 @@ func (c *Cache) stat(metric string, value float64) {
c.queue = append(c.queue, &queueItem{key, 1})
}

// doCheckpoint reorder save queue, add carbon metrics to queue
func (c *Cache) doCheckpoint() {
start := time.Now()

inputLenBeforeCheckpoint := len(c.inputChan)

func (c *Cache) updateQueue() {
newQueue := make(queue, 0)

for key, values := range c.data {
Expand All @@ -143,6 +159,15 @@ func (c *Cache) doCheckpoint() {
sort.Sort(newQueue)

c.queue = newQueue
}

// doCheckpoint reorder save queue, add carbon metrics to queue
func (c *Cache) doCheckpoint() {
start := time.Now()

inputLenBeforeCheckpoint := len(c.inputChan)

c.updateQueue()

inputLenAfterCheckpoint := len(c.inputChan)

Expand Down Expand Up @@ -174,19 +199,30 @@ func (c *Cache) doCheckpoint() {
func (c *Cache) worker() {
var values *points.Points
var sendTo chan *points.Points
var forceReceive bool

forceReceiveThreshold := cap(c.inputChan) / 10

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

MAIN_LOOP:
for {
if values == nil {

if len(c.inputChan) > forceReceiveThreshold {
forceReceive = true
} else {
forceReceive = false
}

if values == nil && !forceReceive {
values = c.Pop()
}

if values != nil {
sendTo = c.outputChan
} else {
sendTo = nil
}
if values != nil {
sendTo = c.outputChan
} else {
sendTo = nil
}

select {
Expand All @@ -212,7 +248,7 @@ func (c *Cache) worker() {
c.overflowCnt++
}
case <-c.exitChan: // exit
break
break MAIN_LOOP
}
}

Expand Down

0 comments on commit d92a5de

Please sign in to comment.