From 139f2c8297243c1bc68132c9bd0506ec0719883d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Wilczy=C5=84ski?=
Date: Tue, 7 Jan 2025 16:26:26 +0100
Subject: [PATCH] [ENG-2342] ai_first_segment_delay metric added

Record, in milliseconds, the time between entering
submitLiveVideoToVideo and the first output segment being fully copied
from the trickle subscription. The value is reported once per
subscription, and only when monitoring is enabled.
---
 monitor/census.go       | 14 ++++++++++++++
 server/ai_live_video.go |  8 +++++++-
 server/ai_process.go    |  3 ++-
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/monitor/census.go b/monitor/census.go
index 0993a7f74..dae60135a 100644
--- a/monitor/census.go
+++ b/monitor/census.go
@@ -205,6 +205,7 @@ type (
 		mAIResultUploadTime     *stats.Float64Measure
 		mAIResultSaveFailed     *stats.Int64Measure
 		mAICurrentLivePipelines *stats.Int64Measure
+		mAIFirstSegmentDelay    *stats.Int64Measure
 
 		lock        sync.Mutex
 		emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
@@ -375,6 +376,7 @@ func InitCensus(nodeType NodeType, version string) {
 	census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec")
 	census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
 	census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
+	census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay", "Delay to the first AI segment being processed", "ms")
 
 	glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
 	glog.Infof("Livepeer version: %s", version)
@@ -982,6 +984,13 @@ func InitCensus(nodeType NodeType, version string) {
 			TagKeys:     append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
 			Aggregation: view.LastValue(),
 		},
+		{
+			Name:        "ai_first_segment_delay",
+			Measure:     census.mAIFirstSegmentDelay,
+			Description: "Delay to the first AI segment being processed",
+			TagKeys:     baseTags,
+			Aggregation: view.Distribution(0, 100, 200, 500, 1000, 1500, 2000, 5000, 10000, 30000, 60000),
+		},
 	}
 
 	// Register the views
@@ -1987,6 +1996,11 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down
 	}
 }
 
+// AIFirstSegmentDelay records the delay, in milliseconds, until the first AI segment was processed.
+func AIFirstSegmentDelay(delay int64) {
+	stats.Record(census.ctx, census.mAIFirstSegmentDelay.M(delay))
+}
+
 // Convert wei to gwei
 func wei2gwei(wei *big.Int) float64 {
 	gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
diff --git a/server/ai_live_video.go b/server/ai_live_video.go
index 3dfc0f1dc..21012998a 100644
--- a/server/ai_live_video.go
+++ b/server/ai_live_video.go
@@ -123,7 +123,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
 	clog.Infof(ctx, "trickle pub")
 }
 
-func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams) {
+func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time) {
 	// subscribe to the outputs and send them into LPMS
 	subscriber := trickle.NewTrickleSubscriber(url.String())
 	r, w, err := os.Pipe()
@@ -137,6 +137,8 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
 	// read segments from trickle subscription
 	go func() {
 		var err error
+		firstSegment := true
+
 		defer w.Close()
 		retries := 0
 		// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
@@ -181,6 +183,10 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
 				params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
 				return
 			}
+			if monitor.Enabled && firstSegment {
+				firstSegment = false
+				monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds())
+			}
 			clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
 		}
 	}()
diff --git a/server/ai_process.go b/server/ai_process.go
index ea15bd43e..5a33117e6 100644
--- a/server/ai_process.go
+++ b/server/ai_process.go
@@ -1026,6 +1026,7 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
 const initPixelsToPay = 30 * 30 * 3200 * 1800 // 30 seconds, 30fps, 1800p
 
 func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) {
+	startTime := time.Now()
 	// Live Video should not reuse the existing session balance, because it could lead to not sending the init
 	// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
 	// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
@@ -1077,7 +1078,7 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
 
 	startControlPublish(control, params)
 	startTricklePublish(ctx, pub, params, sess)
-	startTrickleSubscribe(ctx, sub, params)
+	startTrickleSubscribe(ctx, sub, params, startTime)
 	startEventsSubscribe(ctx, events, params, sess)
 	return resp, nil
 }