Skip to content

Commit

Permalink
*: separate resolvedTs event from txnEvent (#231)
Browse files Browse the repository at this point in the history
* common: add TEvent

Signed-off-by: dongmen <[email protected]>

* common: add resolvedTs event

Signed-off-by: dongmen <[email protected]>

* common: unit test cases

Signed-off-by: dongmen <[email protected]>

* fix panic

Signed-off-by: dongmen <[email protected]>

* revert dispatcher metrics changes

Signed-off-by: dongmen <[email protected]>

---------

Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Aug 28, 2024
1 parent b89abbe commit d80de7a
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 149 deletions.
32 changes: 19 additions & 13 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,27 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
}
}

func (d *Dispatcher) HandleEvent(event *common.TxnEvent) bool {
if event.IsDMLEvent() {
d.sink.AddDMLEvent(event, d.tableProgress)
return false
} else if event.IsDDLEvent() {
event.PostTxnFlushed = append(event.PostTxnFlushed, func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.AddDDLEventToSinkWhenAvailable(event)
return true
} else {
d.resolvedTs.Set(event.ResolvedTs)
func (d *Dispatcher) HandleEvent(event common.Event) bool {
switch event.GetType() {
case common.TypeTxnEvent:
event := event.(*common.TxnEvent)
if event.IsDMLEvent() {
d.sink.AddDMLEvent(event, d.tableProgress)
return false
} else if event.IsDDLEvent() {
event.PostTxnFlushed = append(event.PostTxnFlushed, func() {
dispatcherEventDynamicStream := GetDispatcherEventsDynamicStream()
dispatcherEventDynamicStream.Wake() <- event.GetDispatcherID()
})
d.AddDDLEventToSinkWhenAvailable(event)
return true
}
case common.TypeResolvedEvent:
d.resolvedTs.Set(event.(common.ResolvedEvent).ResolvedTs)
return false
}
log.Panic("invalid event type", zap.Any("event", event))
return false
}

// 1.If the event is a single table DDL, it will be added to the sink for writing to downstream(async). If the ddl leads to add new tables or drop tables, it should send heartbeat to maintainer
Expand Down
10 changes: 5 additions & 5 deletions downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ func (t *ResendTask) Cancel() {
type DispatcherEventsHandler struct {
}

func (h *DispatcherEventsHandler) Path(event *common.TxnEvent) common.DispatcherID {
func (h *DispatcherEventsHandler) Path(event common.Event) common.DispatcherID {
return event.GetDispatcherID()
}

// TODO: 这个后面需要按照更大的粒度进行攒批
func (h *DispatcherEventsHandler) Handle(event *common.TxnEvent, dispatcher *Dispatcher) bool {
func (h *DispatcherEventsHandler) Handle(event common.Event, dispatcher *Dispatcher) bool {
return dispatcher.HandleEvent(event)
}

Expand All @@ -206,10 +206,10 @@ func SetDispatcherTaskScheduler(taskScheduler threadpool.ThreadPool) {
DispatcherTaskScheduler = taskScheduler
}

var dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, *common.TxnEvent, *Dispatcher]
var dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, common.Event, *Dispatcher]
var dispatcherEventsDynamicStreamOnce sync.Once

func GetDispatcherEventsDynamicStream() dynstream.DynamicStream[common.DispatcherID, *common.TxnEvent, *Dispatcher] {
func GetDispatcherEventsDynamicStream() dynstream.DynamicStream[common.DispatcherID, common.Event, *Dispatcher] {
if dispatcherEventsDynamicStream == nil {
dispatcherEventsDynamicStreamOnce.Do(func() {
dispatcherEventsDynamicStream = dynstream.NewDynamicStreamDefault(&DispatcherEventsHandler{})
Expand All @@ -219,7 +219,7 @@ func GetDispatcherEventsDynamicStream() dynstream.DynamicStream[common.Dispatche
return dispatcherEventsDynamicStream
}

func SetDispatcherEventsDynamicStream(dynamicStream dynstream.DynamicStream[common.DispatcherID, *common.TxnEvent, *Dispatcher]) {
func SetDispatcherEventsDynamicStream(dynamicStream dynstream.DynamicStream[common.DispatcherID, common.Event, *Dispatcher]) {
dispatcherEventsDynamicStream = dynamicStream
}

Expand Down
43 changes: 34 additions & 9 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,

manager.wg.Add(1)
go manager.CollectHeartbeatInfoWhenStatesChanged(ctx)

manager.updateMetrics(ctx)
return manager
}

Expand Down Expand Up @@ -368,14 +368,6 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
for _, id := range toReomveDispatcherIDs {
e.cleanTableEventDispatcher(id)
}
ckptTs := oracle.ExtractPhysical(message.Watermark.CheckpointTs)
e.metricCheckpointTs.Set(float64(ckptTs))
lag := (oracle.GetPhysical(time.Now()) - ckptTs) / 1e3
e.metricCheckpointTsLag.Set(float64(lag))
resolvedTs := oracle.ExtractPhysical(message.Watermark.ResolvedTs)
e.metricResolveTs.Set(float64(resolvedTs))
lag = (oracle.GetPhysical(time.Now()) - resolvedTs) / 1e3
e.metricResolvedTsLag.Set(float64(lag))
return &message
}

Expand Down Expand Up @@ -407,6 +399,39 @@ func (e *EventDispatcherManager) SetMaintainerID(maintainerID messaging.ServerId
e.maintainerID = maintainerID
}

func (e *EventDispatcherManager) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
e.wg.Add(1)
go func() {
defer e.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
minResolvedTs := uint64(0)
e.dispatcherMap.m.Range(func(key, value interface{}) bool {
d, ok := value.(*dispatcher.Dispatcher)
if !ok {
return true
}
if minResolvedTs == 0 || d.GetResolvedTs() < minResolvedTs {
minResolvedTs = d.GetResolvedTs()
}
return true
})
if minResolvedTs == 0 {
continue
}
phyResolvedTs := oracle.ExtractPhysical(minResolvedTs)
lag := (oracle.GetPhysical(time.Now()) - phyResolvedTs) / 1e3
e.metricResolvedTsLag.Set(float64(lag))
}
}
}()
return nil
}

// 测一下用 sync.Map 的效果和普通的 map 相比
type DispatcherMap struct {
m sync.Map
Expand Down
22 changes: 11 additions & 11 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type EventCollector struct {
globalMemoryQuota int64
wg sync.WaitGroup

dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, *common.TxnEvent, *dispatcher.Dispatcher]
dispatcherEventsDynamicStream dynstream.DynamicStream[common.DispatcherID, common.Event, *dispatcher.Dispatcher]

registerMessageChan *chann.DrainableChann[RegisterInfo] // for temp
metricDispatcherReceivedKVEventCount prometheus.Counter
Expand Down Expand Up @@ -112,9 +112,6 @@ func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *Ev
}
}
}()
// update metrics
eventCollector.updateMetrics(context.Background())

return &eventCollector
}

Expand Down Expand Up @@ -171,17 +168,20 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
inflightDuration := time.Since(time.Unix(0, msg.CrateAt)).Milliseconds()
c.metricReceiveEventLagDuration.Observe(float64(inflightDuration))
for _, msg := range msg.Message {
txnEvent, ok := msg.(*common.TxnEvent)
event, ok := msg.(common.Event)
if !ok {
log.Panic("invalid event feed message", zap.Any("msg", msg))
log.Panic("invalid message type", zap.Any("msg", msg))
}
if txnEvent.IsDMLEvent() || txnEvent.IsDDLEvent() {
switch event.GetType() {
case common.TypeBatchResolvedTs:
for _, e := range event.(*common.BatchResolvedTs).Events {
c.metricDispatcherReceivedResolvedTsEventCount.Inc()
c.dispatcherEventsDynamicStream.In() <- e
}
default:
c.metricDispatcherReceivedKVEventCount.Inc()
} else {
c.metricDispatcherReceivedResolvedTsEventCount.Inc()
c.dispatcherEventsDynamicStream.In() <- event
}

c.dispatcherEventsDynamicStream.In() <- txnEvent
}
return nil
}
Expand Down
180 changes: 180 additions & 0 deletions pkg/common/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"encoding/binary"
"fmt"
"log"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/chunk"
)

const (
// TEvent is the event type of a transaction.
TypeTEvent = iota
// DDLEvent is the event type of a DDL.
TypeDDLEvent
// ResolvedEvent is the event type of a resolvedTs.
TypeResolvedEvent
// BatchResolvedTs is the event type of a batch resolvedTs.
TypeBatchResolvedTs
TypeTxnEvent
)

type Event interface {
GetType() int
GetDispatcherID() DispatcherID
}

type BatchResolvedTs struct {
Events []ResolvedEvent
}

func (b BatchResolvedTs) GetType() int {
return TypeBatchResolvedTs
}

func (b BatchResolvedTs) GetDispatcherID() DispatcherID {
// It's a fake dispatcherID.
return NewDispatcherID()
}

func (b *BatchResolvedTs) Marshal() ([]byte, error) {
buf := make([]byte, 0, len(b.Events)*24)
for _, e := range b.Events {
data, err := e.Marshal()
if err != nil {
return nil, err
}
buf = append(buf, data...)
}
return buf, nil
}

func (b *BatchResolvedTs) Unmarshal(data []byte) error {
if len(data)%24 != 0 {
log.Panic("BatchResolvedTs.Unmarshal: invalid data")
}
b.Events = make([]ResolvedEvent, 0, len(data)/24)
for i := 0; i < len(data); i += 24 {
var e ResolvedEvent
if err := e.Unmarshal(data[i : i+24]); err != nil {
return err
}
b.Events = append(b.Events, e)
}
return nil
}

// ResolvedEvent represents a resolvedTs event of a dispatcher.
type ResolvedEvent struct {
DispatcherID DispatcherID
ResolvedTs Ts
}

func (e ResolvedEvent) GetType() int {
return TypeResolvedEvent
}

func (e ResolvedEvent) GetDispatcherID() DispatcherID {
return e.DispatcherID
}

func (e ResolvedEvent) Marshal() ([]byte, error) {
buf := e.DispatcherID.Marshal()
buf = append(buf, make([]byte, 8)...)
binary.LittleEndian.PutUint64(buf[16:24], e.ResolvedTs)
return buf, nil
}

func (e *ResolvedEvent) Unmarshal(data []byte) error {
if len(data) != 24 {
log.Panic("ResolvedEvent.Unmarshal: invalid data")
}
e.DispatcherID.Unmarshal(data[:16])
e.ResolvedTs = Ts(binary.LittleEndian.Uint64(data[16:24]))
return nil
}

func (e ResolvedEvent) String() string {
return fmt.Sprintf("ResolvedEvent{DispatcherID: %s, ResolvedTs: %d}", e.DispatcherID, e.ResolvedTs)
}

// TEvent represent a transaction, it contains the rows of the transaction.
// Note: The PreRows is the rows before the transaction, it is used to generate the update and delete SQL.
// The Rows is the rows after the transaction, it is used to generate the insert and update SQL.
type TEvent struct {
DispatcherID DispatcherID
PhysicalTableID uint64
StartTs uint64
CommitTs uint64

Rows chunk.Chunk
PreRows chunk.Chunk
}

func (t TEvent) GetType() int {
return TypeTEvent
}

func (t TEvent) GetDispatcherID() DispatcherID {
return t.DispatcherID
}

func (t TEvent) Marshal() ([]byte, error) {
// TODO
log.Panic("TEvent.Marshal: not implemented")
buf := make([]byte, 0)
return buf, nil
}

func (t TEvent) Unmarshal(data []byte) error {
//TODO
log.Panic("TEvent.Unmarshal: not implemented")
return nil
}

type DDLEventX struct {
DispatcherID DispatcherID
// commitTS of the rawKV
CommitTS Ts
Job *model.Job
}

func (d DDLEventX) GetType() int {
return TypeDDLEvent
}

func (d DDLEventX) GetDispatcherID() DispatcherID {
return d.DispatcherID
}

func (d DDLEventX) Marshal() ([]byte, error) {
// TODO
log.Panic("DDLEvent.Marshal: not implemented")
buf := make([]byte, 0)
return buf, nil
}

func (d DDLEventX) Unmarshal(data []byte) error {
//TODO
log.Panic("DDLEvent.Unmarshal: not implemented")
return nil
}

func (d DDLEventX) String() string {
return fmt.Sprintf("DDLEvent{DispatcherID: %s, CommitTS: %d, Job: %v}", d.DispatcherID, d.CommitTS, d.Job)
}
Loading

0 comments on commit d80de7a

Please sign in to comment.