Skip to content

Commit

Permalink
types : change dispatcher id to GID type (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Aug 20, 2024
1 parent c10ebf3 commit 0b0259d
Show file tree
Hide file tree
Showing 17 changed files with 438 additions and 162 deletions.
7 changes: 3 additions & 4 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/common"
"github.com/flowbehappy/tigate/pkg/filter"
"github.com/google/uuid"
"github.com/pingcap/log"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -63,7 +62,7 @@ The workflow related to the dispatcher is as follows:
*/

type Dispatcher struct {
id string
id common.DispatcherID
eventCh chan *common.TxnEvent // 转换成一个函数
tableSpan *common.TableSpan
sink sink.Sink
Expand Down Expand Up @@ -95,7 +94,7 @@ type Dispatcher struct {
func NewDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startTs uint64, tableSpanStatusesChan chan *heartbeatpb.TableSpanStatus, filter filter.Filter) *Dispatcher {
ctx, cancel := context.WithCancel(context.Background())
dispatcher := &Dispatcher{
id: uuid.NewString(),
id: common.NewDispatcherID(),
eventCh: make(chan *common.TxnEvent, 16),
tableSpan: tableSpan,
sink: sink,
Expand Down Expand Up @@ -177,7 +176,7 @@ func (d *Dispatcher) UpdateResolvedTs(ts uint64) {
d.GetEventChan() <- &common.TxnEvent{ResolvedTs: ts}
}

func (d *Dispatcher) GetId() string {
func (d *Dispatcher) GetId() common.DispatcherID {
return d.id
}

Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Mainly about the progress of each dispatcher:
*/
type HeartBeatInfo struct {
heartbeatpb.Watermark
Id string
Id common.DispatcherID
TableSpan *common.TableSpan
ComponentStatus heartbeatpb.ComponentState
IsRemoving bool
Expand Down
12 changes: 6 additions & 6 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type DispatcherMap struct {
m sync.Map
}

func (m *DispatcherMap) Get(dispatcherId string) (*dispatcher.Dispatcher, bool) {
func (m *DispatcherMap) Get(dispatcherId common.DispatcherID) (*dispatcher.Dispatcher, bool) {
d, ok := m.m.Load(dispatcherId)
if !ok {
return nil, false
Expand All @@ -45,11 +45,11 @@ func (m *DispatcherMap) Get(dispatcherId string) (*dispatcher.Dispatcher, bool)
return dispatcher, ok
}

func (m *DispatcherMap) Set(dispatcherId string, d *dispatcher.Dispatcher) {
func (m *DispatcherMap) Set(dispatcherId common.DispatcherID, d *dispatcher.Dispatcher) {
m.m.Store(dispatcherId, d)
}

func (m *DispatcherMap) Delete(dispatcherId string) {
func (m *DispatcherMap) Delete(dispatcherId common.DispatcherID) {
m.m.Delete(dispatcherId)
}

Expand Down Expand Up @@ -116,7 +116,7 @@ func (c *EventCollector) RegisterDispatcher(info RegisterInfo) error {
Topic: messaging.EventServiceTopic,
Type: messaging.TypeRegisterDispatcherRequest,
Message: messaging.RegisterDispatcherRequest{RegisterDispatcherRequest: &eventpb.RegisterDispatcherRequest{
DispatcherId: info.Dispatcher.GetId(),
DispatcherId: info.Dispatcher.GetId().ToPB(),
TableSpan: info.Dispatcher.GetTableSpan().TableSpan,
Remove: false,
StartTs: info.StartTs,
Expand All @@ -140,7 +140,7 @@ func (c *EventCollector) RemoveDispatcher(d *dispatcher.Dispatcher) error {
Topic: messaging.EventServiceTopic,
Type: messaging.TypeRegisterDispatcherRequest,
Message: messaging.RegisterDispatcherRequest{RegisterDispatcherRequest: &eventpb.RegisterDispatcherRequest{
DispatcherId: d.GetId(),
DispatcherId: d.GetId().ToPB(),
Remove: true,
ServerId: c.serverId.String(),
TableSpan: d.GetTableSpan().TableSpan,
Expand All @@ -155,7 +155,7 @@ func (c *EventCollector) RemoveDispatcher(d *dispatcher.Dispatcher) error {
}
return err
}
c.dispatcherMap.Delete(string(d.GetId()))
c.dispatcherMap.Delete(d.GetId())
return nil
}

Expand Down
375 changes: 292 additions & 83 deletions eventpb/event.pb.go

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions eventpb/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ message TableInfo {
message EventFeed {
repeated TxnEvent txn_events = 1; // 包含多个 txn events
uint64 resolved_ts = 2; // 跟上述 event 同时出现 -- 就是标记目前达成的 resolvedTs
string dispatcher_id = 3; // 表示这个 event 是 对应哪个 dispatcher 的
DispatcherID dispatcher_id = 3; // 表示这个 event 是 对应哪个 dispatcher 的
float ratio = 4; // 表示这个 event 应该被发送到哪个 dispatcher 的 ratio
TableInfo table_info = 5; // 包含 table 相关信息,包括表名,主键,列名等
}

message RegisterDispatcherRequest {
string dispatcher_id = 1;
DispatcherID dispatcher_id = 1;
heartbeatpb.TableSpan table_span = 2;
uint64 start_ts = 3;
string server_id = 4;
Expand All @@ -68,3 +68,9 @@ message RegisterDispatcherRequest {
string changefeed_id = 7;
FilterConfig filter_config = 8;
}


message DispatcherID {
uint64 high = 1;
uint64 low = 2;
}
20 changes: 10 additions & 10 deletions logservice/eventstore/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ type EventStore interface {
// add a callback to be called when a new event is added to the store;
// but for old data this is not feasiable? may we can just return a current watermark when register
RegisterDispatcher(
dispatcherID string,
dispatcherID common.DispatcherID,
span *common.TableSpan,
startTS common.Ts,
observer EventObserver,
notifier WatermarkNotifier,
) error

UpdateDispatcherSendTS(dispatcherID string, gcTS uint64) error
UpdateDispatcherSendTS(dispatcherID common.DispatcherID, gcTS uint64) error

UnregisterDispatcher(dispatcherID string) error
UnregisterDispatcher(dispatcherID common.DispatcherID) error

// TODO: ignore large txn now, so we can read all transactions of the same commit ts at one time
// [startCommitTS, endCommitTS)?
Expand Down Expand Up @@ -99,8 +99,8 @@ type eventStore struct {

mu sync.RWMutex
// TODO: rename the following variables
tables *common.SpanHashMap[string]
spans map[string]*tableState
tables *common.SpanHashMap[common.DispatcherID]
spans map[common.DispatcherID]*tableState
}

const dataDir = "event_store"
Expand Down Expand Up @@ -158,8 +158,8 @@ func NewEventStore(

gcManager: newGCManager(),

tables: common.NewSpanHashMap[string](),
spans: make(map[string]*tableState),
tables: common.NewSpanHashMap[common.DispatcherID](),
spans: make(map[common.DispatcherID]*tableState),
}

for i := range dbs {
Expand Down Expand Up @@ -397,7 +397,7 @@ func (e *eventStore) deleteEvents(span heartbeatpb.TableSpan, startCommitTS uint
return db.DeleteRange(start, end, pebble.NoSync)
}

func (e *eventStore) RegisterDispatcher(dispatcherID string, tableSpan *common.TableSpan, startTS common.Ts, observer EventObserver, notifier WatermarkNotifier) error {
func (e *eventStore) RegisterDispatcher(dispatcherID common.DispatcherID, tableSpan *common.TableSpan, startTS common.Ts, observer EventObserver, notifier WatermarkNotifier) error {
span := *tableSpan.TableSpan
log.Info("register dispatcher",
zap.Any("dispatcherID", dispatcherID),
Expand Down Expand Up @@ -425,7 +425,7 @@ func (e *eventStore) RegisterDispatcher(dispatcherID string, tableSpan *common.T
return nil
}

func (e *eventStore) UpdateDispatcherSendTS(dispatcherID string, sendTS uint64) error {
func (e *eventStore) UpdateDispatcherSendTS(dispatcherID common.DispatcherID, sendTS uint64) error {
e.schemaStore.UpdateDispatcherSendTS(dispatcherID, common.Ts(sendTS))
e.mu.Lock()
defer e.mu.Unlock()
Expand All @@ -444,7 +444,7 @@ func (e *eventStore) UpdateDispatcherSendTS(dispatcherID string, sendTS uint64)
return nil
}

func (e *eventStore) UnregisterDispatcher(dispatcherID string) error {
func (e *eventStore) UnregisterDispatcher(dispatcherID common.DispatcherID) error {
e.schemaStore.UnregisterDispatcher(dispatcherID)
e.mu.Lock()
defer e.mu.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions logservice/schemastore/multiversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type versionedTableInfoStore struct {
// dispatcherID -> max ts successfully send to dispatcher
// gcTS = min(dispatchers[dispatcherID])
// when gc, just need retain one version <= gcTS
dispatchers map[string]common.Ts
dispatchers map[common.DispatcherID]common.Ts

// ordered by ts
infos []*tableInfoItem
Expand All @@ -57,7 +57,7 @@ type versionedTableInfoStore struct {
func newEmptyVersionedTableInfoStore(tableID common.TableID) *versionedTableInfoStore {
return &versionedTableInfoStore{
tableID: tableID,
dispatchers: make(map[string]common.Ts),
dispatchers: make(map[common.DispatcherID]common.Ts),
infos: make([]*tableInfoItem, 0),
deleteVersion: math.MaxUint64,
initialized: false,
Expand Down Expand Up @@ -136,7 +136,7 @@ func (v *versionedTableInfoStore) getTableInfo(ts common.Ts) (*common.TableInfo,
}

// only keep one item with the largest version <= gcTS
func removeUnusedInfos(infos []*tableInfoItem, dispatchers map[string]common.Ts) []*tableInfoItem {
func removeUnusedInfos(infos []*tableInfoItem, dispatchers map[common.DispatcherID]common.Ts) []*tableInfoItem {
if len(infos) == 0 {
log.Fatal("no table info found")
}
Expand All @@ -159,7 +159,7 @@ func removeUnusedInfos(infos []*tableInfoItem, dispatchers map[string]common.Ts)
return infos[target-1:]
}

func (v *versionedTableInfoStore) registerDispatcher(dispatcherID string, ts common.Ts) {
func (v *versionedTableInfoStore) registerDispatcher(dispatcherID common.DispatcherID, ts common.Ts) {
v.mu.Lock()
defer v.mu.Unlock()
if _, ok := v.dispatchers[dispatcherID]; ok {
Expand All @@ -169,7 +169,7 @@ func (v *versionedTableInfoStore) registerDispatcher(dispatcherID string, ts com
}

// return true when the store can be removed(no registered dispatchers)
func (v *versionedTableInfoStore) unregisterDispatcher(dispatcherID string) bool {
func (v *versionedTableInfoStore) unregisterDispatcher(dispatcherID common.DispatcherID) bool {
v.mu.Lock()
defer v.mu.Unlock()
delete(v.dispatchers, dispatcherID)
Expand All @@ -180,7 +180,7 @@ func (v *versionedTableInfoStore) unregisterDispatcher(dispatcherID string) bool
return false
}

func (v *versionedTableInfoStore) updateDispatcherSendTS(dispatcherID string, ts common.Ts) error {
func (v *versionedTableInfoStore) updateDispatcherSendTS(dispatcherID common.DispatcherID, ts common.Ts) error {
v.mu.Lock()
defer v.mu.Unlock()
if oldTS, ok := v.dispatchers[dispatcherID]; !ok {
Expand Down
16 changes: 8 additions & 8 deletions logservice/schemastore/schemastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ type SchemaStore interface {
// RegisterDispatcher register the dispatcher into the schema store.
// TODO: return a table info
// TODO: add filter
RegisterDispatcher(dispatcherID string, tableID common.TableID, ts common.Ts) error
RegisterDispatcher(dispatcherID common.DispatcherID, tableID common.TableID, ts common.Ts) error

// TODO: add interface for TableEventDispatcher

UpdateDispatcherSendTS(dispatcherID string, ts common.Ts) error
UpdateDispatcherSendTS(dispatcherID common.DispatcherID, ts common.Ts) error

UnregisterDispatcher(dispatcherID string) error
UnregisterDispatcher(dispatcherID common.DispatcherID) error

GetMaxFinishedDDLTS() common.Ts

GetTableInfo(tableID common.TableID, ts common.Ts) (*common.TableInfo, error)

GetNextDDLEvent(dispatcherID string) (*DDLEvent, common.Ts, error)
GetNextDDLEvent(dispatcherID common.DispatcherID) (*DDLEvent, common.Ts, error)
}

type schemaStore struct {
Expand Down Expand Up @@ -265,7 +265,7 @@ func (s *schemaStore) GetAllPhysicalTables(snapTs common.Ts, f filter.Filter) ([
}

func (s *schemaStore) RegisterDispatcher(
dispatcherID string, tableID common.TableID, startTS common.Ts,
dispatcherID common.DispatcherID, tableID common.TableID, startTS common.Ts,
) error {
s.mu.Lock()
// TODO: fix me in the future
Expand Down Expand Up @@ -353,7 +353,7 @@ func (s *schemaStore) RegisterDispatcher(
return nil
}

func (s *schemaStore) UpdateDispatcherSendTS(dispatcherID string, ts common.Ts) error {
func (s *schemaStore) UpdateDispatcherSendTS(dispatcherID common.DispatcherID, ts common.Ts) error {
s.mu.RLock()
defer s.mu.RUnlock()
info, ok := s.dispatchersMap[dispatcherID]
Expand All @@ -365,7 +365,7 @@ func (s *schemaStore) UpdateDispatcherSendTS(dispatcherID string, ts common.Ts)
return nil
}

func (s *schemaStore) UnregisterDispatcher(dispatcherID string) error {
func (s *schemaStore) UnregisterDispatcher(dispatcherID common.DispatcherID) error {
s.mu.Lock()
defer s.mu.Unlock()
info, ok := s.dispatchersMap[dispatcherID]
Expand Down Expand Up @@ -418,7 +418,7 @@ func (s *schemaStore) GetTableInfo(tableID common.TableID, ts common.Ts) (*commo
return store.getTableInfo(ts)
}

func (s *schemaStore) GetNextDDLEvent(dispatcherID string) (*DDLEvent, common.Ts, error) {
func (s *schemaStore) GetNextDDLEvent(dispatcherID common.DispatcherID) (*DDLEvent, common.Ts, error) {
return nil, 0, nil
}

Expand Down
2 changes: 1 addition & 1 deletion logservice/schemastore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ type DatabaseInfoMap map[int64]*DatabaseInfo

type TableInfoStoreMap map[common.TableID]*versionedTableInfoStore

type DispatcherInfoMap map[string]DispatcherInfo
type DispatcherInfoMap map[common.DispatcherID]DispatcherInfo
13 changes: 7 additions & 6 deletions pkg/common/context/app_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ var (
)

const (
MessageCenter = "MessageCenter"
EventCollector = "EventCollector"
HeartbeatCollector = "HeartbeatCollector"
SchemaStore = "SchemaStore"
EventStore = "EventStore"
EventService = "EventService"
MessageCenter = "MessageCenter"
EventCollector = "EventCollector"
HeartbeatCollector = "HeartbeatCollector"
SchemaStore = "SchemaStore"
EventStore = "EventStore"
EventService = "EventService"
DispatcherDynamicStream = "DispatcherDynamicStream"
)

// Put all the global instances here.
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/txn_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type TxnEvent struct {
ClusterID uint64 `msg:"cluster-id"`

// TODO: fix
DispatcherID string `msg:"dispatcher-id"`
DispatcherID DispatcherID `msg:"dispatcher-id"`

// Span of this event belongs to.
Span *TableSpan `msg:"-"`
Expand Down
14 changes: 9 additions & 5 deletions pkg/common/txn_event_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0b0259d

Please sign in to comment.