Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-2342] ai_first_segment_delay metric added #3341

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@
mAIResultUploadTime *stats.Float64Measure
mAIResultSaveFailed *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
Expand Down Expand Up @@ -375,6 +376,7 @@
census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec")
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -982,6 +984,13 @@
TagKeys: append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "ai_first_segment_delay_ms",
Measure: census.mAIFirstSegmentDelay,
Description: "Delay of the first live AI segment being processed",
TagKeys: baseTagsWithOrchInfo,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
}

// Register the views
Expand Down Expand Up @@ -1034,20 +1043,21 @@
return others
}

func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
others = manifestIDTag(ctx, others...)

others = append(
others,
tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()),
tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String()),
)
func orchInfoTags(orchInfo *lpnet.OrchestratorInfo) []tag.Mutator {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

Check warning on line 1050 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1049-L1050

Added lines #L1049 - L1050 were not covered by tests
tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
others = append(others, tag.Insert(census.kOrchestratorVersion, capabilities.Version))
tags = append(tags, tag.Insert(census.kOrchestratorVersion, capabilities.GetVersion()))

Check warning on line 1054 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1054

Added line #L1054 was not covered by tests
}
return tags
}

return others
func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
return append(manifestIDTag(ctx, others...), orchInfoTags(orchInfo)...)
}

func manifestIDTagStr(manifestID string, others ...tag.Mutator) []tag.Mutator {
Expand Down Expand Up @@ -1870,11 +1880,8 @@
cen.lock.Lock()
defer cen.lock.Unlock()

tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(cen.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model)}
tags = append(tags, orchInfoTags(orchInfo)...)

Check warning on line 1884 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1883-L1884

Added lines #L1883 - L1884 were not covered by tests

if err := stats.RecordWithTags(cen.ctx, tags, cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
Expand All @@ -1895,16 +1902,8 @@

// AIRequestError logs an error in a gateway AI job request.
func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model)}
tags = append(tags, orchInfoTags(orchInfo)...)

Check warning on line 1906 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1905-L1906

Added lines #L1905 - L1906 were not covered by tests

if err := stats.RecordWithTags(census.ctx, tags, census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1987,6 +1986,12 @@
}
}

func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
if err := stats.RecordWithTags(census.ctx, orchInfoTags(orchInfo), census.mAIFirstSegmentDelay.M(delayMs)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}

Check warning on line 1992 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1989-L1992

Added lines #L1989 - L1992 were not covered by tests
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
9 changes: 8 additions & 1 deletion server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/trickle"

"github.com/livepeer/lpms/ffmpeg"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -123,7 +124,7 @@
clog.Infof(ctx, "trickle pub")
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams) {
func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, onFistSegment func()) {

Check warning on line 127 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L127

Added line #L127 was not covered by tests
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
Expand All @@ -137,6 +138,8 @@
// read segments from trickle subscription
go func() {
var err error
firstSegment := true

Check warning on line 142 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L141-L142

Added lines #L141 - L142 were not covered by tests
defer w.Close()
retries := 0
// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
Expand Down Expand Up @@ -181,6 +184,10 @@
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
return
}
if firstSegment {
firstSegment = false
onFistSegment()
}

Check warning on line 190 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L187-L190

Added lines #L187 - L190 were not covered by tests
clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
}
}()
Expand Down
7 changes: 6 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@
const initPixelsToPay = 30 * 30 * 3200 * 1800 // 30 seconds, 30fps, 1800p

func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) {
startTime := time.Now()

Check warning on line 1029 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1029

Added line #L1029 was not covered by tests
// Live Video should not reuse the existing session balance, because it could lead to not sending the init
// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
Expand Down Expand Up @@ -1077,7 +1078,11 @@

startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params)
startTrickleSubscribe(ctx, sub, params, func() {
if monitor.Enabled {
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
}

Check warning on line 1084 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1081-L1084

Added lines #L1081 - L1084 were not covered by tests
})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
}
Expand Down
Loading