Skip to content

Commit

Permalink
dispatcher: store db info in eventDispatcherManager (#241)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Aug 30, 2024
1 parent 5f2caa0 commit 790596a
Show file tree
Hide file tree
Showing 8 changed files with 507 additions and 207 deletions.
13 changes: 10 additions & 3 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ type Dispatcher struct {

resendTask *ResendTask
checkTableProgressEmptyTask *CheckProgressEmptyTask

schemaID int64
}

func NewDispatcher(id common.DispatcherID, tableSpan *common.TableSpan, sink sink.Sink, startTs uint64, statusesChan chan *heartbeatpb.TableSpanStatus, filter filter.Filter) *Dispatcher {
func NewDispatcher(id common.DispatcherID, tableSpan *common.TableSpan, sink sink.Sink, startTs uint64, statusesChan chan *heartbeatpb.TableSpanStatus, filter filter.Filter, schemaID int64) *Dispatcher {
dispatcher := &Dispatcher{
id: id,
tableSpan: tableSpan,
Expand All @@ -98,6 +100,7 @@ func NewDispatcher(id common.DispatcherID, tableSpan *common.TableSpan, sink sin
isRemoving: atomic.Bool{},
ddlPendingEvent: nil,
tableProgress: types.NewTableProgress(),
schemaID: schemaID,
}

dispatcherStatusDynamicStream := GetDispatcherStatusDynamicStream()
Expand Down Expand Up @@ -219,7 +222,7 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
IsBlocked: false,
BlockTs: d.ddlPendingEvent.CommitTs,
NeedDroppedDispatchers: d.ddlPendingEvent.GetNeedDroppedDispatchers().ToPB(),
NeedAddedTables: d.ddlPendingEvent.GetNeedAddedTables(),
NeedAddedTables: common.ToTablesPB(d.ddlPendingEvent.GetNeedAddedTables()),
},
}
d.SetResendTask(newResendTask(message, d))
Expand All @@ -234,7 +237,7 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
BlockTs: d.ddlPendingEvent.CommitTs,
BlockDispatchers: d.ddlPendingEvent.GetBlockedDispatchers().ToPB(),
NeedDroppedDispatchers: d.ddlPendingEvent.GetNeedDroppedDispatchers().ToPB(),
NeedAddedTables: d.ddlPendingEvent.GetNeedAddedTables(),
NeedAddedTables: common.ToTablesPB(d.ddlPendingEvent.GetNeedAddedTables()),
},
}
d.SetResendTask(newResendTask(message, d))
Expand Down Expand Up @@ -284,6 +287,10 @@ func (d *Dispatcher) SetResendTask(task *ResendTask) {
d.resendTask = task
}

func (d *Dispatcher) GetSchemaID() int64 {
return d.schemaID
}

//func (d *Dispatcher) GetSyncPointInfo() *SyncPointInfo {
// return d.syncPointInfo
// }
Expand Down
30 changes: 20 additions & 10 deletions downstreamadapter/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestBasicDispatcher(t *testing.T) {
tableSpanStatusChan := make(chan *heartbeatpb.TableSpanStatus, 10)
filter, _ := filter.NewFilter(&config.ReplicaConfig{Filter: &config.FilterConfig{}}, "")

dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter)
dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter, 0)

dispatcherEventsDynamicStream := GetDispatcherEventsDynamicStream()

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestDispatcherWithSingleTableDDL(t *testing.T) {
tableSpanStatusChan := make(chan *heartbeatpb.TableSpanStatus, 10)
filter, _ := filter.NewFilter(&config.ReplicaConfig{Filter: &config.FilterConfig{}}, "")

dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter)
dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter, 0)

dispatcherEventsDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventsDynamicStream.In() <- &common.TxnEvent{
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestDispatcherWithCrossTableDDL(t *testing.T) {
tableSpanStatusChan := make(chan *heartbeatpb.TableSpanStatus, 10)
filter, _ := filter.NewFilter(&config.ReplicaConfig{Filter: &config.FilterConfig{}}, "")

dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter)
dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter, 0)

dispatcherEventsDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherStatusDynamicStream := GetDispatcherStatusDynamicStream()
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestDispatcherWithCrossTableDDLAndDML(t *testing.T) {
tableSpanStatusChan := make(chan *heartbeatpb.TableSpanStatus, 10)
filter, _ := filter.NewFilter(&config.ReplicaConfig{Filter: &config.FilterConfig{}}, "")

dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter)
dispatcher := NewDispatcher(common.NewDispatcherID(), tableSpan, mysqlSink, startTs, tableSpanStatusChan, filter, 0)

dispatcherEventsDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherStatusDynamicStream := GetDispatcherStatusDynamicStream()
Expand Down Expand Up @@ -437,13 +437,13 @@ func TestMultiDispatcherWithMultipleDDLs(t *testing.T) {

ddlTableSpan := &common.DDLSpan

tableTriggerEventDispatcher := NewDispatcher(common.NewDispatcherID(), ddlTableSpan, mysqlSink, startTs, statusChan, filter)
tableTriggerEventDispatcher := NewDispatcher(common.NewDispatcherID(), ddlTableSpan, mysqlSink, startTs, statusChan, filter, 0)

table1TableSpan := &common.TableSpan{TableSpan: &heartbeatpb.TableSpan{TableID: 1}}
table2TableSpan := &common.TableSpan{TableSpan: &heartbeatpb.TableSpan{TableID: 2}}

table1Dispatcher := NewDispatcher(common.NewDispatcherID(), table1TableSpan, mysqlSink, startTs, statusChan, filter)
table2Dispatcher := NewDispatcher(common.NewDispatcherID(), table2TableSpan, mysqlSink, startTs, statusChan, filter)
table1Dispatcher := NewDispatcher(common.NewDispatcherID(), table1TableSpan, mysqlSink, startTs, statusChan, filter, 0)
table2Dispatcher := NewDispatcher(common.NewDispatcherID(), table2TableSpan, mysqlSink, startTs, statusChan, filter, 0)

dbDispatcherIdsMap := make(map[int64][]common.DispatcherID)
dbDispatcherIdsMap[1] = append(dbDispatcherIdsMap[1], table1Dispatcher.id)
Expand Down Expand Up @@ -489,8 +489,13 @@ func TestMultiDispatcherWithMultipleDDLs(t *testing.T) {
TableName: "test_table",
Query: "Create table `test_schema`.`test_table` (id int primary key, name varchar(255))",
},
CommitTS: 103,
NeedAddedTables: []int64{3},
CommitTS: 103,
NeedAddedTables: []common.Table{
{
SchemaID: 1,
TableID: 3,
},
},
},
DispatcherID: tableTriggerEventDispatcher.id,
}
Expand Down Expand Up @@ -543,7 +548,12 @@ func TestMultiDispatcherWithMultipleDDLs(t *testing.T) {
tableTriggerEventDispatcher.id.ToPB(),
},
},
NeedAddedTables: []int64{4},
NeedAddedTables: []common.Table{
{
SchemaID: 1,
TableID: 1,
},
},
NeedDroppedDispatchers: &common.InfluencedDispatchers{
InfluenceType: common.Normal,
DispatcherIDs: []*heartbeatpb.DispatcherID{
Expand Down
81 changes: 76 additions & 5 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type EventDispatcherManager struct {

heartBeatTask *HeartBeatTask

schemaIDToDispatchers *SchemaIDToDispatchers

tableTriggerEventDispatcherID *common.DispatcherID

tableEventDispatcherCount prometheus.Gauge
metricCreateDispatcherDuration prometheus.Observer
metricCheckpointTs prometheus.Gauge
Expand All @@ -95,6 +99,7 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
statusesChan: make(chan *heartbeatpb.TableSpanStatus, 10000),
cancel: cancel,
config: cfConfig,
schemaIDToDispatchers: newSchemaIDToDispatchers(),
tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTs: metrics.EventDispatcherManagerCheckpointTsGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
Expand Down Expand Up @@ -188,14 +193,20 @@ func calculateStartSyncPointTs(startTs uint64, syncPointInterval time.Duration)
}
*/

func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan *common.TableSpan, startTs uint64) *dispatcher.Dispatcher {
func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan *common.TableSpan, startTs uint64, schemaID int64) *dispatcher.Dispatcher {
start := time.Now()
if _, ok := e.dispatcherMap.Get(id); ok {
log.Debug("table span already exists", zap.Any("tableSpan", tableSpan))
return nil
}

dispatcher := dispatcher.NewDispatcher(id, tableSpan, e.sink, startTs, e.statusesChan, e.filter)
dispatcher := dispatcher.NewDispatcher(id, tableSpan, e.sink, startTs, e.statusesChan, e.filter, schemaID)

if tableSpan.Equal(&common.DDLSpan) {
e.tableTriggerEventDispatcherID = &id
} else {
e.schemaIDToDispatchers.Set(schemaID, id)
}

// TODO:暂时不收 ddl 的 event
if !tableSpan.Equal(&common.DDLSpan) {
Expand Down Expand Up @@ -286,8 +297,12 @@ func (e *EventDispatcherManager) RemoveDispatcher(id common.DispatcherID) {
}

// Only called when the dispatcher is removed successfully.
func (e *EventDispatcherManager) cleanTableEventDispatcher(id common.DispatcherID) {
func (e *EventDispatcherManager) cleanTableEventDispatcher(id common.DispatcherID, schemaID int64) {
e.dispatcherMap.Delete(id)
e.schemaIDToDispatchers.Delete(schemaID, id)
if e.tableTriggerEventDispatcherID != nil && *e.tableTriggerEventDispatcherID == id {
e.tableTriggerEventDispatcherID = nil
}
e.tableEventDispatcherCount.Dec()
log.Info("table event dispatcher completely stopped, and delete it from event dispatcher manager", zap.Any("dispatcher id", id))
}
Expand Down Expand Up @@ -329,6 +344,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
}

toReomveDispatcherIDs := make([]common.DispatcherID, 0)
removeDispatcherSchemaIDs := make([]int64, 0)
heartBeatInfo := &dispatcher.HeartBeatInfo{}
e.dispatcherMap.ForEach(func(id common.DispatcherID, dispatcherItem *dispatcher.Dispatcher) {
// TODO:ddlSpan先不参与
Expand All @@ -351,6 +367,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
CheckpointTs: watermark.CheckpointTs,
})
toReomveDispatcherIDs = append(toReomveDispatcherIDs, id)
removeDispatcherSchemaIDs = append(removeDispatcherSchemaIDs, dispatcherItem.GetSchemaID())
}
}

Expand All @@ -365,8 +382,8 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
}
})

