diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 5f5bf5c81..7fc306035 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -65,6 +65,7 @@ type Dispatcher interface { // PushEvent(event *eventpb.TxnEvent) PushTxnEvent(event *common.TxnEvent) GetComponentStatus() heartbeatpb.ComponentState + GetRemovingStatus() bool } type DispatcherType uint64 @@ -146,6 +147,7 @@ type HeartBeatInfo struct { Id string TableSpan *common.TableSpan ComponentStatus heartbeatpb.ComponentState + IsRemoving bool } func CollectDispatcherHeartBeatInfo(d Dispatcher, h *HeartBeatInfo) { @@ -167,6 +169,7 @@ func CollectDispatcherHeartBeatInfo(d Dispatcher, h *HeartBeatInfo) { h.Id = d.GetId() h.ComponentStatus = d.GetComponentStatus() h.TableSpan = d.GetTableSpan() + h.IsRemoving = d.GetRemovingStatus() } /* diff --git a/downstreamadapter/dispatcher/table_event_dispatcher.go b/downstreamadapter/dispatcher/table_event_dispatcher.go index 8400fba99..f9b34bd5e 100644 --- a/downstreamadapter/dispatcher/table_event_dispatcher.go +++ b/downstreamadapter/dispatcher/table_event_dispatcher.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "sync" + "sync/atomic" "time" "github.com/flowbehappy/tigate/downstreamadapter/sink" @@ -117,6 +118,8 @@ type TableEventDispatcher struct { cancel context.CancelFunc wg sync.WaitGroup + + isRemoving atomic.Bool } func NewTableEventDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startTs uint64, syncPointInfo *SyncPointInfo) *TableEventDispatcher { @@ -133,7 +136,9 @@ func NewTableEventDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startT componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working), resolvedTs: newTsWithMutex(startTs), cancel: cancel, + isRemoving: atomic.Bool{}, } + tableEventDispatcher.sink.AddTableSpan(tableSpan) tableEventDispatcher.wg.Add(1) go tableEventDispatcher.DispatcherEvents(ctx) @@ -295,7 +300,7 @@ func (d *TableEventDispatcher) Remove() { d.cancel() d.sink.StopTableSpan(d.tableSpan) log.Info("table event dispatcher component status changed to stopping", zap.String("table", d.tableSpan.String())) - d.componentStatus.Set(heartbeatpb.ComponentState_Stopping) + d.isRemoving.Store(true) } func (d *TableEventDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) { @@ -322,3 +327,7 @@ func (d *TableEventDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) { func (d *TableEventDispatcher) GetComponentStatus() heartbeatpb.ComponentState { return d.componentStatus.Get() } + +func (d *TableEventDispatcher) GetRemovingStatus() bool { + return d.isRemoving.Load() +} diff --git a/downstreamadapter/dispatcher/table_trigger_event_dispatcher.go b/downstreamadapter/dispatcher/table_trigger_event_dispatcher.go index 21f63d559..bd4dfabd1 100644 --- a/downstreamadapter/dispatcher/table_trigger_event_dispatcher.go +++ b/downstreamadapter/dispatcher/table_trigger_event_dispatcher.go @@ -14,6 +14,8 @@ package dispatcher import ( + "sync/atomic" + "github.com/flowbehappy/tigate/downstreamadapter/sink" "github.com/flowbehappy/tigate/heartbeatpb" "github.com/flowbehappy/tigate/pkg/common" @@ -70,6 +72,8 @@ type TableTriggerEventDispatcher struct { ResolvedTs uint64 MemoryUsage *MemoryUsage + + IsRemoving atomic.Bool } func (d *TableTriggerEventDispatcher) GetSink() sink.Sink { @@ -127,3 +131,7 @@ func (d *TableTriggerEventDispatcher) GetCheckpointTs() uint64 { return 0 } func (d *TableTriggerEventDispatcher) GetComponentStatus() heartbeatpb.ComponentState { return heartbeatpb.ComponentState_Working } + +func (d *TableTriggerEventDispatcher) GetRemovingStatus() bool { + return d.IsRemoving.Load() +} diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index d22f1bb2a..934d18942 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -288,11 +288,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfoWhenStatesChanged(ctx conte func (e *EventDispatcherManager) RemoveTableEventDispatcher(tableSpan *common.TableSpan) { dispatcher, ok := e.dispatcherMap.Get(tableSpan) if ok { - if dispatcher.GetComponentStatus() == heartbeatpb.ComponentState_Stopping { - e.GetTableSpanStatusesChan() <- &heartbeatpb.TableSpanStatus{ - Span: tableSpan.TableSpan, - ComponentStatus: heartbeatpb.ComponentState_Stopping, - } + if dispatcher.GetRemovingStatus() == true { return } appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcher) @@ -322,6 +318,7 @@ func (e *EventDispatcherManager) newTableTriggerEventDispatcher(startTs uint64) Sink: e.sink, TableSpan: &common.DDLSpan, State: dispatcher.NewState(), + IsRemoving: atomic.Bool{}, //MemoryUsage: dispatcher.NewMemoryUsage(), } @@ -386,8 +383,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) * // TODO: we need to consider how to deal with the checkpointTs of the removed dispatcher if the message will be discarded. dispatcher.CollectDispatcherHeartBeatInfo(tableEventDispatcher, dispatcherHeartBeatInfo) - componentStatus := dispatcherHeartBeatInfo.ComponentStatus - if componentStatus == heartbeatpb.ComponentState_Stopping { + if dispatcherHeartBeatInfo.IsRemoving == true { watermark, ok := tableEventDispatcher.TryClose() if ok { // remove successfully