Skip to content

Commit

Permalink
optimize gc
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Aug 10, 2024
1 parent 31fc263 commit 612f01a
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 265 deletions.
186 changes: 141 additions & 45 deletions utils/dynstream/dnynamic_stream_bech_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,100 +2,196 @@ package dynstream

import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
)

func runDynamicStream(pathCount int, eventCount int, times int) {
handler := &incHandler{}
type intEvent int

ds := NewDynamicStreamDefault(handler)
ds.Start()
func (e intEvent) Path() int { return int(e) }

for i := 0; i < pathCount; i++ {
ds.AddPath(PathAndDest[D]{Path: Path(fmt.Sprintf("p%d", i)), Dest: D{}})
type intEventHandler struct {
inc *atomic.Int64
times int

wg *sync.WaitGroup
}

func (h *intEventHandler) Handle(event intEvent, dest D) {
for i := 0; i < h.times; i++ {
h.inc.Add(1)
}
h.wg.Done()
}

func prepareDynamicStream(pathCount int, eventCount int, times int) (*DynamicStream[int, intEvent, D], *atomic.Int64, *sync.WaitGroup) {
wg := &sync.WaitGroup{}
wg.Add(eventCount * pathCount)
inc := &atomic.Int64{}

total := &atomic.Int64{}
handler := &intEventHandler{
inc: inc,
times: times,
wg: wg}

sendEvents := func(path Path, wg *sync.WaitGroup) {
for i := 0; i < eventCount; i++ {
ds.In() <- &inc{times: times, n: total, done: wg, path: path}
}
}
ds := NewDynamicStreamDefault(handler)
ds.Start()

for i := 0; i < pathCount; i++ {
sendEvents(Path(fmt.Sprintf("p%d", i)), wg)
}

wg.Wait()

if total.Load() != int64(pathCount*eventCount*times) {
panic("total != pathCount * eventCount * times")
ds.AddPath(PathAndDest[int, D]{Path: i, Dest: D{}})
}

ds.Close()
return ds, inc, wg
}

func runGoroutine(pathCount int, eventCount int, times int) {
handler := &incHandler{}
chans := make([]chan *inc, pathCount)
for i := 0; i < pathCount; i++ {
chans[i] = make(chan *inc, eventCount)
func runDynamicStream(ds *DynamicStream[int, intEvent, D], pathCount int, eventCount int) {
cpuCount := runtime.NumCPU()
step := pathCount / cpuCount
for s := 0; s < cpuCount; s++ {
from := s * step
to := min((s+1)*step, pathCount)
go func(from, to, eventCount int) {
for i := 0; i < eventCount; i++ {
for p := from; p < to; p++ {
ds.In() <- intEvent(p)
}
}
}(from, to, eventCount)
}
// for p := 0; p < pathCount; p++ {
// go func(path int) {
// for i := 0; i < eventCount; i++ {
// ds.In() <- intEvent(path)
// }
// }(p)
// }
}

func prepareGoroutine(pathCount int, eventCount int, times int) ([]chan intEvent, *intEventHandler, *atomic.Int64, *sync.WaitGroup) {
wg := &sync.WaitGroup{}
wg.Add(eventCount * pathCount)
inc := &atomic.Int64{}

sendEvents := func(ch chan *inc, wg *sync.WaitGroup) {
total := &atomic.Int64{}
for i := 0; i < eventCount; i++ {
ch <- &inc{times: times, n: total, done: wg}
}
handler := &intEventHandler{
inc: inc,
times: times,
wg: wg,
}

chans := make([]chan intEvent, pathCount)
for i := 0; i < pathCount; i++ {
go sendEvents(chans[i], wg)
chans[i] = make(chan intEvent, 64)
}

return chans, handler, inc, wg
}

func runGoroutine(chans []chan intEvent, pathCount int, eventCount int, handler *intEventHandler) {
cpuCount := runtime.NumCPU()
step := pathCount / cpuCount
for s := 0; s < cpuCount; s++ {
from := s * step
to := min((s+1)*step, pathCount)
go func(from, to, eventCount int) {
for i := 0; i < eventCount; i++ {
for p := from; p < to; p++ {
chans[p] <- intEvent(p)
}
}
}(from, to, eventCount)
}

// for i := 0; i < pathCount; i++ {
// go func(ch chan intEvent, path int) {
// for i := 0; i < eventCount; i++ {
// ch <- intEvent(path)
// }
// }(chans[i], i)
// }

for i := 0; i < pathCount; i++ {
go func(ch chan *inc) {
go func(ch chan intEvent) {
for e := range ch {
handler.Handle(e, D{})
}
}(chans[i])
}

wg.Wait()
for i := 0; i < pathCount; i++ {
close(chans[i])
}
}

func BenchmarkDSDynamicStream1000x1000x100(b *testing.B) {
func BenchmarkDSDynamicSt1000x1000x100(b *testing.B) {
ds, inc, wg := prepareDynamicStream(1000, 1000, 100)

b.ResetTimer()

for k := 0; k < b.N; k++ {
runDynamicStream(1000, 1000, 100)
inc.Store(0)
runDynamicStream(ds, 1000, 1000)
wg.Wait()

if inc.Load() != int64(1000*1000*100) {
panic(fmt.Sprintf("total: %d, expected: %d", inc.Load(), 1000*1000*100))
}
}

ds.Close()
}

func BenchmarkDSDynamicStream1000000x1000x100(b *testing.B) {
func BenchmarkDSDynamicSt1000000x100x10(b *testing.B) {
ds, inc, wg := prepareDynamicStream(1000000, 100, 10)

b.ResetTimer()

for k := 0; k < b.N; k++ {
runDynamicStream(1000000, 100, 100)
inc.Store(0)
runDynamicStream(ds, 1000000, 100)
wg.Wait()

if inc.Load() != int64(1000000*100*10) {
panic(fmt.Sprintf("total: %d, expected: %d", inc.Load(), 1000000*100*10))
}
}

ds.Close()
}

func BenchmarkDSGroutine1000x1000x100(b *testing.B) {
func BenchmarkDSGoroutine1000x1000x100(b *testing.B) {
chans, handler, inc, wg := prepareGoroutine(1000, 1000, 100)

b.ResetTimer()

for k := 0; k < b.N; k++ {
runGoroutine(1000, 1000, 100)
inc.Store(0)
runGoroutine(chans, 1000, 1000, handler)
wg.Wait()

if inc.Load() != int64(1000*1000*100) {
panic(fmt.Sprintf("total: %d, expected: %d", inc.Load(), 1000*1000*100))
}
}

for _, c := range chans {
close(c)
}
}

func BenchmarkDSGroutine1000000x1000x100(b *testing.B) {
func BenchmarkDSGoroutine1000000x100x10(b *testing.B) {
chans, handler, inc, wg := prepareGoroutine(1000000, 100, 10)

b.ResetTimer()

for k := 0; k < b.N; k++ {
runGoroutine(1000000, 100, 100)
inc.Store(0)
runGoroutine(chans, 1000000, 100, handler)
wg.Wait()

if inc.Load() != int64(1000000*100*10) {
panic(fmt.Sprintf("total: %d, expected: %d", inc.Load(), 1000000*100*10))
}
}

for _, c := range chans {
close(c)
}
}
Loading

0 comments on commit 612f01a

Please sign in to comment.