Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatcher : Removing stopping component status #206

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Dispatcher interface {
// PushEvent(event *eventpb.TxnEvent)
PushTxnEvent(event *common.TxnEvent)
GetComponentStatus() heartbeatpb.ComponentState
GetRemovingStatus() bool
}

type DispatcherType uint64
Expand Down Expand Up @@ -146,6 +147,7 @@ type HeartBeatInfo struct {
Id string
TableSpan *common.TableSpan
ComponentStatus heartbeatpb.ComponentState
IsRemoving bool
}

func CollectDispatcherHeartBeatInfo(d Dispatcher, h *HeartBeatInfo) {
Expand All @@ -167,6 +169,7 @@ func CollectDispatcherHeartBeatInfo(d Dispatcher, h *HeartBeatInfo) {
h.Id = d.GetId()
h.ComponentStatus = d.GetComponentStatus()
h.TableSpan = d.GetTableSpan()
h.IsRemoving = d.GetRemovingStatus()
}

/*
Expand Down
11 changes: 10 additions & 1 deletion downstreamadapter/dispatcher/table_event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"sync"
"sync/atomic"
"time"

"github.com/flowbehappy/tigate/downstreamadapter/sink"
Expand Down Expand Up @@ -117,6 +118,8 @@ type TableEventDispatcher struct {

cancel context.CancelFunc
wg sync.WaitGroup

isRemoving atomic.Bool
}

func NewTableEventDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startTs uint64, syncPointInfo *SyncPointInfo) *TableEventDispatcher {
Expand All @@ -133,7 +136,9 @@ func NewTableEventDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startT
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
resolvedTs: newTsWithMutex(startTs),
cancel: cancel,
isRemoving: atomic.Bool{},
}

tableEventDispatcher.sink.AddTableSpan(tableSpan)
tableEventDispatcher.wg.Add(1)
go tableEventDispatcher.DispatcherEvents(ctx)
Expand Down Expand Up @@ -295,7 +300,7 @@ func (d *TableEventDispatcher) Remove() {
d.cancel()
d.sink.StopTableSpan(d.tableSpan)
log.Info("table event dispatcher component status changed to stopping", zap.String("table", d.tableSpan.String()))
d.componentStatus.Set(heartbeatpb.ComponentState_Stopping)
d.isRemoving.Store(true)
}

func (d *TableEventDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
Expand All @@ -322,3 +327,7 @@ func (d *TableEventDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
func (d *TableEventDispatcher) GetComponentStatus() heartbeatpb.ComponentState {
return d.componentStatus.Get()
}

func (d *TableEventDispatcher) GetRemovingStatus() bool {
return d.isRemoving.Load()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package dispatcher

import (
"sync/atomic"

"github.com/flowbehappy/tigate/downstreamadapter/sink"
"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/common"
Expand Down Expand Up @@ -70,6 +72,8 @@ type TableTriggerEventDispatcher struct {
ResolvedTs uint64

MemoryUsage *MemoryUsage

IsRemoving atomic.Bool
}

func (d *TableTriggerEventDispatcher) GetSink() sink.Sink {
Expand Down Expand Up @@ -127,3 +131,7 @@ func (d *TableTriggerEventDispatcher) GetCheckpointTs() uint64 { return 0 }
func (d *TableTriggerEventDispatcher) GetComponentStatus() heartbeatpb.ComponentState {
return heartbeatpb.ComponentState_Working
}

func (d *TableTriggerEventDispatcher) GetRemovingStatus() bool {
return d.IsRemoving.Load()
}
10 changes: 3 additions & 7 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfoWhenStatesChanged(ctx conte
func (e *EventDispatcherManager) RemoveTableEventDispatcher(tableSpan *common.TableSpan) {
dispatcher, ok := e.dispatcherMap.Get(tableSpan)
if ok {
if dispatcher.GetComponentStatus() == heartbeatpb.ComponentState_Stopping {
e.GetTableSpanStatusesChan() <- &heartbeatpb.TableSpanStatus{
Span: tableSpan.TableSpan,
ComponentStatus: heartbeatpb.ComponentState_Stopping,
}
if dispatcher.GetRemovingStatus() == true {
return
}
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcher)
Expand Down Expand Up @@ -322,6 +318,7 @@ func (e *EventDispatcherManager) newTableTriggerEventDispatcher(startTs uint64)
Sink: e.sink,
TableSpan: &common.DDLSpan,
State: dispatcher.NewState(),
IsRemoving: atomic.Bool{},
//MemoryUsage: dispatcher.NewMemoryUsage(),
}

Expand Down Expand Up @@ -386,8 +383,7 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
// TODO: we need to consider how to deal with the checkpointTs of the removed dispatcher if the message will be discarded.
dispatcher.CollectDispatcherHeartBeatInfo(tableEventDispatcher, dispatcherHeartBeatInfo)

componentStatus := dispatcherHeartBeatInfo.ComponentStatus
if componentStatus == heartbeatpb.ComponentState_Stopping {
if dispatcherHeartBeatInfo.IsRemoving == true {
watermark, ok := tableEventDispatcher.TryClose()
if ok {
// remove successfully
Expand Down
Loading