Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventService: reduce resolvedTs lag #199

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1cab7d3
add debug log for perf testing
asddongmen Aug 12, 2024
754fffb
add more log
asddongmen Aug 13, 2024
32894bd
Merge remote-tracking branch 'upstream/master' into messaging-batch-i…
asddongmen Aug 13, 2024
21ba685
add resolved Ts metrics in eventService
asddongmen Aug 13, 2024
8d10db9
add resolved Ts metrics in eventService
asddongmen Aug 13, 2024
cbcc86a
add resolved Ts metrics in eventService 3
asddongmen Aug 13, 2024
e2b7c01
use send command to send heartbeat
asddongmen Aug 13, 2024
42ae90d
add checkpointTsLag and resolvedTsLag metrics to dispatcherManager
asddongmen Aug 15, 2024
20440e5
remove useless code
asddongmen Aug 15, 2024
4d2879b
add scan task inqueue duration metric
asddongmen Aug 15, 2024
b234d22
add event lag duration for event collector
asddongmen Aug 15, 2024
6ae0bd0
add resolvedTs lag for event collector
asddongmen Aug 15, 2024
ef0d8ab
eventService: use sync map to store dispatcher
asddongmen Aug 15, 2024
0b3b9b5
refine metrics
asddongmen Aug 15, 2024
db830e8
fix some error
asddongmen Aug 19, 2024
07b444b
fix metric error
asddongmen Aug 19, 2024
7d06ea8
add debug log
asddongmen Aug 19, 2024
bbbe615
fix metrics
asddongmen Aug 19, 2024
b0e0f87
fix metrics 2
asddongmen Aug 19, 2024
09aac0a
fix metrics 3
asddongmen Aug 19, 2024
cf028ba
fix metrics 4
asddongmen Aug 19, 2024
429fa6c
fix metrics 4
asddongmen Aug 19, 2024
986ee33
fix metrics 5
asddongmen Aug 19, 2024
cce86c5
add event service dispatcher watermark metric
asddongmen Aug 19, 2024
b7dc129
add panel
CharlesCheung96 Aug 19, 2024
82bee88
refactor event broker
CharlesCheung96 Aug 19, 2024
c03ba1e
ignore log file
CharlesCheung96 Aug 20, 2024
bd8a263
fix concurrent map iteration and map write
CharlesCheung96 Aug 20, 2024
f5da741
Merge remote-tracking branch 'upstream/master' into messaging-batch-i…
asddongmen Aug 20, 2024
0fed90f
event broker enlarge notify chan size
asddongmen Aug 20, 2024
77d905a
Merge remote-tracking branch 'tigate/master' into messaging-batch-in-…
CharlesCheung96 Aug 20, 2024
dc4c63e
fix ut
CharlesCheung96 Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ tools/include

