Skip to content

Commit

Permalink
add some documents
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Aug 12, 2024
1 parent 6699cdf commit 00cd0ea
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 66 deletions.
80 changes: 42 additions & 38 deletions utils/dynstream/dynamic_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type cmd struct {
cmd interface{}
}

type addPathCmd[P Path, T Event, D any] struct {
type addPathCmd[P Path, T Event, D Dest] struct {
paths []PathAndDest[P, D]
pis []*pathInfo[P, T, D]
error error
Expand All @@ -53,7 +53,7 @@ type removePathCmd[P Path] struct {
wg sync.WaitGroup
}

type arrangeStreamCmd[P Path, T Event, D any] struct {
type arrangeStreamCmd[P Path, T Event, D Dest] struct {
oldStreams []*stream[P, T, D]

newStreams []*stream[P, T, D]
Expand All @@ -66,7 +66,7 @@ type reportAndScheduleCmd struct {
wg sync.WaitGroup
}

type streamInfo[P Path, T Event, D any] struct {
type streamInfo[P Path, T Event, D Dest] struct {
stream *stream[P, T, D]
streamStat streamStat[P, T, D]
pathMap map[*pathInfo[P, T, D]]struct{}
Expand All @@ -92,14 +92,23 @@ func (si *streamInfo[P, T, D]) period() time.Duration {
return si.streamStat.period
}

type sortedSIs[P Path, T Event, D any] []*streamInfo[P, T, D]
type sortedSIs[P Path, T Event, D Dest] []*streamInfo[P, T, D]

// implement sort.Interface
func (s sortedSIs[P, T, D]) Len() int { return len(s) }
func (s sortedSIs[P, T, D]) Less(i, j int) bool { return s[i].runtime() < s[j].runtime() }
func (s sortedSIs[P, T, D]) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

type dynamicStreamImpl[P Path, T Event, D any] struct {
// This is the implementation of the DynamicStream interface.
// We use two goroutines
// 1. The distributor to distribute the events to the streams
// 2. The scheduler to balance the load of the streams
//
// A stream can handle events from multiple paths.
// Events from the same path are only processed by one particular stream at the same time.
// The scheduler use several strategies to balance the load of the streams, while the final balanace
// actions are moving the paths between the streams.
type dynamicStreamImpl[P Path, T Event, D Dest] struct {
schedulerInterval time.Duration
reportInterval time.Duration
trackTopPaths int
Expand All @@ -124,7 +133,7 @@ type dynamicStreamImpl[P Path, T Event, D any] struct {
distDone sync.WaitGroup
}

func newDynamicStreamImpl[P Path, T Event, D any](
func newDynamicStreamImpl[P Path, T Event, D Dest](
handler Handler[P, T, D],
schedulerInterval time.Duration,
reportInterval time.Duration,
Expand Down Expand Up @@ -169,6 +178,33 @@ func (d *dynamicStreamImpl[P, T, D]) Close() {
d.schdDone.Wait()
}

func (d *dynamicStreamImpl[P, T, D]) AddPath(paths ...PathAndDest[P, D]) error {
if d.hasClosed.Load() {
return NewAppErrorS(ErrorTypeClosed)
}
add := &addPathCmd[P, T, D]{paths: paths}
cmd := &cmd{
cmdType: typeAddPath,
cmd: add,
}
add.wg.Add(2) // need to wait for both scheduler and distributor
d.cmdToSchd <- cmd
add.wg.Wait()
return add.error
}

func (d *dynamicStreamImpl[P, T, D]) RemovePath(paths ...P) []error {
remove := &removePathCmd[P]{paths: paths}
cmd := &cmd{
cmdType: typeRemovePath,
cmd: remove,
}
remove.wg.Add(2) // need to wait for both scheduler and distributor
d.cmdToSchd <- cmd
remove.wg.Wait()
return remove.errors
}

func (d *dynamicStreamImpl[P, T, D]) scheduler() {
defer func() {
close(d.cmdToDist)
Expand Down Expand Up @@ -477,11 +513,6 @@ func (d *dynamicStreamImpl[P, T, D]) scheduler() {
} else {
panic("Unknown rule")
}

// Normally, we don't need to reset the statistics, but who knows.
// for _, si := range d.streamInfos {
// si.streamStat = nil
// }
}

nextSchedule := time.Now().Add(d.schedulerInterval)
Expand Down Expand Up @@ -635,30 +666,3 @@ func (d *dynamicStreamImpl[P, T, D]) distributor() {
}
}
}

func (d *dynamicStreamImpl[P, T, D]) AddPath(paths ...PathAndDest[P, D]) error {
if d.hasClosed.Load() {
return NewAppErrorS(ErrorTypeClosed)
}
add := &addPathCmd[P, T, D]{paths: paths}
cmd := &cmd{
cmdType: typeAddPath,
cmd: add,
}
add.wg.Add(2) // need to wait for both scheduler and distributor
d.cmdToSchd <- cmd
add.wg.Wait()
return add.error
}

func (d *dynamicStreamImpl[P, T, D]) RemovePath(paths ...P) []error {
remove := &removePathCmd[P]{paths: paths}
cmd := &cmd{
cmdType: typeRemovePath,
cmd: remove,
}
remove.wg.Add(2) // need to wait for both scheduler and distributor
d.cmdToSchd <- cmd
remove.wg.Wait()
return remove.errors
}
12 changes: 6 additions & 6 deletions utils/dynstream/dynamic_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func TestDynamicStreamSchedule(t *testing.T) {
ds.In() <- newSimpleEvent("p1", wg)
ds.In() <- newSimpleEvent("p2", wg)
ds.In() <- newSimpleEvent("p3", wg)
ds.In() <- newSimpleEventSleep("p4", wg, 6*time.Millisecond)
ds.In() <- newSimpleEventSleep("p5", wg, 6*time.Millisecond)
ds.In() <- newSimpleEventSleep("p4", wg, 8*time.Millisecond)
ds.In() <- newSimpleEventSleep("p5", wg, 8*time.Millisecond)
wg.Wait()

scheduleNow(createSoloPath, 8*time.Millisecond)
Expand All @@ -112,7 +112,7 @@ func TestDynamicStreamSchedule(t *testing.T) {
ds.In() <- newSimpleEvent("p2", wg)
ds.In() <- newSimpleEvent("p3", wg)
ds.In() <- newSimpleEvent("p4", wg)
ds.In() <- newSimpleEventSleep("p5", wg, 6*time.Millisecond)
ds.In() <- newSimpleEventSleep("p5", wg, 8*time.Millisecond)
// time.Sleep(8 * time.Millisecond)
wg.Wait()

Expand Down Expand Up @@ -144,9 +144,9 @@ func TestDynamicStreamSchedule(t *testing.T) {
assert.Equal(t, 1, len(ds.streamInfos[3].pathMap)) // p5, Solo stream

wg = &sync.WaitGroup{}
ds.In() <- newSimpleEventSleep("p7", wg, 6*time.Millisecond)
ds.In() <- newSimpleEventSleep("p10", wg, 6*time.Millisecond)
ds.In() <- newSimpleEventSleep("p9", wg, 6*time.Millisecond)
ds.In() <- newSimpleEventSleep("p7", wg, 8*time.Millisecond)
ds.In() <- newSimpleEventSleep("p10", wg, 8*time.Millisecond)
ds.In() <- newSimpleEventSleep("p9", wg, 8*time.Millisecond)
// time.Sleep(8 * time.Millisecond)
wg.Wait()

Expand Down
15 changes: 10 additions & 5 deletions utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ type Path comparable
// An event belongs to a path.
type Event any

// A destination is the place where the event is sent to.
type Dest any

// The handler interface. The handler processes the event.
type Handler[P Path, T Event, D any] interface {
type Handler[P Path, T Event, D Dest] interface {
// Get the path of the event. This method is called once for each event.
Path(event T) P
// Handle processes the event.
Expand All @@ -23,7 +26,7 @@ type Handler[P Path, T Event, D any] interface {
Handle(event T, dest D) (await bool)
}

type PathAndDest[P Path, D any] struct {
type PathAndDest[P Path, D Dest] struct {
Path P
Dest D
}
Expand All @@ -32,8 +35,10 @@ type PathAndDest[P Path, D any] struct {
Dynamic stream is a stream that can process events with from different paths concurrently.
- Events from the same path are processed sequentially.
- Events from different paths are processed concurrently.
We assume that the handler is CPU-bound and should not be blocked by any waiting. Otherwise, events from other paths will be blocked.
*/
type DynamicStream[P Path, T Event, D any] interface {
type DynamicStream[P Path, T Event, D Dest] interface {
Start()
Close()
In() chan<- T
Expand All @@ -56,12 +61,12 @@ const DefaultReportInterval = 500 * time.Millisecond
// We don't need lots of streams because the hanle of events should be CPU-bound and should not be blocked by any waiting.
const DefaultStreamCount = 128

func NewDynamicStreamDefault[P Path, T Event, D any](handler Handler[P, T, D]) DynamicStream[P, T, D] {
func NewDynamicStreamDefault[P Path, T Event, D Dest](handler Handler[P, T, D]) DynamicStream[P, T, D] {
streamCount := max(DefaultStreamCount, runtime.NumCPU())
return NewDynamicStream[P, T, D](handler, DefaultSchedulerInterval, DefaultReportInterval, streamCount)
}

func NewDynamicStream[P Path, T Event, D any](
func NewDynamicStream[P Path, T Event, D Dest](
handler Handler[P, T, D],
schedulerInterval time.Duration,
reportInterval time.Duration,
Expand Down
35 changes: 18 additions & 17 deletions utils/dynstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var nextReportRound = atomic.Int64{}

// ====== internal types ======

type pathStat[P Path, T Event, D any] struct {
type pathStat[P Path, T Event, D Dest] struct {
pathInfo *pathInfo[P, T, D]
totalTime time.Duration
count int
Expand All @@ -35,7 +35,7 @@ func (p *pathStat[P, T, D]) CompareTo(o *pathStat[P, T, D]) int {
return int(p.totalTime - o.totalTime)
} // It is safe on a 64-bit machine.

type pathInfo[P Path, T Event, D any] struct {
type pathInfo[P Path, T Event, D Dest] struct {
// Note that although this struct is used by multiple goroutines, it doesn't need synchronization because
// different fields are either immutable or accessed by different goroutines.
// We use one struct to store them together to avoid mapping by path in different places in many times.
Expand All @@ -52,7 +52,7 @@ type pathInfo[P Path, T Event, D any] struct {
pathStat *pathStat[P, T, D]
}

func newPathInfo[P Path, T Event, D any](path P, dest D) *pathInfo[P, T, D] {
func newPathInfo[P Path, T Event, D Dest](path P, dest D) *pathInfo[P, T, D] {
pi := &pathInfo[P, T, D]{
path: path,
dest: dest,
Expand All @@ -67,7 +67,7 @@ func (pi *pathInfo[P, T, D]) resetStat() {
(*pi.pathStat) = pathStat[P, T, D]{pathInfo: pi}
}

type streamStat[P Path, T Event, D any] struct {
type streamStat[P Path, T Event, D Dest] struct {
id int

period time.Duration
Expand All @@ -79,7 +79,7 @@ type streamStat[P Path, T Event, D any] struct {
mostBusyPath heap.Heap[*pathStat[P, T, D]]
}

func tryAddPathToBusyHeap[P Path, T Event, D any](heap heap.Heap[*pathStat[P, T, D]], pi *pathStat[P, T, D], trackTop int) {
func tryAddPathToBusyHeap[P Path, T Event, D Dest](heap heap.Heap[*pathStat[P, T, D]], pi *pathStat[P, T, D], trackTop int) {
if heap.Len() < trackTop {
heap.AddOrUpdate(pi)
} else if top, _ := heap.PeekTop(); top.CompareTo(pi) < 0 {
Expand All @@ -89,25 +89,25 @@ func tryAddPathToBusyHeap[P Path, T Event, D any](heap heap.Heap[*pathStat[P, T,
}

// Only contains one kind of event:
// 1. event + pathInfo
// 1. event
// 2. wake = true
type eventWrap[P Path, T Event, D any] struct {
event T
pathInfo *pathInfo[P, T, D]
type eventWrap[P Path, T Event, D Dest] struct {
event T
wake bool

wake bool
pathInfo *pathInfo[P, T, D]
}

func (e eventWrap[P, T, D]) isZero() bool {
return e.pathInfo == nil
}

type eventSignal[P Path, T Event, D any] struct {
type eventSignal[P Path, T Event, D Dest] struct {
pathInfo *pathInfo[P, T, D]
eventCount int
}

type doneInfo[P Path, T Event, D any] struct {
type doneInfo[P Path, T Event, D Dest] struct {
pathInfo *pathInfo[P, T, D]
handleTime time.Duration
}
Expand All @@ -116,7 +116,10 @@ func (d doneInfo[P, T, D]) isZero() bool {
return d.pathInfo == nil
}

type stream[P Path, T Event, D any] struct {
// A stream uses two goroutines
// 1. handleLoop: to handle the events.
// 2. reportStatLoop: to report the statistics.
type stream[P Path, T Event, D Dest] struct {
id int

handler Handler[P, T, D]
Expand All @@ -139,7 +142,7 @@ type stream[P Path, T Event, D any] struct {
reportDone sync.WaitGroup
}

func newStream[P Path, T Event, D any](
func newStream[P Path, T Event, D Dest](
id int,
handler Handler[P, T, D],
reportChan chan streamStat[P, T, D],
Expand Down Expand Up @@ -177,9 +180,7 @@ func (s *stream[P, T, D]) start(acceptedPaths []*pathInfo[P, T, D], formerStream
go s.reportStatLoop()
}

// Close the stream and return the running event.
// Not all of the new streams need to wait for the former stream's handle goroutine to finish.
// Only the streams that are interested in the path of the running event need to wait.
// Close the stream and wait for all goroutines to exit.
func (s *stream[P, T, D]) close() {
if s.hasClosed.CompareAndSwap(false, true) {
close(s.inChan)
Expand Down

0 comments on commit 00cd0ea

Please sign in to comment.