Skip to content

Commit

Permalink
eventService: reduce resolvedTs lag (#199)
Browse files Browse the repository at this point in the history
* add debug log for perf testing

Signed-off-by: dongmen <[email protected]>

* add more log

Signed-off-by: dongmen <[email protected]>

* add resolved Ts metrics in eventService

Signed-off-by: dongmen <[email protected]>

* add resolved Ts metrics in eventService

Signed-off-by: dongmen <[email protected]>

* add resolved Ts metrics in eventService 3

Signed-off-by: dongmen <[email protected]>

* use send command to send heartbeat

Signed-off-by: dongmen <[email protected]>

* add checkpointTsLag and resolvedTsLag metrics to dispatcherManager

Signed-off-by: dongmen <[email protected]>

* remove useless code

Signed-off-by: dongmen <[email protected]>

* add scan task inqueue duration metric

Signed-off-by: dongmen <[email protected]>

* add event lag duration for event collector

Signed-off-by: dongmen <[email protected]>

* add resolvedTs lag  for event collector

Signed-off-by: dongmen <[email protected]>

* eventService: use sync map to store dispatcher

Signed-off-by: dongmen <[email protected]>

* refine metrics

Signed-off-by: dongmen <[email protected]>

* fix some error

Signed-off-by: dongmen <[email protected]>

* fix metric error

Signed-off-by: dongmen <[email protected]>

* add debug log

Signed-off-by: dongmen <[email protected]>

* fix metrics

Signed-off-by: dongmen <[email protected]>

* fix metrics 2

Signed-off-by: dongmen <[email protected]>

* fix metrics 3

Signed-off-by: dongmen <[email protected]>

* fix metrics 4

Signed-off-by: dongmen <[email protected]>

* fix metrics 4

Signed-off-by: dongmen <[email protected]>

* fix metrics 5

Signed-off-by: dongmen <[email protected]>

* add event service dispatcher watermark metric

Signed-off-by: dongmen <[email protected]>

* add panel

* refactor event broker

* ignore log file

* fix concurrent map iteration and map write

* event broker enlarge notify chan size

Signed-off-by: dongmen <[email protected]>

* fix ut

---------

Signed-off-by: dongmen <[email protected]>
Co-authored-by: CharlesCheung <[email protected]>
  • Loading branch information
asddongmen and CharlesCheung96 authored Aug 20, 2024
1 parent 0b0259d commit 9c6d49d
Show file tree
Hide file tree
Showing 14 changed files with 1,527 additions and 529 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ tools/include

.vscode
.idea
*.log
17 changes: 13 additions & 4 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ type EventDispatcherManager struct {
filter filter.Filter
metricCreateDispatcherDuration prometheus.Observer
metricCheckpointTs prometheus.Gauge
metricCheckpointTsLag prometheus.Gauge
metricResolveTs prometheus.Gauge
metricResolvedTsLag prometheus.Gauge

closing bool
closed atomic.Bool
Expand All @@ -95,7 +97,9 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID, changefeedConfig
tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTs: metrics.EventDispatcherManagerCheckpointTsGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTsLag: metrics.EventDispatcherManagerCheckpointTsLagGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricResolveTs: metrics.EventDispatcherManagerResolvedTsGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricResolvedTsLag: metrics.EventDispatcherManagerResolvedTsLagGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

// TODO: 最后去更新一下 filter 的内部 NewFilter 函数,现在是在套壳适配
Expand Down Expand Up @@ -383,7 +387,6 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
// If it's closed successfully, we could clean it up.
// TODO: we need to consider how to deal with the checkpointTs of the removed dispatcher if the message will be discarded.
dispatcherItem.CollectDispatcherHeartBeatInfo(dispatcherHeartBeatInfo)

if dispatcherHeartBeatInfo.IsRemoving {
watermark, ok := dispatcherItem.TryClose()
if ok {
Expand All @@ -405,16 +408,22 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
message.Statuses = append(message.Statuses, &heartbeatpb.TableSpanStatus{
Span: dispatcherHeartBeatInfo.TableSpan.TableSpan,
ComponentStatus: dispatcherHeartBeatInfo.ComponentStatus,
CheckpointTs: dispatcherHeartBeatInfo.Watermark.CheckpointTs,
})
}
}

for _, tableSpan := range toReomveTableSpans {
e.cleanTableEventDispatcher(tableSpan)
}

e.metricCheckpointTs.Set(float64(oracle.ExtractPhysical(message.Watermark.CheckpointTs)))
e.metricResolveTs.Set(float64(oracle.ExtractPhysical(message.Watermark.ResolvedTs)))
ckptTs := oracle.ExtractPhysical(message.Watermark.CheckpointTs)
e.metricCheckpointTs.Set(float64(ckptTs))
lag := (oracle.GetPhysical(time.Now()) - ckptTs) / 1e3
e.metricCheckpointTsLag.Set(float64(lag))
resolvedTs := oracle.ExtractPhysical(message.Watermark.ResolvedTs)
e.metricResolveTs.Set(float64(resolvedTs))
lag = (oracle.GetPhysical(time.Now()) - resolvedTs) / 1e3
e.metricResolvedTsLag.Set(float64(lag))
return &message
}

Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (c *HeartBeatCollector) RegisterEventDispatcherManager(m *EventDispatcherMa
func (c *HeartBeatCollector) SendHeartBeatMessages() {
for {
heartBeatRequestWithTargetID := c.requestQueue.Dequeue()
err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendEvent(&messaging.TargetMessage{
err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendCommand(&messaging.TargetMessage{
To: heartBeatRequestWithTargetID.TargetID,
Topic: messaging.MaintainerManagerTopic,
Type: messaging.TypeHeartBeatRequest,
Expand Down
43 changes: 43 additions & 0 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -73,6 +74,9 @@ type EventCollector struct {
registerMessageChan *chann.DrainableChann[RegisterInfo] // for temp
metricDispatcherReceivedKVEventCount prometheus.Counter
metricDispatcherReceivedResolvedTsEventCount prometheus.Counter
metricReceiveEventLagDuration prometheus.Observer
metricReceiveResolvedTsEventLagDuration prometheus.Observer
metricResolvedTsLag prometheus.Gauge
}

func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *EventCollector {
Expand All @@ -83,6 +87,9 @@ func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *Ev
registerMessageChan: chann.NewAutoDrainChann[RegisterInfo](),
metricDispatcherReceivedKVEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("KVEvent"),
metricDispatcherReceivedResolvedTsEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("ResolvedTs"),
metricReceiveEventLagDuration: metrics.EventCollectorReceivedEventLagDuration.WithLabelValues("KVEvent"),
metricReceiveResolvedTsEventLagDuration: metrics.EventCollectorReceivedEventLagDuration.WithLabelValues("ResolvedTs"),
metricResolvedTsLag: metrics.EventCollectorResolvedTsLagGauge,
}
appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).RegisterHandler(messaging.EventCollectorTopic, eventCollector.RecvEventsMessage)

Expand All @@ -104,6 +111,8 @@ func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *Ev
}
}
}()
// update metrics
eventCollector.updateMetrics(context.Background())

return &eventCollector
}
Expand Down Expand Up @@ -174,6 +183,7 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
log.Error("invalid event feed message", zap.Any("msg", msg))
return apperror.AppError{Type: apperror.ErrorTypeInvalidMessage, Reason: "invalid heartbeat response message"}
}
inflightDuration := time.Since(time.Unix(0, msg.CrateAt)).Milliseconds()

dispatcherID := txnEvent.DispatcherID

Expand Down Expand Up @@ -207,9 +217,11 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
if txnEvent.IsDMLEvent() || txnEvent.IsDDLEvent() {
dispatcherItem.PushTxnEvent(txnEvent)
c.metricDispatcherReceivedKVEventCount.Inc()
c.metricReceiveEventLagDuration.Observe(float64(inflightDuration))
} else {
dispatcherItem.UpdateResolvedTs(txnEvent.ResolvedTs)
c.metricDispatcherReceivedResolvedTsEventCount.Inc()
c.metricReceiveResolvedTsEventLagDuration.Observe(float64(inflightDuration))
}

// dispatcherItem.UpdateResolvedTs(eventFeeds.ResolvedTs)
Expand All @@ -232,3 +244,34 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
}
return nil
}

func (c *EventCollector) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
minResolvedTs := uint64(0)
c.dispatcherMap.m.Range(func(key, value interface{}) bool {
d, ok := value.(*dispatcher.Dispatcher)
if !ok {
return true
}
if minResolvedTs == 0 || d.GetResolvedTs() < minResolvedTs {
minResolvedTs = d.GetResolvedTs()
}
return true
})
if minResolvedTs == 0 {
continue
}
phyResolvedTs := oracle.ExtractPhysical(minResolvedTs)
lag := (oracle.GetPhysical(time.Now()) - phyResolvedTs) / 1e3
c.metricResolvedTsLag.Set(float64(lag))
}
}
}()
return nil
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/go-sql-driver/mysql v1.7.1
github.com/gogo/protobuf v1.3.2
github.com/google/btree v1.1.2
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.6.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
Expand Down
Loading

0 comments on commit 9c6d49d

Please sign in to comment.