Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/opentelemetry: Introduce Tracing API #7852

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 73 additions & 19 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package opentelemetry

import (
"context"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
Expand All @@ -33,6 +38,7 @@ import (
)

type clientStatsHandler struct {
statsHandler
estats.MetricsRecorder
options Options
clientMetrics clientMetrics
Expand Down Expand Up @@ -68,6 +74,15 @@ func (h *clientStatsHandler) initializeMetrics() {
rm.registerMetrics(metrics, meter)
}

func (h *clientStatsHandler) initializeTracing() {
if isTracingDisabled(h.options.TraceOptions) {
return
}

otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
Expand All @@ -85,8 +100,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}

startTime := time.Now()
var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
return err
}

Expand Down Expand Up @@ -119,22 +138,50 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}

startTime := time.Now()

var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts *trace.Span) {
if !isTracingDisabled(h.options.TraceOptions) && ts != nil {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
(*ts).SetStatus(otelcodes.Ok, s.Message())
} else {
(*ts).SetStatus(otelcodes.Error, s.Message())
}
(*ts).End()
}
if !isMetricsDisabled(h.options.MetricsOptions) {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}
}

// createCallTraceSpan creates a call span to put in the provided context using
// provided TraceProvider. If TraceProvider is nil, it returns context as is.
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, *trace.Span) {
if h.options.TraceOptions.TracerProvider == nil {
logger.Error("TraceProvider is not provided in trace options")
return ctx, nil
}
mn := strings.Replace(removeLeadingSlash(method), "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
return ctx, &span
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -163,15 +210,17 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{ // populates information about RPC start.
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
ai := &attemptInfo{}
startTime := time.Now()
if !isTracingDisabled(h.options.TraceOptions) {
ctx, ai = h.traceTagRPC(ctx, info)
}
ri := &rpcInfo{
ai.startTime = startTime
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change how this is set?

Why are you setting a local 3 lines above to time.Now() and then assigning this field to that?

Copy link
Contributor Author

@aranjans aranjans Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated now, thAnks.

ai.xdsLabels = labels.TelemetryLabels
ai.method = info.FullMethodName
return setRPCInfo(ctx, &rpcInfo{
ai: ai,
}
return setRPCInfo(ctx, ri)
})
}

func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
Expand All @@ -180,7 +229,12 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
h.processRPCEvent(ctx, rs, ri.ai)
if !isMetricsDisabled(h.options.MetricsOptions) {
h.processRPCEvent(ctx, rs, ri.ai)
}
if !isTracingDisabled(h.options.TraceOptions) {
h.populateSpan(ctx, rs, ri.ai)
}
}

func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
Expand Down
Loading
Loading