diff --git a/monitor/census.go b/monitor/census.go
index 0993a7f74..e5587c0e3 100644
--- a/monitor/census.go
+++ b/monitor/census.go
@@ -205,6 +205,7 @@ type (
 		mAIResultUploadTime     *stats.Float64Measure
 		mAIResultSaveFailed     *stats.Int64Measure
 		mAICurrentLivePipelines *stats.Int64Measure
+		mAIFirstSegmentDelay    *stats.Int64Measure

 		lock        sync.Mutex
 		emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
@@ -375,6 +376,7 @@ func InitCensus(nodeType NodeType, version string) {
 	census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec")
 	census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
 	census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
+	census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")

 	glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
 	glog.Infof("Livepeer version: %s", version)
@@ -982,6 +984,13 @@ func InitCensus(nodeType NodeType, version string) {
 			TagKeys:     append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
 			Aggregation: view.LastValue(),
 		},
+		{
+			Name:        "ai_first_segment_delay_ms",
+			Measure:     census.mAIFirstSegmentDelay,
+			Description: "Delay of the first live AI segment being processed",
+			TagKeys:     baseTagsWithOrchInfo,
+			Aggregation: view.Distribution(0, 10, 20, 50, 100, 150, 200, 500, 1000, 5000, 10000),
+		},
 	}

 	// Register the views
@@ -1034,20 +1043,21 @@ func manifestIDTag(ctx context.Context, others ...tag.Mutator) []tag.Mutator {
 	return others
 }

-func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
-	others = manifestIDTag(ctx, others...)
-
-	others = append(
-		others,
-		tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()),
-		tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String()),
-	)
+func orchInfoTags(orchInfo *lpnet.OrchestratorInfo) []tag.Mutator {
+	orchAddr := ""
+	if addr := orchInfo.GetAddress(); addr != nil {
+		orchAddr = common.BytesToAddress(addr).String()
+	}
+	tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
 	capabilities := orchInfo.GetCapabilities()
 	if capabilities != nil {
-		others = append(others, tag.Insert(census.kOrchestratorVersion, capabilities.Version))
+		tags = append(tags, tag.Insert(census.kOrchestratorVersion, capabilities.GetVersion()))
 	}
+	return tags
+}

-	return others
+func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
+	return append(manifestIDTag(ctx, others...), orchInfoTags(orchInfo)...)
 }

 func manifestIDTagStr(manifestID string, others ...tag.Mutator) []tag.Mutator {
@@ -1870,11 +1880,8 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(pipeline string, Mo
 	cen.lock.Lock()
 	defer cen.lock.Unlock()

-	tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}
-	capabilities := orchInfo.GetCapabilities()
-	if capabilities != nil {
-		tags = append(tags, tag.Insert(cen.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
-	}
+	tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model)}
+	tags = append(tags, orchInfoTags(orchInfo)...)

 	if err := stats.RecordWithTags(cen.ctx, tags, cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
 		glog.Errorf("Error recording metrics err=%q", err)
@@ -1895,16 +1902,8 @@ func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(pipeline string, Mo

 // AIRequestError logs an error in a gateway AI job request.
 func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.OrchestratorInfo) {
-	orchAddr := ""
-	if addr := orchInfo.GetAddress(); addr != nil {
-		orchAddr = common.BytesToAddress(addr).String()
-	}
-
-	tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
-	capabilities := orchInfo.GetCapabilities()
-	if capabilities != nil {
-		tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
-	}
+	tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model)}
+	tags = append(tags, orchInfoTags(orchInfo)...)

 	if err := stats.RecordWithTags(census.ctx, tags, census.mAIRequestError.M(1)); err != nil {
 		glog.Errorf("Error recording metrics err=%q", err)
@@ -1987,6 +1986,13 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down
 	}
 }

+// AIFirstSegmentDelay records delayMs, in milliseconds, for the first live AI segment, tagged with the orchestrator's info.
+func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
+	if err := stats.RecordWithTags(census.ctx, orchInfoTags(orchInfo), census.mAIFirstSegmentDelay.M(delayMs)); err != nil {
+		glog.Errorf("Error recording metrics err=%q", err)
+	}
+}
+
 // Convert wei to gwei
 func wei2gwei(wei *big.Int) float64 {
 	gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
diff --git a/server/ai_live_video.go b/server/ai_live_video.go
index e5eb04d71..c08ce3ce8 100644
--- a/server/ai_live_video.go
+++ b/server/ai_live_video.go
@@ -20,6 +20,7 @@ import (
 	"github.com/livepeer/go-livepeer/media"
 	"github.com/livepeer/go-livepeer/monitor"
 	"github.com/livepeer/go-livepeer/trickle"
+	"github.com/livepeer/lpms/ffmpeg"

 	"github.com/dustin/go-humanize"

@@ -123,7 +124,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
 	clog.Infof(ctx, "trickle pub")
 }

-func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams) {
+func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, onFirstSegment func()) {
 	// subscribe to the outputs and send them into LPMS
 	subscriber := trickle.NewTrickleSubscriber(url.String())
 	r, w, err := os.Pipe()
@@ -137,6 +138,8 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
 	// read segments from trickle subscription
 	go func() {
 		var err error
+		firstSegment := true
+
 		defer w.Close()
 		retries := 0
 		// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
@@ -181,6 +184,10 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
 				params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
 				return
 			}
+			if firstSegment {
+				firstSegment = false
+				onFirstSegment()
+			}
 			clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
 		}
 	}()
diff --git a/server/ai_process.go b/server/ai_process.go
index ea15bd43e..384cadf20 100644
--- a/server/ai_process.go
+++ b/server/ai_process.go
@@ -1026,6 +1026,7 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
 const initPixelsToPay = 30 * 30 * 3200 * 1800 // 30 seconds, 30fps, 1800p

 func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) {
+	startTime := time.Now()
 	// Live Video should not reuse the existing session balance, because it could lead to not sending the init
 	// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
 	// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
@@ -1077,7 +1078,11 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A

 	startControlPublish(control, params)
 	startTricklePublish(ctx, pub, params, sess)
-	startTrickleSubscribe(ctx, sub, params)
+	startTrickleSubscribe(ctx, sub, params, func() {
+		if monitor.Enabled {
+			monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
+		}
+	})
 	startEventsSubscribe(ctx, events, params, sess)
 	return resp, nil
 }