Skip to content

Commit

Permalink
refactor event broker
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Aug 19, 2024
1 parent b7dc129 commit a67a296
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 32 deletions.
31 changes: 16 additions & 15 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (c *eventBroker) sendWatermark(
}
}

func (c *eventBroker) asyncNotify(change *subscriptionChange) {
select {
case c.changedCh <- change:
default:
// TODO: add metrics to record the drop count.
}
}

func (c *eventBroker) runGenerateScanTask(ctx context.Context) {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -314,7 +322,7 @@ func (c *eventBroker) logSlowDispatchers(ctx context.Context) {

func (c *eventBroker) updateMetrics(ctx context.Context) {
c.wg.Add(1)
ticker := time.NewTicker(time.Second * 5)
ticker := time.NewTicker(time.Second * 10)
go func() {
defer c.wg.Done()
log.Info("update metrics goroutine is started")
Expand Down Expand Up @@ -374,8 +382,8 @@ type dispatcherStat struct {
info DispatcherInfo
spanSubscription *spanSubscription
// The watermark of the events that have been sent to the dispatcher.
watermark atomic.Uint64
notify chan *subscriptionChange
watermark atomic.Uint64
onAsyncNotify func(*subscriptionChange)
// The index of the task queue channel in the taskPool.
// We need to make sure the tasks of the same dispatcher are sent to the same task queue
// so that it will be handle by the same scan worker. To ensure all events of the dispatcher
Expand All @@ -390,22 +398,20 @@ type dispatcherStat struct {
}

func newDispatcherStat(
startTs uint64,
info DispatcherInfo,
notify chan *subscriptionChange) *dispatcherStat {
startTs uint64, info DispatcherInfo, onAsyncNotify func(*subscriptionChange),
) *dispatcherStat {
subscription := &spanSubscription{
span: info.GetTableSpan(),
lastUpdate: atomic.Value{},
}

subscription.lastUpdate.Store(time.Now())
subscription.watermark.Store(uint64(startTs))

namespace, id := info.GetChangefeedID()
res := &dispatcherStat{
info: info,
spanSubscription: subscription,
notify: notify,
onAsyncNotify: onAsyncNotify,

metricSorterOutputEventCountKV: metrics.SorterOutputEventCount.WithLabelValues(namespace, id, "kv"),
metricEventServiceSendKvCount: metrics.EventServiceSendEventCount.WithLabelValues(namespace, id, "kv"),
Expand All @@ -428,15 +434,10 @@ func (a *dispatcherStat) onSubscriptionWatermark(watermark uint64) {
}
a.spanSubscription.watermark.Store(watermark)
a.spanSubscription.lastUpdate.Store(time.Now())

sub := &subscriptionChange{
a.onAsyncNotify(&subscriptionChange{
dispatcherInfo: a.info,
eventCount: a.spanSubscription.newEventCount.Swap(0),
}
select {
case a.notify <- sub:
default:
}
})
}

// TODO: consider to use a better way to update the event count, may be we only need to
Expand Down
15 changes: 8 additions & 7 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ func TestNewDispatcherStat(t *testing.T) {
startTs: startTs,
}

notify := make(chan *subscriptionChange)

stat := newDispatcherStat(startTs, info, notify)

stat := newDispatcherStat(startTs, info, func(c *subscriptionChange) {})
require.Equal(t, info, stat.info)
require.Equal(t, startTs, stat.watermark.Load())
require.Equal(t, notify, stat.notify)
require.NotNil(t, stat.spanSubscription)
require.Equal(t, startTs, stat.spanSubscription.watermark.Load())
require.Equal(t, 0, int(stat.spanSubscription.newEventCount.Load()))
Expand All @@ -43,7 +39,12 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) {

notify := make(chan *subscriptionChange)

stat := newDispatcherStat(startTs, info, notify)
stat := newDispatcherStat(startTs, info, func(c *subscriptionChange) {
select {
case notify <- c:
default:
}
})

// Case 1: no new events, only watermark change
wg.Add(1)
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestScanTaskPool_PushTask(t *testing.T) {
startTs: 1000,
span: span,
}
dispatcherStat := newDispatcherStat(dispatcherInfo.startTs, dispatcherInfo, make(chan *subscriptionChange))
dispatcherStat := newDispatcherStat(dispatcherInfo.startTs, dispatcherInfo, func(c *subscriptionChange) {})
// Create two tasks with overlapping data ranges
task1 := &scanTask{
dispatcherStat: dispatcherStat,
Expand Down
13 changes: 7 additions & 6 deletions pkg/eventservice/event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,10 @@ func (s *eventService) registerDispatcher(ctx context.Context, info DispatcherIn
c = newEventBroker(ctx, clusterID, s.eventStore, s.mc)
s.brokers[clusterID] = c
}

dispatcher := newDispatcherStat(startTs, info, c.changedCh)
dispatcher := newDispatcherStat(startTs, info, c.asyncNotify)
c.dispatchers.Store(info.GetID(), dispatcher)
brokerRegisterDuration := time.Since(start)

firstDuration := time.Since(start)
start = time.Now()
c.eventStore.RegisterDispatcher(
info.GetID(),
Expand All @@ -126,9 +125,12 @@ func (s *eventService) registerDispatcher(ctx context.Context, info DispatcherIn
dispatcher.onNewEvent,
dispatcher.onSubscriptionWatermark,
)
eventStoreRegisterDuration := time.Since(start)

secondDuration := time.Since(start)
log.Info("register acceptor", zap.Uint64("clusterID", clusterID), zap.String("acceptorID", info.GetID()), zap.Uint64("tableID", span.TableID), zap.Uint64("startTs", startTs), zap.Duration("firstDuration", firstDuration), zap.Duration("secondDuration", secondDuration))
log.Info("register acceptor", zap.Uint64("clusterID", clusterID),
zap.String("acceptorID", info.GetID()), zap.Uint64("tableID", span.TableID),
zap.Uint64("startTs", startTs), zap.Duration("brokerRegisterDuration", brokerRegisterDuration),
zap.Duration("eventStoreRegisterDuration", eventStoreRegisterDuration))
}

func (s *eventService) deregisterDispatcher(dispatcherInfo DispatcherInfo) {
Expand All @@ -142,7 +144,6 @@ func (s *eventService) deregisterDispatcher(dispatcherInfo DispatcherInfo) {
log.Info("deregister acceptor", zap.Uint64("clusterID", clusterID), zap.String("acceptorID", id))
}

// TODO: implement the following functions
func msgToDispatcherInfo(msg *messaging.TargetMessage) DispatcherInfo {
return msg.Message.(messaging.RegisterDispatcherRequest)
}
6 changes: 2 additions & 4 deletions pkg/eventservice/event_service_performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestEventServiceOneMillionTable(t *testing.T) {
defer cancel()
wg := &sync.WaitGroup{}
mockStore := newMockEventStore()
tableNum := 1000000
tableNum := 100_0000
sendRound := 10
mc := &mockMessageCenter{
messageCh: make(chan *messaging.TargetMessage, 100),
Expand Down Expand Up @@ -67,9 +67,7 @@ func TestEventServiceOneMillionTable(t *testing.T) {
for i := 0; i < tableNum; i++ {
acceptorInfo := newMockAcceptorInfo(uuid.New().String(), uint64(i))
dispatchers = append(dispatchers, acceptorInfo)
}
for _, dispatcher := range dispatchers {
esImpl.registerDispatcher(ctx, dispatcher)
esImpl.registerDispatcher(ctx, acceptorInfo)
}
log.Info("register 1 million tables", zap.Duration("cost", time.Since(start)))

Expand Down
6 changes: 6 additions & 0 deletions pkg/eventservice/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ import (
"go.uber.org/zap"
)

var _ messaging.MessageCenter = &mockMessageCenter{}

// mockMessageCenter is a mock implementation of the MessageCenter interface
type mockMessageCenter struct {
messageCh chan *messaging.TargetMessage
}

func (m *mockMessageCenter) OnNodeChanges(newNodes []*common.NodeInfo, removedNodes []*common.NodeInfo) {

}

func (m *mockMessageCenter) SendEvent(event ...*messaging.TargetMessage) error {
for _, e := range event {
m.messageCh <- e
Expand Down

0 comments on commit a67a296

Please sign in to comment.