Skip to content

Commit

Permalink
[ENG-2342] refactor census
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Jan 8, 2025
1 parent 48540cf commit 9e290c1
Showing 1 changed file with 16 additions and 35 deletions.
51 changes: 16 additions & 35 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,20 +1043,21 @@ func manifestIDTag(ctx context.Context, others ...tag.Mutator) []tag.Mutator {
return others
}

func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
others = manifestIDTag(ctx, others...)

others = append(
others,
tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()),
tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String()),
)
func orchInfoTags(orchInfo *lpnet.OrchestratorInfo) []tag.Mutator {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

Check warning on line 1050 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1049-L1050

Added lines #L1049 - L1050 were not covered by tests
tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
others = append(others, tag.Insert(census.kOrchestratorVersion, capabilities.Version))
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))

Check warning on line 1054 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1054

Added line #L1054 was not covered by tests
}
return tags
}

return others
func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
return append(manifestIDTag(ctx, others...), orchInfoTags(orchInfo)...)
}

func manifestIDTagStr(manifestID string, others ...tag.Mutator) []tag.Mutator {
Expand Down Expand Up @@ -1879,11 +1880,8 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(pipeline string, Mo
cen.lock.Lock()
defer cen.lock.Unlock()

tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(cen.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model)}
tags = append(tags, orchInfoTags(orchInfo)...)

Check warning on line 1884 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1883-L1884

Added lines #L1883 - L1884 were not covered by tests

if err := stats.RecordWithTags(cen.ctx, tags, cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
Expand All @@ -1904,16 +1902,8 @@ func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(pipeline string, Mo

// AIRequestError logs an error in a gateway AI job request.
func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model)}
tags = append(tags, orchInfoTags(orchInfo)...)

Check warning on line 1906 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1905-L1906

Added lines #L1905 - L1906 were not covered by tests

if err := stats.RecordWithTags(census.ctx, tags, census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1997,16 +1987,7 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down
}

func AIFirstSegmentDelay(delay int64, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}
tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
if err := stats.RecordWithTags(census.ctx, tags, census.mAIFirstSegmentDelay.M(delay)); err != nil {
if err := stats.RecordWithTags(census.ctx, orchInfoTags(orchInfo), census.mAIFirstSegmentDelay.M(delay)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}

Check warning on line 1992 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1989-L1992

Added lines #L1989 - L1992 were not covered by tests
}
Expand Down

0 comments on commit 9e290c1

Please sign in to comment.