Skip to content

Commit

Permalink
[ENG-2342] unit and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Jan 8, 2025
1 parent 6d8b899 commit ee71e3b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
21 changes: 16 additions & 5 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec")
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay", "Delay of the first live AI segment being processed", "ms")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -985,10 +985,10 @@ func InitCensus(nodeType NodeType, version string) {
Aggregation: view.LastValue(),
},
{
Name: "ai_first_segment_delay",
Name: "ai_first_segment_delay_ms",
Measure: census.mAIFirstSegmentDelay,
Description: "Delay of the first live AI segment being processed",
TagKeys: baseTags,
TagKeys: baseTagsWithOrchInfo,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
}
Expand Down Expand Up @@ -1996,8 +1996,19 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down
}
}

func AIFirstSegmentDelay(delay int64) {
stats.Record(census.ctx, census.mAIFirstSegmentDelay.M(delay))
func AIFirstSegmentDelay(delay int64, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}
tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
if err := stats.RecordWithTags(census.ctx, tags, census.mAIFirstSegmentDelay.M(delay)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// Convert wei to gwei
Expand Down
8 changes: 5 additions & 3 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/trickle"

"github.com/livepeer/lpms/ffmpeg"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -123,7 +125,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
clog.Infof(ctx, "trickle pub")
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time) {
func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, startTime time.Time, orchInfo *net.OrchestratorInfo) {
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
Expand All @@ -137,7 +139,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
// read segments from trickle subscription
go func() {
var err error
firstSegment := false
firstSegment := true

defer w.Close()
retries := 0
Expand Down Expand Up @@ -185,7 +187,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
}
if monitor.Enabled && firstSegment {
firstSegment = false
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds())
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), orchInfo)
}
clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
}
Expand Down
2 changes: 1 addition & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A

startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params, startTime)
startTrickleSubscribe(ctx, sub, params, startTime, sess.OrchestratorInfo)
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
}
Expand Down

0 comments on commit ee71e3b

Please sign in to comment.