for _, id := range toReomveDispatcherIDs {
e.cleanTableEventDispatcher(id)
for idx, id := range toReomveDispatcherIDs {
e.cleanTableEventDispatcher(id, removeDispatcherSchemaIDs[idx])
}
return &message
}
Expand All @@ -375,6 +392,10 @@ func (e *EventDispatcherManager) GetDispatcherMap() *DispatcherMap {
return e.dispatcherMap
}

func (e *EventDispatcherManager) GetSchemaIDToDispatchers() *SchemaIDToDispatchers {
return e.schemaIDToDispatchers
}

func (e *EventDispatcherManager) GetMaintainerID() messaging.ServerId {
return e.maintainerID
}
Expand All @@ -399,6 +420,15 @@ func (e *EventDispatcherManager) SetMaintainerID(maintainerID messaging.ServerId
e.maintainerID = maintainerID
}

// Get all dispatchers id of the specified schemaID. Including the tableTriggerEventDispatcherID if exists.
func (e *EventDispatcherManager) GetAllDispatchers(schemaID int64) []common.DispatcherID {
dispatcherIDs := e.GetSchemaIDToDispatchers().GetDispatcherIDs(schemaID)
if e.tableTriggerEventDispatcherID != nil {
dispatcherIDs = append(dispatcherIDs, *e.tableTriggerEventDispatcherID)
}
return dispatcherIDs
}

