From ee71e3b66b0418c7430015dfa6742184ac99d33b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Wilczy=C5=84ski?= Date: Wed, 8 Jan 2025 09:49:35 +0100 Subject: [PATCH] [ENG-2342] unit and fixes --- monitor/census.go | 21 ++++++++++++++++----- server/ai_live_video.go | 8 +++++--- server/ai_process.go | 2 +- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 6f01cf82a..acc9f572b 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -376,7 +376,7 @@ func InitCensus(nodeType NodeType, version string) { census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec") census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot") census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot") - census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay", "Delay of the first live AI segment being processed", "ms") + census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms") glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) @@ -985,10 +985,10 @@ func InitCensus(nodeType NodeType, version string) { Aggregation: view.LastValue(), }, { - Name: "ai_first_segment_delay", + Name: "ai_first_segment_delay_ms", Measure: census.mAIFirstSegmentDelay, Description: "Delay of the first live AI segment being processed", - TagKeys: baseTags, + TagKeys: baseTagsWithOrchInfo, Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000), }, } @@ -1996,8 +1996,19 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down } } -func AIFirstSegmentDelay(delay int64) { - stats.Record(census.ctx, census.mAIFirstSegmentDelay.M(delay)) +func AIFirstSegmentDelay(delay int64, orchInfo *lpnet.OrchestratorInfo) { + orchAddr := "" + if addr := orchInfo.GetAddress(); addr != nil { + orchAddr = common.BytesToAddress(addr).String() + } + tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)} + capabilities := orchInfo.GetCapabilities() + if capabilities != nil { + tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion())) + } + if err := stats.RecordWithTags(census.ctx, tags, census.mAIFirstSegmentDelay.M(delay)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } } // Convert wei to gwei diff --git a/server/ai_live_video.go b/server/ai_live_video.go index b54103147..8974cb21e 100644 --- a/server/ai_live_video.go +++ b/server/ai_live_video.go @@ -19,7 +19,9 @@ import ( "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-livepeer/media" "github.com/livepeer/go-livepeer/monitor" + "github.com/livepeer/go-livepeer/net" "github.com/livepeer/go-livepeer/trickle" + "github.com/livepeer/lpms/ffmpeg" "github.com/dustin/go-humanize" @@ -123,7 +125,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara clog.Infof(ctx, "trickle pub") } -func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time) { +func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time, orchInfo *net.OrchestratorInfo) { // subscribe to the outputs and send them into LPMS subscriber := trickle.NewTrickleSubscriber(url.String()) r, w, err := os.Pipe() @@ -137,7 +139,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa // read segments from trickle subscription go func() { var err error - firstSegment := false + firstSegment := true defer w.Close() retries := 0 @@ -185,7 +187,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa } if monitor.Enabled && firstSegment { firstSegment = false - monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds()) + monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), orchInfo) } clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n))) } diff --git a/server/ai_process.go b/server/ai_process.go index 5a33117e6..8c6ac95f3 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -1078,7 +1078,7 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A startControlPublish(control, params) startTricklePublish(ctx, pub, params, sess) - startTrickleSubscribe(ctx, sub, params, startTime) + startTrickleSubscribe(ctx, sub, params, startTime, sess.OrchestratorInfo) startEventsSubscribe(ctx, events, params, sess) return resp, nil }