Skip to content

Commit

Permalink
optimize time.After
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Aug 9, 2024
1 parent 976ec2a commit 31fc263
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
8 changes: 7 additions & 1 deletion utils/dynstream/dnynamic_stream_bech_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func runDynamicStream(pathCount int, eventCount int, times int) {
wg := &sync.WaitGroup{}
wg.Add(eventCount * pathCount)

total := &atomic.Int64{}

sendEvents := func(path Path, wg *sync.WaitGroup) {
total := &atomic.Int64{}
for i := 0; i < eventCount; i++ {
ds.In() <- &inc{times: times, n: total, done: wg, path: path}
}
Expand All @@ -32,6 +33,11 @@ func runDynamicStream(pathCount int, eventCount int, times int) {
}

wg.Wait()

if total.Load() != int64(pathCount*eventCount*times) {
panic("total != pathCount * eventCount * times")
}

ds.Close()
}

Expand Down
6 changes: 4 additions & 2 deletions utils/dynstream/dynamic_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type DynamicStream[T Event, D any] struct {
}

func NewDynamicStreamDefault[T Event, D any](handler Handler[T, D]) *DynamicStream[T, D] {
streamCount := max(DefaultStreamCount, runtime.NumCPU()*2)
streamCount := max(DefaultStreamCount, runtime.NumCPU())
return NewDynamicStream[T, D](handler, DefaultSchedulerInterval, DefaultReportInterval, streamCount)
}

Expand Down Expand Up @@ -499,6 +499,7 @@ func (d *DynamicStream[T, D]) scheduler() {
}

nextSchedule := time.Now().Add(d.schedulerInterval)
timerChan := time.After(time.Until(nextSchedule))
Loop:
for {
select {
Expand Down Expand Up @@ -583,8 +584,9 @@ Loop:
continue
}
si.streamStat = stat
case <-time.After(time.Until(nextSchedule)):
case <-timerChan:
nextSchedule = time.Now().Add(d.schedulerInterval)
timerChan = time.After(time.Until(nextSchedule))
doSchedule(ruleType(scheduleRule.Next()), 0)
}
}
Expand Down
5 changes: 4 additions & 1 deletion utils/dynstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (s *stream[T, D]) reportStatLoop() {

lastReportTime := time.Now()
nextReportTime := lastReportTime.Add(s.reportInterval)
reportWait := time.After(time.Until(nextReportTime))

reportRound := nextReportRound.Add(1)

handleCount := 0
Expand Down Expand Up @@ -273,11 +275,12 @@ func (s *stream[T, D]) reportStatLoop() {

lastReportTime = time.Now()
nextReportTime = lastReportTime.Add(s.reportInterval)
reportWait = time.After(time.Until(nextReportTime))
}

for {
select {
case <-time.After(time.Until(nextReportTime)):
case <-reportWait:
reportStat()
case <-s.reportNow:
reportStat()
Expand Down

0 comments on commit 31fc263

Please sign in to comment.