diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 9cd7ade09..4a966758b 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -110,6 +110,14 @@ func (c *eventBroker) sendWatermark( } } +func (c *eventBroker) asyncNotify(change *subscriptionChange) { + select { + case c.changedCh <- change: + default: + // TODO: add metrics to record the drop count. + } +} + func (c *eventBroker) runGenerateScanTask(ctx context.Context) { c.wg.Add(1) go func() { @@ -314,7 +322,7 @@ func (c *eventBroker) logSlowDispatchers(ctx context.Context) { func (c *eventBroker) updateMetrics(ctx context.Context) { c.wg.Add(1) - ticker := time.NewTicker(time.Second * 5) + ticker := time.NewTicker(time.Second * 10) go func() { defer c.wg.Done() log.Info("update metrics goroutine is started") @@ -374,8 +382,8 @@ type dispatcherStat struct { info DispatcherInfo spanSubscription *spanSubscription // The watermark of the events that have been sent to the dispatcher. - watermark atomic.Uint64 - notify chan *subscriptionChange + watermark atomic.Uint64 + onAsyncNotify func(*subscriptionChange) // The index of the task queue channel in the taskPool. // We need to make sure the tasks of the same dispatcher are sent to the same task queue // so that it will be handle by the same scan worker. To ensure all events of the dispatcher @@ -390,14 +398,12 @@ type dispatcherStat struct { } func newDispatcherStat( - startTs uint64, - info DispatcherInfo, - notify chan *subscriptionChange) *dispatcherStat { + startTs uint64, info DispatcherInfo, onAsyncNotify func(*subscriptionChange), +) *dispatcherStat { subscription := &spanSubscription{ span: info.GetTableSpan(), lastUpdate: atomic.Value{}, } - subscription.lastUpdate.Store(time.Now()) subscription.watermark.Store(uint64(startTs)) @@ -405,7 +411,7 @@ func newDispatcherStat( res := &dispatcherStat{ info: info, spanSubscription: subscription, - notify: notify, + onAsyncNotify: onAsyncNotify, metricSorterOutputEventCountKV: metrics.SorterOutputEventCount.WithLabelValues(namespace, id, "kv"), metricEventServiceSendKvCount: metrics.EventServiceSendEventCount.WithLabelValues(namespace, id, "kv"), @@ -428,15 +434,10 @@ func (a *dispatcherStat) onSubscriptionWatermark(watermark uint64) { } a.spanSubscription.watermark.Store(watermark) a.spanSubscription.lastUpdate.Store(time.Now()) - - sub := &subscriptionChange{ + a.onAsyncNotify(&subscriptionChange{ dispatcherInfo: a.info, eventCount: a.spanSubscription.newEventCount.Swap(0), - } - select { - case a.notify <- sub: - default: - } + }) } // TODO: consider to use a better way to update the event count, may be we only need to diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index fad8329b5..2e0d9f35d 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -19,13 +19,9 @@ func TestNewDispatcherStat(t *testing.T) { startTs: startTs, } - notify := make(chan *subscriptionChange) - - stat := newDispatcherStat(startTs, info, notify) - + stat := newDispatcherStat(startTs, info, func(c *subscriptionChange) {}) require.Equal(t, info, stat.info) require.Equal(t, startTs, stat.watermark.Load()) - require.Equal(t, notify, stat.notify) require.NotNil(t, stat.spanSubscription) require.Equal(t, startTs, stat.spanSubscription.watermark.Load()) require.Equal(t, 0, int(stat.spanSubscription.newEventCount.Load())) @@ -43,7 +39,12 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) { notify := make(chan *subscriptionChange) - stat := newDispatcherStat(startTs, info, notify) + stat := newDispatcherStat(startTs, info, func(c *subscriptionChange) { + select { + case notify <- c: + default: + } + }) // Case 1: no new events, only watermark change wg.Add(1) @@ -99,7 +100,7 @@ func TestScanTaskPool_PushTask(t *testing.T) { startTs: 1000, span: span, } - dispatcherStat := newDispatcherStat(dispatcherInfo.startTs, dispatcherInfo, make(chan *subscriptionChange)) + dispatcherStat := newDispatcherStat(dispatcherInfo.startTs, dispatcherInfo, func(c *subscriptionChange) {}) // Create two tasks with overlapping data ranges task1 := &scanTask{ dispatcherStat: dispatcherStat, diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index 3f5093b84..2c8fcfc1c 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -113,11 +113,10 @@ func (s *eventService) registerDispatcher(ctx context.Context, info DispatcherIn c = newEventBroker(ctx, clusterID, s.eventStore, s.mc) s.brokers[clusterID] = c } - - dispatcher := newDispatcherStat(startTs, info, c.changedCh) + dispatcher := newDispatcherStat(startTs, info, c.asyncNotify) c.dispatchers.Store(info.GetID(), dispatcher) + brokerRegisterDuration := time.Since(start) - firstDuration := time.Since(start) start = time.Now() c.eventStore.RegisterDispatcher( info.GetID(), @@ -126,9 +125,12 @@ func (s *eventService) registerDispatcher(ctx context.Context, info DispatcherIn dispatcher.onNewEvent, dispatcher.onSubscriptionWatermark, ) + eventStoreRegisterDuration := time.Since(start) - secondDuration := time.Since(start) - log.Info("register acceptor", zap.Uint64("clusterID", clusterID), zap.String("acceptorID", info.GetID()), zap.Uint64("tableID", span.TableID), zap.Uint64("startTs", startTs), zap.Duration("firstDuration", firstDuration), zap.Duration("secondDuration", secondDuration)) + log.Info("register acceptor", zap.Uint64("clusterID", clusterID), + zap.String("acceptorID", info.GetID()), zap.Uint64("tableID", span.TableID), + zap.Uint64("startTs", startTs), zap.Duration("brokerRegisterDuration", brokerRegisterDuration), + zap.Duration("eventStoreRegisterDuration", eventStoreRegisterDuration)) } func (s *eventService) deregisterDispatcher(dispatcherInfo DispatcherInfo) { @@ -142,7 +144,6 @@ func (s *eventService) deregisterDispatcher(dispatcherInfo DispatcherInfo) { log.Info("deregister acceptor", zap.Uint64("clusterID", clusterID), zap.String("acceptorID", id)) } -// TODO: implement the following functions func msgToDispatcherInfo(msg *messaging.TargetMessage) DispatcherInfo { return msg.Message.(messaging.RegisterDispatcherRequest) } diff --git a/pkg/eventservice/event_service_performance_test.go b/pkg/eventservice/event_service_performance_test.go index 609846e95..4893c0645 100644 --- a/pkg/eventservice/event_service_performance_test.go +++ b/pkg/eventservice/event_service_performance_test.go @@ -26,7 +26,7 @@ func TestEventServiceOneMillionTable(t *testing.T) { defer cancel() wg := &sync.WaitGroup{} mockStore := newMockEventStore() - tableNum := 1000000 + tableNum := 100_0000 sendRound := 10 mc := &mockMessageCenter{ messageCh: make(chan *messaging.TargetMessage, 100), @@ -67,9 +67,7 @@ func TestEventServiceOneMillionTable(t *testing.T) { for i := 0; i < tableNum; i++ { acceptorInfo := newMockAcceptorInfo(uuid.New().String(), uint64(i)) dispatchers = append(dispatchers, acceptorInfo) - } - for _, dispatcher := range dispatchers { - esImpl.registerDispatcher(ctx, dispatcher) + esImpl.registerDispatcher(ctx, acceptorInfo) } log.Info("register 1 million tables", zap.Duration("cost", time.Since(start))) diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index 47d9dd0c6..698149416 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -24,11 +24,17 @@ import ( "go.uber.org/zap" ) +var _ messaging.MessageCenter = &mockMessageCenter{} + // mockMessageCenter is a mock implementation of the MessageCenter interface type mockMessageCenter struct { messageCh chan *messaging.TargetMessage } +func (m *mockMessageCenter) OnNodeChanges(newNodes []*common.NodeInfo, removedNodes []*common.NodeInfo) { + +} + func (m *mockMessageCenter) SendEvent(event ...*messaging.TargetMessage) error { for _, e := range event { m.messageCh <- e