Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Sep 11, 2024
1 parent c27cb1a commit 8af397e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 53 deletions.
37 changes: 29 additions & 8 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventservice
import (
"context"
"hash/crc32"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -59,12 +60,14 @@ type eventBroker struct {
// and a goroutine is responsible for sending the message to the dispatchers.
messageCh chan wrapEvent
resolvedTsCaches map[messaging.ServerId]*resolvedTsCache
notifyCh chan *spanSubscription

// wg is used to spawn the goroutines.
wg *sync.WaitGroup
// cancel is used to cancel the goroutines spawned by the eventBroker.
cancel context.CancelFunc

metricDispatcherCount prometheus.Gauge
metricEventServicePullerResolvedTs prometheus.Gauge
metricEventServiceDispatcherResolvedTs prometheus.Gauge
metricEventServiceResolvedTsLag prometheus.Gauge
Expand All @@ -86,6 +89,7 @@ func newEventBroker(
eventStore: eventStore,
mounter: mounter.NewMounter(tz),
schemaStore: schemaStore,
notifyCh: make(chan *spanSubscription, defaultChannelSize*16),
dispatchers: sync.Map{},
tableTriggerDispatchers: sync.Map{},
spans: make(map[common.TableID]*spanSubscription),
Expand All @@ -96,6 +100,7 @@ func newEventBroker(
resolvedTsCaches: make(map[messaging.ServerId]*resolvedTsCache),
cancel: cancel,
wg: wg,
metricDispatcherCount: metrics.EventServiceDispatcherGuage.WithLabelValues(strconv.FormatUint(id, 10)),
metricEventServicePullerResolvedTs: metrics.EventServiceResolvedTsGauge,
metricEventServiceResolvedTsLag: metrics.EventServiceResolvedTsLagGauge.WithLabelValues("puller"),
metricEventServiceDispatcherResolvedTs: metrics.EventServiceResolvedTsLagGauge.WithLabelValues("dispatcher"),
Expand All @@ -105,6 +110,7 @@ func newEventBroker(
c.tickTableTriggerDispatchers(ctx)
c.runSendMessageWorker(ctx)
c.updateMetrics(ctx)
c.runGenTasks(ctx)
return c
}

Expand Down Expand Up @@ -142,6 +148,26 @@ func (c *eventBroker) runScanWorker(ctx context.Context) {
}
}

func (c *eventBroker) runGenTasks(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case s := <-c.notifyCh:
s.dispatchers.RLock()
defer s.dispatchers.RUnlock()
for _, stat := range s.dispatchers.m {
c.taskPool.pushTask(&scanTask{
dispatcherStat: stat,
createTime: time.Now(),
})
}
}
}
}()
}

// TODO: maybe event driven model is better. It is coupled with the detail implementation of
// the schemaStore, we will refactor it later.
func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) {
Expand Down Expand Up @@ -397,14 +423,7 @@ func (c *eventBroker) close() {

func (c *eventBroker) onNotify(s *spanSubscription, watermark uint64) {
s.onSubscriptionWatermark(watermark)
s.dispatchers.RLock()
defer s.dispatchers.RUnlock()
for _, stat := range s.dispatchers.m {
c.taskPool.pushTask(&scanTask{
dispatcherStat: stat,
createTime: time.Now(),
})
}
c.notifyCh <- s
}

func (c *eventBroker) addDispatcher(info DispatcherInfo) {
Expand All @@ -413,6 +432,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
panic(err)
}

defer c.metricDispatcherCount.Inc()
start := time.Now()
id := info.GetID()
span := info.GetTableSpan()
Expand Down Expand Up @@ -452,6 +472,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
}

func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) {
defer c.metricDispatcherCount.Dec()
id := dispatcherInfo.GetID()
stat, ok := c.dispatchers.Load(id)
if !ok {
Expand Down
43 changes: 0 additions & 43 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,49 +96,6 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) {
wg.Wait()
}

func TestScanTaskPool_PushTask(t *testing.T) {
pool := newScanTaskPool()
span := newTableSpan(1, "a", "b")
dispatcherInfo := &mockDispatcherInfo{
id: common.NewDispatcherID(),
clusterID: 1,
startTs: 1000,
span: span,
}
s := newSpanSubscription(span, dispatcherInfo.startTs)
dispatcherStat := newDispatcherStat(dispatcherInfo.startTs, dispatcherInfo, s, nil)

now := time.Now()
task1 := &scanTask{
dispatcherStat: dispatcherStat,
createTime: now.Add(1 * time.Second),
}

task2 := &scanTask{
dispatcherStat: dispatcherStat,
createTime: now.Add(2 * time.Second),
}

// make the pool contain the task1 already
pool.taskSet[dispatcherInfo.GetID()] = task1

// Verify that the task is in the taskSet
task, ok := pool.taskSet[dispatcherInfo.id]
require.True(t, ok)
require.Equal(t, task1, task)

// Push the second task
pool.pushTask(task2)
// Verify that the task1 is sent to corresponding pendingTaskQueue, task2 is dropped since duplicate.
receivedTask := <-pool.pendingTaskQueue[dispatcherStat.workerIndex]
require.Equal(t, task1, receivedTask)

// Verify that the task is removed from taskSet
task, ok = pool.taskSet[dispatcherInfo.GetID()]
require.False(t, ok)
require.Nil(t, task)
}

func newTableSpan(tableID int64, start, end string) *heartbeatpb.TableSpan {
return &heartbeatpb.TableSpan{
TableID: tableID,
Expand Down
10 changes: 8 additions & 2 deletions pkg/messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,24 @@ func (b *Bytes) Unmarshal(data []byte) error {

type RegisterDispatcherRequest struct {
*eventpb.RegisterDispatcherRequest
id common.DispatcherID
}

func (r RegisterDispatcherRequest) Marshal() ([]byte, error) {
return r.RegisterDispatcherRequest.Marshal()
}

func (r RegisterDispatcherRequest) Unmarshal(data []byte) error {
return r.RegisterDispatcherRequest.Unmarshal(data)
err := r.RegisterDispatcherRequest.Unmarshal(data)
if err != nil {
return err
}
r.id = common.NewDispatcherIDFromPB(r.DispatcherId)
return nil
}

func (r RegisterDispatcherRequest) GetID() common.DispatcherID {
return common.NewDispatcherIDFromPB(r.DispatcherId)
return r.id
}

func (r RegisterDispatcherRequest) GetClusterID() uint64 {
Expand Down
8 changes: 8 additions & 0 deletions pkg/metrics/eventService.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ var (
Help: "The duration of scanning task in queue",
Buckets: prometheus.DefBuckets,
})
EventServiceDispatcherGuage = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "event_service",
Name: "dispatcher_count",
Help: "The number of dispatchers in event service",
}, []string{"cluster"})
)

// InitMetrics registers all metrics in this file.
Expand All @@ -74,4 +81,5 @@ func InitEventServiceMetrics(registry *prometheus.Registry) {
registry.MustRegister(EventServiceResolvedTsGauge)
registry.MustRegister(EventServiceResolvedTsLagGauge)
registry.MustRegister(EventServiceScanTaskInQueueDuration)
registry.MustRegister(EventServiceDispatcherGuage)
}

0 comments on commit 8af397e

Please sign in to comment.