func (e *EventDispatcherManager) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
e.wg.Add(1)
Expand Down Expand Up @@ -468,3 +498,44 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp
return true
})
}

type SchemaIDToDispatchers struct {
mutex sync.RWMutex
m map[int64]map[common.DispatcherID]interface{}
}

func newSchemaIDToDispatchers() *SchemaIDToDispatchers {
return &SchemaIDToDispatchers{
m: make(map[int64]map[common.DispatcherID]interface{}),
}
}

func (s *SchemaIDToDispatchers) Set(schemaID int64, dispatcherID common.DispatcherID) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[schemaID]; !ok {
s.m[schemaID] = make(map[common.DispatcherID]interface{})
}
s.m[schemaID][dispatcherID] = struct{}{}
}

func (s *SchemaIDToDispatchers) Delete(schemaID int64, dispatcherID common.DispatcherID) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[schemaID]; ok {
delete(s.m[schemaID], dispatcherID)
}
}

func (s *SchemaIDToDispatchers) GetDispatcherIDs(schemaID int64) []common.DispatcherID {
s.mutex.RLock()
defer s.mutex.RUnlock()
if ids, ok := s.m[schemaID]; ok {
dispatcherIDs := make([]common.DispatcherID, 0, len(ids))
for id := range ids {
dispatcherIDs = append(dispatcherIDs, id)
}
return dispatcherIDs
}
return nil
}
18 changes: 15 additions & 3 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (h *SchedulerDispatcherRequestHandler) Handle(scheduleDispatcherRequest *he
scheduleAction := scheduleDispatcherRequest.ScheduleAction
config := scheduleDispatcherRequest.Config
if scheduleAction == heartbeatpb.ScheduleAction_Create {
eventDispatcherManager.NewDispatcher(common.NewDispatcherIDFromPB(config.DispatcherID), &common.TableSpan{TableSpan: config.Span}, config.StartTs)
eventDispatcherManager.NewDispatcher(common.NewDispatcherIDFromPB(config.DispatcherID), &common.TableSpan{TableSpan: config.Span}, config.StartTs, config.SchemaID)
} else if scheduleAction == heartbeatpb.ScheduleAction_Remove {
eventDispatcherManager.RemoveDispatcher(common.NewDispatcherIDFromPB(config.DispatcherID))
}
Expand Down Expand Up @@ -154,9 +154,21 @@ func (h *HeartBeatResponseHandler) Handle(heartbeatResponse *heartbeatpb.HeartBe
h.dispatcherStatusDynamicStream.In() <- dispatcher.NewDispatcherStatusWithID(dispatcherStatus, common.NewDispatcherIDFromPB(dispatcherID))
}
} else if influencedDispatchersType == heartbeatpb.InfluenceType_DB {
// 找出 db 对应的所有 id 扔进去, 记得查看 exclude_dispatcher_id
schemaID := dispatcherStatus.InfluencedDispatchers.SchemaID
excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId)
dispatcherIds := eventDispatcherManager.GetAllDispatchers(schemaID)
for _, id := range dispatcherIds {
if id != excludeDispatcherID {
h.dispatcherStatusDynamicStream.In() <- dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)
}
}
} else if influencedDispatchersType == heartbeatpb.InfluenceType_All {
// 遍历所有 dispatcher 扔进去, 记得查看 exclude_dispatcher_id
excludeDispatcherID := common.NewDispatcherIDFromPB(dispatcherStatus.InfluencedDispatchers.ExcludeDispatcherId)
eventDispatcherManager.GetDispatcherMap().ForEach(func(id common.DispatcherID, _ *dispatcher.Dispatcher) {
if id != excludeDispatcherID {
h.dispatcherStatusDynamicStream.In() <- dispatcher.NewDispatcherStatusWithID(dispatcherStatus, id)
}
})
}
}

Expand Down
Loading

0 comments on commit 790596a

Please sign in to comment.