.vscode
.idea
*.log
17 changes: 13 additions & 4 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ type EventDispatcherManager struct {
filter filter.Filter
metricCreateDispatcherDuration prometheus.Observer
metricCheckpointTs prometheus.Gauge
metricCheckpointTsLag prometheus.Gauge
metricResolveTs prometheus.Gauge
metricResolvedTsLag prometheus.Gauge

closing bool
closed atomic.Bool
Expand All @@ -95,7 +97,9 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID, changefeedConfig
tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTs: metrics.EventDispatcherManagerCheckpointTsGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTsLag: metrics.EventDispatcherManagerCheckpointTsLagGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricResolveTs: metrics.EventDispatcherManagerResolvedTsGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricResolvedTsLag: metrics.EventDispatcherManagerResolvedTsLagGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

// TODO: 最后去更新一下 filter 的内部 NewFilter 函数,现在是在套壳适配
Expand Down Expand Up @@ -383,7 +387,6 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
// If it's closed successfully, we could clean it up.
// TODO: we need to consider how to deal with the checkpointTs of the removed dispatcher if the message will be discarded.
dispatcherItem.CollectDispatcherHeartBeatInfo(dispatcherHeartBeatInfo)

if dispatcherHeartBeatInfo.IsRemoving {
watermark, ok := dispatcherItem.TryClose()
if ok {
Expand All @@ -405,16 +408,22 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
message.Statuses = append(message.Statuses, &heartbeatpb.TableSpanStatus{
Span: dispatcherHeartBeatInfo.TableSpan.TableSpan,
ComponentStatus: dispatcherHeartBeatInfo.ComponentStatus,
CheckpointTs: dispatcherHeartBeatInfo.Watermark.CheckpointTs,
})
}
}

for _, tableSpan := range toReomveTableSpans {
e.cleanTableEventDispatcher(tableSpan)
}

e.metricCheckpointTs.Set(float64(oracle.ExtractPhysical(message.Watermark.CheckpointTs)))
e.metricResolveTs.Set(float64(oracle.ExtractPhysical(message.Watermark.ResolvedTs)))
ckptTs := oracle.ExtractPhysical(message.Watermark.CheckpointTs)
e.metricCheckpointTs.Set(float64(ckptTs))
lag := (oracle.GetPhysical(time.Now()) - ckptTs) / 1e3
e.metricCheckpointTsLag.Set(float64(lag))
resolvedTs := oracle.ExtractPhysical(message.Watermark.ResolvedTs)
e.metricResolveTs.Set(float64(resolvedTs))
lag = (oracle.GetPhysical(time.Now()) - resolvedTs) / 1e3
e.metricResolvedTsLag.Set(float64(lag))
return &message
}

Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (c *HeartBeatCollector) RegisterEventDispatcherManager(m *EventDispatcherMa
func (c *HeartBeatCollector) SendHeartBeatMessages() {
for {
heartBeatRequestWithTargetID := c.requestQueue.Dequeue()
err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendEvent(&messaging.TargetMessage{
err := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).SendCommand(&messaging.TargetMessage{
To: heartBeatRequestWithTargetID.TargetID,
Topic: messaging.MaintainerManagerTopic,
Type: messaging.TypeHeartBeatRequest,
Expand Down
43 changes: 43 additions & 0 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -73,6 +74,9 @@ type EventCollector struct {
registerMessageChan *chann.DrainableChann[RegisterInfo] // for temp
metricDispatcherReceivedKVEventCount prometheus.Counter
metricDispatcherReceivedResolvedTsEventCount prometheus.Counter
metricReceiveEventLagDuration prometheus.Observer
metricReceiveResolvedTsEventLagDuration prometheus.Observer
metricResolvedTsLag prometheus.Gauge
}

func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *EventCollector {
Expand All @@ -83,6 +87,9 @@ func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *Ev
registerMessageChan: chann.NewAutoDrainChann[RegisterInfo](),
metricDispatcherReceivedKVEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("KVEvent"),
metricDispatcherReceivedResolvedTsEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("ResolvedTs"),
metricReceiveEventLagDuration: metrics.EventCollectorReceivedEventLagDuration.WithLabelValues("KVEvent"),
metricReceiveResolvedTsEventLagDuration: metrics.EventCollectorReceivedEventLagDuration.WithLabelValues("ResolvedTs"),
metricResolvedTsLag: metrics.EventCollectorResolvedTsLagGauge,
}
appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter).RegisterHandler(messaging.EventCollectorTopic, eventCollector.RecvEventsMessage)

Expand All @@ -104,6 +111,8 @@ func NewEventCollector(globalMemoryQuota int64, serverId messaging.ServerId) *Ev
}
}
}()
// update metrics
eventCollector.updateMetrics(context.Background())

return &eventCollector
}
Expand Down Expand Up @@ -174,6 +183,7 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
log.Error("invalid event feed message", zap.Any("msg", msg))
return apperror.AppError{Type: apperror.ErrorTypeInvalidMessage, Reason: "invalid heartbeat response message"}
}
inflightDuration := time.Since(time.Unix(0, msg.CrateAt)).Milliseconds()

dispatcherID := txnEvent.DispatcherID

Expand Down Expand Up @@ -207,9 +217,11 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
if txnEvent.IsDMLEvent() || txnEvent.IsDDLEvent() {
dispatcherItem.PushTxnEvent(txnEvent)
c.metricDispatcherReceivedKVEventCount.Inc()
c.metricReceiveEventLagDuration.Observe(float64(inflightDuration))
} else {
dispatcherItem.UpdateResolvedTs(txnEvent.ResolvedTs)
c.metricDispatcherReceivedResolvedTsEventCount.Inc()
c.metricReceiveResolvedTsEventLagDuration.Observe(float64(inflightDuration))
}

// dispatcherItem.UpdateResolvedTs(eventFeeds.ResolvedTs)
Expand All @@ -232,3 +244,34 @@ func (c *EventCollector) RecvEventsMessage(ctx context.Context, msg *messaging.T
}
return nil
}

func (c *EventCollector) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
minResolvedTs := uint64(0)
c.dispatcherMap.m.Range(func(key, value interface{}) bool {
d, ok := value.(*dispatcher.Dispatcher)
if !ok {
return true
}
if minResolvedTs == 0 || d.GetResolvedTs() < minResolvedTs {
minResolvedTs = d.GetResolvedTs()
}
return true
})
if minResolvedTs == 0 {
continue
}
phyResolvedTs := oracle.ExtractPhysical(minResolvedTs)
lag := (oracle.GetPhysical(time.Now()) - phyResolvedTs) / 1e3
c.metricResolvedTsLag.Set(float64(lag))
}
}
}()
return nil
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/go-sql-driver/mysql v1.7.1
github.com/gogo/protobuf v1.3.2
github.com/google/btree v1.1.2
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.6.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
Expand Down
Loading
Loading