Skip to content

Commit

Permalink
fix dispatcher manager metrics
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen committed Aug 28, 2024
1 parent 8770326 commit 3f3778c
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,6 @@ func (e *EventDispatcherManager) CollectHeartbeatInfo(needCompleteStatus bool) *
for _, id := range toReomveDispatcherIDs {
e.cleanTableEventDispatcher(id)
}
ckptTs := oracle.ExtractPhysical(message.Watermark.CheckpointTs)
e.metricCheckpointTs.Set(float64(ckptTs))
lag := (oracle.GetPhysical(time.Now()) - ckptTs) / 1e3
e.metricCheckpointTsLag.Set(float64(lag))
resolvedTs := oracle.ExtractPhysical(message.Watermark.ResolvedTs)
e.metricResolveTs.Set(float64(resolvedTs))
lag = (oracle.GetPhysical(time.Now()) - resolvedTs) / 1e3
e.metricResolvedTsLag.Set(float64(lag))
return &message
}

Expand Down Expand Up @@ -407,6 +399,37 @@ func (e *EventDispatcherManager) SetMaintainerID(maintainerID messaging.ServerId
e.maintainerID = maintainerID
}

func (e *EventDispatcherManager) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
minResolvedTs := uint64(0)
e.dispatcherMap.m.Range(func(key, value interface{}) bool {
d, ok := value.(*dispatcher.Dispatcher)
if !ok {
return true
}
if minResolvedTs == 0 || d.GetResolvedTs() < minResolvedTs {
minResolvedTs = d.GetResolvedTs()
}
return true
})
if minResolvedTs == 0 {
continue
}
phyResolvedTs := oracle.ExtractPhysical(minResolvedTs)
lag := (oracle.GetPhysical(time.Now()) - phyResolvedTs) / 1e3
e.metricResolvedTsLag.Set(float64(lag))
}
}
}()
return nil
}

// 测一下用 sync.Map 的效果和普通的 map 相比
type DispatcherMap struct {
m sync.Map
Expand Down

0 comments on commit 3f3778c

Please sign in to comment.