diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 02c0f5164..03da82b5a 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -3,6 +3,7 @@ package eventservice import ( "context" "hash/crc32" + "strconv" "sync" "sync/atomic" "time" @@ -59,12 +60,14 @@ type eventBroker struct { // and a goroutine is responsible for sending the message to the dispatchers. messageCh chan wrapEvent resolvedTsCaches map[messaging.ServerId]*resolvedTsCache + notifyCh chan *spanSubscription // wg is used to spawn the goroutines. wg *sync.WaitGroup // cancel is used to cancel the goroutines spawned by the eventBroker. cancel context.CancelFunc + metricDispatcherCount prometheus.Gauge metricEventServicePullerResolvedTs prometheus.Gauge metricEventServiceDispatcherResolvedTs prometheus.Gauge metricEventServiceResolvedTsLag prometheus.Gauge @@ -86,6 +89,7 @@ func newEventBroker( eventStore: eventStore, mounter: mounter.NewMounter(tz), schemaStore: schemaStore, + notifyCh: make(chan *spanSubscription, defaultChannelSize*16), dispatchers: sync.Map{}, tableTriggerDispatchers: sync.Map{}, spans: make(map[common.TableID]*spanSubscription), @@ -96,6 +100,7 @@ func newEventBroker( resolvedTsCaches: make(map[messaging.ServerId]*resolvedTsCache), cancel: cancel, wg: wg, + metricDispatcherCount: metrics.EventServiceDispatcherGuage.WithLabelValues(strconv.FormatUint(id, 10)), metricEventServicePullerResolvedTs: metrics.EventServiceResolvedTsGauge, metricEventServiceResolvedTsLag: metrics.EventServiceResolvedTsLagGauge.WithLabelValues("puller"), metricEventServiceDispatcherResolvedTs: metrics.EventServiceResolvedTsLagGauge.WithLabelValues("dispatcher"), @@ -105,6 +110,7 @@ func newEventBroker( c.tickTableTriggerDispatchers(ctx) c.runSendMessageWorker(ctx) c.updateMetrics(ctx) + c.runGenTasks(ctx) return c } @@ -142,6 +148,26 @@ func (c *eventBroker) runScanWorker(ctx context.Context) { } } +func (c *eventBroker) runGenTasks(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case s := <-c.notifyCh: + s.dispatchers.RLock() + defer s.dispatchers.RUnlock() + for _, stat := range s.dispatchers.m { + c.taskPool.pushTask(&scanTask{ + dispatcherStat: stat, + createTime: time.Now(), + }) + } + } + } + }() +} + // TODO: maybe event driven model is better. It is coupled with the detail implementation of // the schemaStore, we will refactor it later. func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) { @@ -397,14 +423,7 @@ func (c *eventBroker) close() { func (c *eventBroker) onNotify(s *spanSubscription, watermark uint64) { s.onSubscriptionWatermark(watermark) - s.dispatchers.RLock() - defer s.dispatchers.RUnlock() - for _, stat := range s.dispatchers.m { - c.taskPool.pushTask(&scanTask{ - dispatcherStat: stat, - createTime: time.Now(), - }) - } + c.notifyCh <- s } func (c *eventBroker) addDispatcher(info DispatcherInfo) { @@ -413,6 +432,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) { panic(err) } + defer c.metricDispatcherCount.Inc() start := time.Now() id := info.GetID() span := info.GetTableSpan() @@ -452,6 +472,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) { } func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) { + defer c.metricDispatcherCount.Dec() id := dispatcherInfo.GetID() stat, ok := c.dispatchers.Load(id) if !ok { diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index afdeb5576..8d5b777d4 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -96,49 +96,6 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) { wg.Wait() } -func TestScanTaskPool_PushTask(t *testing.T) { - pool := newScanTaskPool() - span := newTableSpan(1, "a", "b") - dispatcherInfo := &mockDispatcherInfo{ - id: common.NewDispatcherID(), - clusterID: 1, - startTs: 1000, - span: span, - } - s := newSpanSubscription(span, dispatcherInfo.startTs) - dispatcherStat := newDispatcherStat(dispatcherInfo.startTs, dispatcherInfo, s, nil) - - now := time.Now() - task1 := &scanTask{ - dispatcherStat: dispatcherStat, - createTime: now.Add(1 * time.Second), - } - - task2 := &scanTask{ - dispatcherStat: dispatcherStat, - createTime: now.Add(2 * time.Second), - } - - // make the pool contain the task1 already - pool.taskSet[dispatcherInfo.GetID()] = task1 - - // Verify that the task is in the taskSet - task, ok := pool.taskSet[dispatcherInfo.id] - require.True(t, ok) - require.Equal(t, task1, task) - - // Push the second task - pool.pushTask(task2) - // Verify that the task1 is sent to corresponding pendingTaskQueue, task2 is dropped since duplicate. - receivedTask := <-pool.pendingTaskQueue[dispatcherStat.workerIndex] - require.Equal(t, task1, receivedTask) - - // Verify that the task is removed from taskSet - task, ok = pool.taskSet[dispatcherInfo.GetID()] - require.False(t, ok) - require.Nil(t, task) -} - func newTableSpan(tableID int64, start, end string) *heartbeatpb.TableSpan { return &heartbeatpb.TableSpan{ TableID: tableID, diff --git a/pkg/messaging/message.go b/pkg/messaging/message.go index c0440174c..34613a62a 100644 --- a/pkg/messaging/message.go +++ b/pkg/messaging/message.go @@ -109,6 +109,7 @@ func (b *Bytes) Unmarshal(data []byte) error { type RegisterDispatcherRequest struct { *eventpb.RegisterDispatcherRequest + id common.DispatcherID } func (r RegisterDispatcherRequest) Marshal() ([]byte, error) { @@ -116,11 +117,16 @@ func (r RegisterDispatcherRequest) Marshal() ([]byte, error) { } func (r RegisterDispatcherRequest) Unmarshal(data []byte) error { - return r.RegisterDispatcherRequest.Unmarshal(data) + err := r.RegisterDispatcherRequest.Unmarshal(data) + if err != nil { + return err + } + r.id = common.NewDispatcherIDFromPB(r.DispatcherId) + return nil } func (r RegisterDispatcherRequest) GetID() common.DispatcherID { - return common.NewDispatcherIDFromPB(r.DispatcherId) + return r.id } func (r RegisterDispatcherRequest) GetClusterID() uint64 { diff --git a/pkg/metrics/eventService.go b/pkg/metrics/eventService.go index 0082b2ac3..e48370f91 100644 --- a/pkg/metrics/eventService.go +++ b/pkg/metrics/eventService.go @@ -64,6 +64,13 @@ var ( Help: "The duration of scanning task in queue", Buckets: prometheus.DefBuckets, }) + EventServiceDispatcherGuage = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "dispatcher_count", + Help: "The number of dispatchers in event service", + }, []string{"cluster"}) ) // InitMetrics registers all metrics in this file. @@ -74,4 +81,5 @@ func InitEventServiceMetrics(registry *prometheus.Registry) { registry.MustRegister(EventServiceResolvedTsGauge) registry.MustRegister(EventServiceResolvedTsLagGauge) registry.MustRegister(EventServiceScanTaskInQueueDuration) + registry.MustRegister(EventServiceDispatcherGuage) }