Skip to content

Commit

Permalink
refactor and addressed comments from doug
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Dec 23, 2024
1 parent 1cb4396 commit b8fe8db
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 113 deletions.
File renamed without changes.
35 changes: 13 additions & 22 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,6 @@ func (h *clientStatsHandler) initializeMetrics() {
rm.registerMetrics(metrics, meter)
}

func (h *clientStatsHandler) initializeTracing() {
if isTracingDisabled(h.options.TraceOptions) {
return
}

otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
Expand All @@ -101,7 +92,7 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string

startTime := time.Now()
var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
Expand Down Expand Up @@ -139,7 +130,7 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S

startTime := time.Now()
var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
Expand All @@ -151,7 +142,7 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S

// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts *trace.Span) {
if !isTracingDisabled(h.options.TraceOptions) && ts != nil {
if h.options.isTracingEnabled() && ts != nil {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
(*ts).SetStatus(otelcodes.Ok, s.Message())
Expand All @@ -160,7 +151,7 @@ func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err er
}

Check warning on line 151 in stats/opentelemetry/client_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_metrics.go#L150-L151

Added lines #L150 - L151 were not covered by tests
(*ts).End()
}
if !isMetricsDisabled(h.options.MetricsOptions) {
if h.options.isMetricsEnabled() {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
Expand Down Expand Up @@ -210,14 +201,14 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{}
startTime := time.Now()
if !isTracingDisabled(h.options.TraceOptions) {
ctx, ai = h.traceTagRPC(ctx, info)
ai := &attemptInfo{
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
}
if h.options.isTracingEnabled() {
ctx, ai = h.traceTagRPC(ctx, info, ai)
}
ai.startTime = startTime
ai.xdsLabels = labels.TelemetryLabels
ai.method = info.FullMethodName
return setRPCInfo(ctx, &rpcInfo{
ai: ai,
})
Expand All @@ -229,10 +220,10 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
if !isMetricsDisabled(h.options.MetricsOptions) {
if h.options.isMetricsEnabled() {
h.processRPCEvent(ctx, rs, ri.ai)
}
if !isTracingDisabled(h.options.TraceOptions) {
if h.options.isTracingEnabled() {
h.populateSpan(ctx, rs, ri.ai)
}
}
Expand Down
38 changes: 38 additions & 0 deletions stats/opentelemetry/client_tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package opentelemetry

import (
"context"
"strings"

"go.opentelemetry.io/otel"
"google.golang.org/grpc/stats"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

func (h *clientStatsHandler) initializeTracing() {
if !h.options.isTracingEnabled() {
return
}

otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
}

// traceTagRPC populates provided context with a new span using the
// TextMapPropagator supplied in trace options and internal itracing.carrier.
// It creates a new outgoing carrier which serializes information about this
// span into gRPC Metadata, if TextMapPropagator is provided in the trace
// options. if TextMapPropagator is not provided, it returns the context as is.
func (h *clientStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagInfo, ai *attemptInfo) (context.Context, *attemptInfo) {
if h.options.TraceOptions.TextMapPropagator == nil {
return ctx, nil
}

Check warning on line 29 in stats/opentelemetry/client_tracing.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_tracing.go#L28-L29

Added lines #L28 - L29 were not covered by tests

mn := "Attempt." + strings.Replace(removeLeadingSlash(rti.FullMethodName), "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn)
carrier := otelinternaltracing.NewOutgoingCarrier(ctx)
otel.GetTextMapPropagator().Inject(ctx, carrier)
ai.traceSpan = span
return carrier.Context(), ai
}
7 changes: 4 additions & 3 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
experimental "google.golang.org/grpc/experimental/opentelemetry"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
Expand All @@ -57,7 +58,7 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/experimental"
expstats "google.golang.org/grpc/stats/opentelemetry/experimental"
"google.golang.org/grpc/stats/opentelemetry/internal/testutils"
)

Expand Down Expand Up @@ -98,7 +99,7 @@ func defaultTraceOptions(_ *testing.T) (*experimental.TraceOptions, *tracetest.I
spanExporter := tracetest.NewInMemoryExporter()
spanProcessor := trace.NewSimpleSpanProcessor(spanExporter)
tracerProvider := trace.NewTracerProvider(trace.WithSpanProcessor(spanProcessor))
textMapPropagator := propagation.NewCompositeTextMapPropagator(experimental.GRPCTraceBinPropagator{})
textMapPropagator := propagation.NewCompositeTextMapPropagator(expstats.GRPCTraceBinPropagator{})
traceOptions := &experimental.TraceOptions{
TracerProvider: tracerProvider,
TextMapPropagator: textMapPropagator,
Expand Down Expand Up @@ -636,7 +637,7 @@ func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) {
ss := setupStubServer(t, mo, to)
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout*2)
defer cancel()

// Make two RPC's, a unary RPC and a streaming RPC. These should cause
Expand Down
4 changes: 2 additions & 2 deletions stats/opentelemetry/metricsregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

var DefaultTestTimeout = 5 * time.Second
var defaultTestTimeout = 5 * time.Second

type s struct {
grpctest.Tester
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s) TestMetricsRegistryMetrics(t *testing.T) {
Default: true,
})

ctx, cancel := context.WithTimeout(context.Background(), DefaultTestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Only float optional labels are configured, so only float optional labels should show up.
Expand Down
30 changes: 15 additions & 15 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,26 @@ import (
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
experimental "google.golang.org/grpc/experimental/opentelemetry"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/stats/opentelemetry/experimental"
otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"
)

var logger = grpclog.Component("otel-plugin")

var canonicalString = internal.CanonicalString.(func(codes.Code) string)

var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption)

func init() {
otelinternal.SetPluginOption = func(o *Options, po otelinternal.PluginOption) {
o.MetricsOptions.pluginOption = po
}
}

var (
logger = grpclog.Component("otel-plugin")
canonicalString = internal.CanonicalString.(func(codes.Code) string)
joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption)
)

// Options are the options for OpenTelemetry instrumentation.
type Options struct {
// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
Expand All @@ -61,6 +61,14 @@ type Options struct {
TraceOptions experimental.TraceOptions
}

func (o *Options) isMetricsEnabled() bool {
return o.MetricsOptions.MeterProvider != nil
}

func (o *Options) isTracingEnabled() bool {
return !(o.TraceOptions.TracerProvider == nil || o.TraceOptions.TextMapPropagator == nil)
}

// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
type MetricsOptions struct {
// MeterProvider is the MeterProvider instance that will be used to create
Expand Down Expand Up @@ -176,14 +184,6 @@ func removeLeadingSlash(mn string) string {
return strings.TrimLeft(mn, "/")
}

func isMetricsDisabled(mo MetricsOptions) bool {
return mo.MeterProvider == nil
}

func isTracingDisabled(to experimental.TraceOptions) bool {
return to.TracerProvider == nil || to.TextMapPropagator == nil
}

// attemptInfo is RPC information scoped to the RPC attempt life span client
// side, and the RPC life span server side.
type attemptInfo struct {
Expand Down
26 changes: 8 additions & 18 deletions stats/opentelemetry/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)
Expand Down Expand Up @@ -68,15 +67,6 @@ func (h *serverStatsHandler) initializeMetrics() {
rm.registerMetrics(metrics, meter)
}

func (h *serverStatsHandler) initializeTracing() {
if !isTracingDisabled(h.options.TraceOptions) {
return
}

otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
}

// attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the
// underlying ServerTransportStream to attach metadataExchangeLabels.
type attachLabelsTransportStream struct {
Expand Down Expand Up @@ -208,13 +198,13 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
}
}

ai := &attemptInfo{}
startTime := time.Now()
if !isTracingDisabled(h.options.TraceOptions) {
ctx, ai = h.traceTagRPC(ctx, info)
ai := &attemptInfo{
startTime: time.Now(),
method: removeLeadingSlash(method),
}
if h.options.isTracingEnabled() {
ctx, ai = h.traceTagRPC(ctx, info, ai)
}
ai.startTime = startTime
ai.method = removeLeadingSlash(method)
return setRPCInfo(ctx, &rpcInfo{
ai: ai,
})
Expand All @@ -227,10 +217,10 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
return
}
if !isTracingDisabled(h.options.TraceOptions) {
if h.options.isTracingEnabled() {
h.populateSpan(ctx, rs, ri.ai)
}
if !isMetricsDisabled(h.options.MetricsOptions) {
if h.options.isMetricsEnabled() {
h.processRPCData(ctx, rs, ri.ai)
}
}
Expand Down
44 changes: 44 additions & 0 deletions stats/opentelemetry/server_tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package opentelemetry

import (
"context"
"strings"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/stats"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

func (h *serverStatsHandler) initializeTracing() {
if !h.options.isTracingEnabled() {
return
}

otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
}

// traceTagRPC populates context with new span data using the TextMapPropagator
// supplied in trace options and internal itracing.Carrier. It creates a new
// incoming carrier which extracts an existing span context (if present) by
// deserializing from provided context. If valid span context is extracted, it
// is set as parent of the new span otherwise new span remains the root span.
// If TextMapPropagator is not provided in the trace options, it returns context
// as is.
func (h *serverStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagInfo, ai *attemptInfo) (context.Context, *attemptInfo) {
if h.options.TraceOptions.TextMapPropagator == nil {
return ctx, nil
}

Check warning on line 32 in stats/opentelemetry/server_tracing.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/server_tracing.go#L31-L32

Added lines #L31 - L32 were not covered by tests

mn := strings.Replace(removeLeadingSlash(rti.FullMethodName), "/", ".", -1)
var span trace.Span
tracer := otel.Tracer("grpc-open-telemetry")
ctx = otel.GetTextMapPropagator().Extract(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
// If the context.Context provided in `ctx` to tracer.Start(), contains a
// span then the newly-created Span will be a child of that span,
// otherwise it will be a root span.
ctx, span = tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindServer))
ai.traceSpan = span
return ctx, ai
}
Loading

0 comments on commit b8fe8db

Please sign in to comment.