Skip to content

Commit

Permalink
[ENG-2342] ai_first_segment_delay metric added
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Jan 7, 2025
1 parent cfc7b6e commit 139f2c8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
13 changes: 13 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type (
mAIResultUploadTime *stats.Float64Measure
mAIResultSaveFailed *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
Expand Down Expand Up @@ -375,6 +376,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec")
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay", "Delay to the first AI segment being processed", "ms")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -982,6 +984,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "ai_first_segment_delay",
Measure: census.mAIFirstSegmentDelay,
Description: "Delay to the first AI segment being processed",
TagKeys: baseTags,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
}

// Register the views
Expand Down Expand Up @@ -1987,6 +1996,10 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down
}
}

func AIFirstSegmentDelay(delay int64) {
stats.Record(census.ctx, census.mAIFirstSegmentDelay.M(delay))
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
8 changes: 7 additions & 1 deletion server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
clog.Infof(ctx, "trickle pub")
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams) {
func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time) {
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
Expand All @@ -137,6 +137,8 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
// read segments from trickle subscription
go func() {
var err error
firstSegment := false

defer w.Close()
retries := 0
// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
Expand Down Expand Up @@ -181,6 +183,10 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
return
}
if monitor.Enabled && firstSegment {
firstSegment = false
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds())
}
clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
}
}()
Expand Down
3 changes: 2 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
const initPixelsToPay = 30 * 30 * 3200 * 1800 // 30 seconds, 30fps, 1800p

func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) {
startTime := time.Now()
// Live Video should not reuse the existing session balance, because it could lead to not sending the init
// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
Expand Down Expand Up @@ -1077,7 +1078,7 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A

startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params)
startTrickleSubscribe(ctx, sub, params, startTime)
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
}
Expand Down

0 comments on commit 139f2c8

Please sign in to comment.