Skip to content

Commit

Permalink
eventBroker: add resolvedTsCache to reduce GC
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen committed Aug 28, 2024
1 parent d80de7a commit 12b4315
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 19 deletions.
61 changes: 46 additions & 15 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type eventBroker struct {

// messageCh is used to receive message from the scanWorker,
// and a goroutine is responsible for sending the message to the dispatchers.
messageCh chan wrapEvent
resolvedTsCache map[messaging.ServerId][]common.ResolvedEvent
messageCh chan wrapEvent
resolvedTsCaches map[messaging.ServerId]*resolvedTsCache

// wg is used to spawn the goroutines.
wg *sync.WaitGroup
Expand Down Expand Up @@ -82,7 +82,7 @@ func newEventBroker(
taskPool: newScanTaskPool(),
scanWorkerCount: defaultScanWorkerCount,
messageCh: make(chan wrapEvent, defaultChannelSize),
resolvedTsCache: make(map[messaging.ServerId][]common.ResolvedEvent),
resolvedTsCaches: make(map[messaging.ServerId]*resolvedTsCache),
cancel: cancel,
wg: wg,
metricEventServicePullerResolvedTs: metrics.EventServiceResolvedTsGauge,
Expand Down Expand Up @@ -257,7 +257,7 @@ func (c *eventBroker) runSendMessageWorker(ctx context.Context) {
c.flushResolvedTs(ctx, m.serverID)
c.sendMsg(ctx, tMsg)
case <-flushResolvedTsTicker.C:
for serverID := range c.resolvedTsCache {
for serverID := range c.resolvedTsCaches {
c.flushResolvedTs(ctx, serverID)
}
}
Expand All @@ -266,27 +266,24 @@ func (c *eventBroker) runSendMessageWorker(ctx context.Context) {
}

func (c *eventBroker) handleResolvedTs(ctx context.Context, e wrapEvent) {
cache, ok := c.resolvedTsCache[e.serverID]
cache, ok := c.resolvedTsCaches[e.serverID]
if !ok {
cache = make([]common.ResolvedEvent, 0, resolvedTsCacheSize)
c.resolvedTsCache[e.serverID] = cache
cache = newResolvedTsCache(resolvedTsCacheSize)
c.resolvedTsCaches[e.serverID] = cache
}
c.resolvedTsCache[e.serverID] = append(cache, e.resolvedEvent)
if len(c.resolvedTsCache[e.serverID]) >= resolvedTsCacheSize {
cache.add(e.resolvedEvent)
if cache.isFull() {
c.flushResolvedTs(ctx, e.serverID)
}
}

func (c *eventBroker) flushResolvedTs(ctx context.Context, serverID messaging.ServerId) {
events, ok := c.resolvedTsCache[serverID]
if !ok || len(events) == 0 {
cache, ok := c.resolvedTsCaches[serverID]
if !ok || cache.len == 0 {
return
}
msg := &common.BatchResolvedTs{}
msg.Events = append(msg.Events, events...)
// Clear the event to release the memory.
// Clear the cache.
c.resolvedTsCache[serverID] = c.resolvedTsCache[serverID][:0]
msg.Events = append(msg.Events, cache.getAll()...)
tMsg := messaging.NewSingleTargetMessage(
serverID,
messaging.EventCollectorTopic,
Expand Down Expand Up @@ -557,3 +554,37 @@ func newWrapDDLEvent(serverID messaging.ServerId, e common.DDLEvent) wrapEvent {
msgType: common.TypeDDLEvent,
}
}

type resolvedTsCache struct {
cache []common.ResolvedEvent
// len is the number of the events in the cache.
len int
// limit is the max number of the events that the cache can store.
limit int
}

func newResolvedTsCache(limit int) *resolvedTsCache {
return &resolvedTsCache{
cache: make([]common.ResolvedEvent, limit),
limit: limit,
}
}

func (c *resolvedTsCache) add(e common.ResolvedEvent) {
c.cache[c.len] = e
c.len++
}

func (c *resolvedTsCache) isFull() bool {
return c.len >= c.limit
}

func (c *resolvedTsCache) getAll() []common.ResolvedEvent {
res := c.cache[:c.len]
c.reset()
return res
}

func (c *resolvedTsCache) reset() {
c.len = 0
}
38 changes: 38 additions & 0 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,41 @@ func newTableSpan(tableID uint64, start, end string) *common.TableSpan {
}
return res
}

func TestResolvedTsCache(t *testing.T) {
rc := newResolvedTsCache(10)
require.Equal(t, 0, rc.len)
require.Equal(t, 10, len(rc.cache))
require.Equal(t, 10, rc.limit)

// Case 1: insert a new resolved ts
rc.add(common.ResolvedEvent{
DispatcherID: common.NewDispatcherID(),
ResolvedTs: 100,
})
require.Equal(t, 1, rc.len)
require.Equal(t, uint64(100), rc.cache[0].ResolvedTs)
require.False(t, rc.isFull())

// Case 2: add more resolved ts until full
i := 1
for !rc.isFull() {
rc.add(common.ResolvedEvent{
DispatcherID: common.NewDispatcherID(),
ResolvedTs: uint64(100 + i),
})
i++
}
require.Equal(t, 10, rc.len)
require.Equal(t, uint64(100), rc.cache[0].ResolvedTs)
require.Equal(t, uint64(109), rc.cache[9].ResolvedTs)
require.True(t, rc.isFull())

// Case 3: get all resolved ts
res := rc.getAll()
require.Equal(t, 10, len(res))
require.Equal(t, 0, rc.len)
require.Equal(t, uint64(100), res[0].ResolvedTs)
require.Equal(t, uint64(109), res[9].ResolvedTs)
require.False(t, rc.isFull())
}
8 changes: 4 additions & 4 deletions pkg/eventservice/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type mockMessageCenter struct {
messageCh chan *messaging.TargetMessage
}

func (m *mockMessageCenter) OnNodeChanges(newNodes []*common.NodeInfo, removedNodes []*common.NodeInfo) {
func (m *mockMessageCenter) OnNodeChanges(nodeInfos map[string]*common.NodeInfo) {

}

Expand Down Expand Up @@ -409,8 +409,8 @@ func TestDispatcherCommunicateWithEventService(t *testing.T) {
mysqlSink := sink.NewMysqlSink(model.DefaultChangeFeedID("test1"), 8, writer.NewMysqlConfig(), db)
tableSpan := &common.TableSpan{TableSpan: &heartbeatpb.TableSpan{TableID: 1, StartKey: nil, EndKey: nil}}
startTs := uint64(1)

tableEventDispatcher := dispatcher.NewDispatcher(tableSpan, mysqlSink, startTs, nil, nil)
id := common.NewDispatcherID()
tableEventDispatcher := dispatcher.NewDispatcher(id, tableSpan, mysqlSink, startTs, nil, nil)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RegisterDispatcher(
eventcollector.RegisterInfo{
Dispatcher: tableEventDispatcher,
Expand Down Expand Up @@ -438,5 +438,5 @@ func TestDispatcherCommunicateWithEventService(t *testing.T) {

sourceSpanStat.update([]*common.TxnEvent{txnEvent}, txnEvent.CommitTs)

<-tableEventDispatcher.GetEventChan()
// <-tableEventDispatcher.GetEventChan()
}

0 comments on commit 12b4315

Please sign in to comment.