diff --git a/go.mod b/go.mod index 1bbd024d22c1..9b3d296cc882 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/otel/metric v1.31.0 go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 golang.org/x/net v0.30.0 golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.8.0 @@ -32,7 +33,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect - go.opentelemetry.io/otel/trace v1.31.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect ) diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 67646b411825..61874d8a55fe 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -633,7 +633,7 @@ func (s) TestClientCallSpanEvents(t *testing.T) { // Create a parent span for the client call ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span") md, _ := metadata.FromOutgoingContext(ctx) - otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx)) + otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx)) ctx = metadata.NewOutgoingContext(ctx, md) // Make a unary RPC @@ -690,7 +690,7 @@ func (s) TestServerWithMetricsAndTraceOptions(t *testing.T) { // Create a parent span for the client call ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span") md, _ := metadata.FromOutgoingContext(ctx) - otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx)) + otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx)) ctx = metadata.NewOutgoingContext(ctx, md) // Make two RPC's, a unary RPC and a streaming RPC. These should cause @@ -759,7 +759,7 @@ func (s) TestGrpcTraceBinPropagator(t *testing.T) { // Create a parent span for the client call ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span") md, _ := metadata.FromOutgoingContext(ctx) - otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx)) + otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx)) ctx = metadata.NewOutgoingContext(ctx, md) // Make a unary RPC if _, err := ss.Client.UnaryCall( diff --git a/stats/opentelemetry/grpc_trace_bin_propagator_test.go b/stats/opentelemetry/grpc_trace_bin_propagator_test.go index 2a9786921143..3ecd69ad1559 100644 --- a/stats/opentelemetry/grpc_trace_bin_propagator_test.go +++ b/stats/opentelemetry/grpc_trace_bin_propagator_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/metadata" itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" @@ -44,19 +45,25 @@ var validSpanContext = oteltrace.SpanContext{}.WithTraceID( // header is not set in the carrier's context metadata. func (s) TestInject(t *testing.T) { tests := []struct { - name string - injectSC oteltrace.SpanContext - wantSC oteltrace.SpanContext + name string + injectSC oteltrace.SpanContext + incomingMd metadata.MD + outgoingMd metadata.MD + wantKeys []string }{ { - name: "valid OpenTelemetry span context", - injectSC: validSpanContext, - wantSC: validSpanContext, + name: "valid OpenTelemetry span context", + injectSC: validSpanContext, + incomingMd: metadata.MD{"incoming-key": []string{"incoming-value"}}, + outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}}, + wantKeys: []string{grpcTraceBinHeaderKey, "outgoing-key"}, }, { - name: "invalid OpenTelemetry span context", - injectSC: oteltrace.SpanContext{}, - wantSC: oteltrace.SpanContext{}, + name: "invalid OpenTelemetry span context", + injectSC: oteltrace.SpanContext{}, + incomingMd: metadata.MD{"incoming-key": []string{"incoming-value"}}, + outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}}, + wantKeys: []string{"outgoing-key"}, }, } @@ -66,27 +73,32 @@ func (s) TestInject(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ctx = oteltrace.ContextWithSpanContext(ctx, test.injectSC) + ctx = metadata.NewIncomingContext(ctx, test.incomingMd) + ctx = metadata.NewOutgoingContext(ctx, test.outgoingMd) - c := itracing.NewIncomingCarrier(ctx) + c := itracing.NewOutgoingCarrier(ctx) p.Inject(ctx, c) + if gotKeys := c.Keys(); !cmp.Equal(test.wantKeys, gotKeys, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { + t.Errorf("c.Keys() = keys %v, want %v", gotKeys, test.wantKeys) + } md, _ := metadata.FromOutgoingContext(c.Context()) gotH := md.Get(grpcTraceBinHeaderKey) - if !test.wantSC.IsValid() { + if !test.injectSC.IsValid() { if len(gotH) > 0 { - t.Fatalf("got non-empty value from Carrier's context metadata grpc-trace-bin header, want empty") + t.Fatalf("got %v value from Carrier's context metadata grpc-trace-bin header, want empty", gotH) } return } if gotH[len(gotH)-1] == "" { - t.Fatalf("got empty value from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", test.wantSC) + t.Fatalf("got empty value from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", test.injectSC) } gotSC, ok := fromBinary([]byte(gotH[len(gotH)-1])) if !ok { - t.Fatalf("got invalid span context from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", test.wantSC) + t.Fatalf("got invalid span context %v from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", gotSC, test.injectSC) } - if test.wantSC.TraceID() != gotSC.TraceID() && test.wantSC.SpanID() != gotSC.SpanID() && test.wantSC.TraceFlags() != gotSC.TraceFlags() { - t.Fatalf("got span context = %v, want span contexts %v", gotSC, test.wantSC) + if test.injectSC.TraceID() != gotSC.TraceID() && test.injectSC.SpanID() != gotSC.SpanID() && test.injectSC.TraceFlags() != gotSC.TraceFlags() { + t.Fatalf("got span context = %v, want span contexts %v", gotSC, test.injectSC) } }) } @@ -102,28 +114,41 @@ func (s) TestInject(t *testing.T) { // context is not extracted. func (s) TestExtract(t *testing.T) { tests := []struct { - name string - wantSC oteltrace.SpanContext // expected span context from carrier + name string + wantSC oteltrace.SpanContext // expected span context from carrier + incomingMd metadata.MD + outgoingMd metadata.MD + wantKeys []string }{ { - name: "valid OpenTelemetry span context", - wantSC: validSpanContext.WithRemote(true), + name: "valid OpenTelemetry span context", + wantSC: validSpanContext.WithRemote(true), + incomingMd: metadata.MD{grpcTraceBinHeaderKey: []string{string(toBinary(validSpanContext.WithRemote(true)))}, "incoming-key": []string{"incoming-value"}}, + outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}}, + wantKeys: []string{grpcTraceBinHeaderKey, "incoming-key"}, }, { - name: "invalid OpenTelemetry span context", - wantSC: oteltrace.SpanContext{}, + name: "invalid OpenTelemetry span context", + wantSC: oteltrace.SpanContext{}, + incomingMd: metadata.MD{grpcTraceBinHeaderKey: []string{string(toBinary(oteltrace.SpanContext{}))}, "incoming-key": []string{"incoming-value"}}, + outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}}, + wantKeys: []string{grpcTraceBinHeaderKey, "incoming-key"}, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { p := GRPCTraceBinPropagator{} - bd := toBinary(test.wantSC) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = metadata.NewIncomingContext(ctx, test.incomingMd) + ctx = metadata.NewOutgoingContext(ctx, test.outgoingMd) - c := itracing.NewOutgoingCarrier(metadata.NewIncomingContext(ctx, metadata.MD{grpcTraceBinHeaderKey: []string{string(bd)}})) + c := itracing.NewIncomingCarrier(ctx) + if gotKeys := c.Keys(); !cmp.Equal(test.wantKeys, gotKeys, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { + t.Errorf("c.Keys() = keys %v, want %v", gotKeys, test.wantKeys) + } tCtx := p.Extract(ctx, c) got := oteltrace.SpanContextFromContext(tCtx) if !got.Equal(test.wantSC) { diff --git a/stats/opentelemetry/internal/tracing/custom_map_carrier.go b/stats/opentelemetry/internal/tracing/custom_map_carrier.go index fc9bf8008855..214102aaf97a 100644 --- a/stats/opentelemetry/internal/tracing/custom_map_carrier.go +++ b/stats/opentelemetry/internal/tracing/custom_map_carrier.go @@ -23,47 +23,33 @@ package tracing import ( "context" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" ) -// propagationDirection specifies whether the propagation is incoming or -// outgoing. -type propagationDirection int +var logger = grpclog.Component("otel-plugin") -const ( - Incoming propagationDirection = iota // Incoming propagation direction - Outgoing // Outgoing propagation direction -) - -// Carrier is a TextMapCarrier that uses `context.Context` to store and -// retrieve any propagated key-value pairs in text format. The propagation -// direction (incoming or outgoing) determines which keys should the `Keys()` -// method returns. -type Carrier struct { - ctx context.Context - direction propagationDirection -} - -// NewIncomingCarrier creates a new Carrier with the given context and -// incoming propagation direction. -func NewIncomingCarrier(ctx context.Context) *Carrier { - return &Carrier{ctx: ctx, direction: Incoming} +// IncomingCarrier is a TextMapCarrier that uses incoming `context.Context` to +// retrieve any propagated key-value pairs in text format. +type IncomingCarrier struct { + ctx context.Context } -// NewOutgoingCarrier creates a new Carrier with the given context and -// outgoing propagation direction. -func NewOutgoingCarrier(ctx context.Context) *Carrier { - return &Carrier{ctx: ctx, direction: Outgoing} +// NewIncomingCarrier creates a new `IncomingCarrier` with the given context. +// The incoming carrier should be used with propagator's `Extract()` method in +// the incoming rpc path. +func NewIncomingCarrier(ctx context.Context) *IncomingCarrier { + return &IncomingCarrier{ctx: ctx} } // Get returns the string value associated with the passed key from the -// carrier's context metadata. +// carrier's incoming context metadata. // // It returns an empty string if the key is not present in the carrier's // context or if the value associated with the key is empty. // // If multiple values are present for a key, it returns the last one. -func (c *Carrier) Get(key string) string { +func (c *IncomingCarrier) Get(key string) string { values := metadata.ValueFromIncomingContext(c.ctx, key) if len(values) == 0 { return "" @@ -71,27 +57,63 @@ func (c *Carrier) Get(key string) string { return values[len(values)-1] } -// Set stores the key-value pair in the carrier's context metadata. +// Set just logs an error. It implements the `TextMapCarrier` interface but +// should not be used with `IncomingCarrier`. +func (c *IncomingCarrier) Set(string, string) { + logger.Error("Set() should not be used with IncomingCarrier.") +} + +// Keys returns the keys stored in the carrier's context metadata. It returns +// keys from incoming context metadata. +func (c *IncomingCarrier) Keys() []string { + md, ok := metadata.FromIncomingContext(c.ctx) + if !ok { + return nil + } + keys := make([]string, 0, len(md)) + for key := range md { + keys = append(keys, key) + } + return keys +} + +// Context returns the underlying context associated with the +// `IncomingCarrier“. +func (c *IncomingCarrier) Context() context.Context { + return c.ctx +} + +// OutgoingCarrier is a TextMapCarrier that uses outgoing `context.Context` to +// store any propagated key-value pairs in text format. +type OutgoingCarrier struct { + ctx context.Context +} + +// NewOutgoingCarrier creates a new Carrier with the given context. The +// outgoing carrier should be used with propagator's `Inject()` method in the +// outgoing rpc path. +func NewOutgoingCarrier(ctx context.Context) *OutgoingCarrier { + return &OutgoingCarrier{ctx: ctx} +} + +// Get just logs an error and returns an empty string. It implements the +// `TextMapCarrier` interface but should not be used with `OutgoingCarrier`. +func (c *OutgoingCarrier) Get(string) string { + logger.Error("Get() should not be used with `OutgoingCarrier`") + return "" +} + +// Set stores the key-value pair in the carrier's outgoing context metadata. // // If the key already exists, given value is appended to the last. -func (c *Carrier) Set(key, value string) { +func (c *OutgoingCarrier) Set(key, value string) { c.ctx = metadata.AppendToOutgoingContext(c.ctx, key, value) } // Keys returns the keys stored in the carrier's context metadata. It returns -// keys from outgoing context metadata if propagation direction is outgoing, -// otherwise it returns keys from incoming context metadata. -func (c *Carrier) Keys() []string { - var md metadata.MD - var ok bool - - switch c.direction { - case Outgoing: - md, ok = metadata.FromOutgoingContext(c.ctx) - case Incoming: - md, ok = metadata.FromIncomingContext(c.ctx) - } - +// keys from outgoing context metadata. +func (c *OutgoingCarrier) Keys() []string { + md, ok := metadata.FromOutgoingContext(c.ctx) if !ok { return nil } @@ -102,7 +124,8 @@ func (c *Carrier) Keys() []string { return keys } -// Context returns the underlying context associated with the Carrier. -func (c *Carrier) Context() context.Context { +// Context returns the underlying context associated with the +// `OutgoingCarrier“. +func (c *OutgoingCarrier) Context() context.Context { return c.ctx } diff --git a/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go b/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go index 1ab4bedadf9d..a5294ca2edae 100644 --- a/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go +++ b/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go @@ -36,16 +36,16 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// TestGet verifies that `Carrier.Get()` returns correct value for the -// corresponding key in the carrier's context metadata, if key is present. If -// key is not present, it verifies that empty string is returned. +// TestIncomingCarrier verifies that `IncomingCarrier.Get()` returns correct +// value for the corresponding key in the carrier's context metadata, if key is +// present. If key is not present, it verifies that empty string is returned. // // If multiple values are present for a key, it verifies that last value is // returned. // // If key ends with `-bin`, it verifies that a correct binary value is returned // in the string format for the binary header. -func (s) TestGet(t *testing.T) { +func (s) TestIncomingCarrier(t *testing.T) { tests := []struct { name string md metadata.MD @@ -120,17 +120,17 @@ func (s) TestGet(t *testing.T) { } } -// TestSet verifies that a key-value pair is set in carrier's context metadata -// using `Carrier.Set()`. If key is not present, it verifies that -// key-value pair is insterted. If key is already present, it verifies that new -// value is appended at the end of list for the existing key. +// TestOutgoingCarrier verifies that a key-value pair is set in carrier's +// context metadata using `OutgoingCarrier.Set()`. If key is not present, it +// verifies that key-value pair is inserted. If key is already present, it +// verifies that new value is appended at the end of list for the existing key. // // If key ends with `-bin`, it verifies that a binary value is set for // `-bin` header in string format. // // It also verifies that both existing and newly inserted keys are present in // the carrier's context using `Carrier.Keys()`. -func (s) TestSet(t *testing.T) { +func (s) TestOutgoingCarrier(t *testing.T) { tests := []struct { name string initialMD metadata.MD @@ -188,43 +188,3 @@ func (s) TestSet(t *testing.T) { }) } } - -func TestKeys(t *testing.T) { - tests := []struct { - name string - direction propagationDirection - md metadata.MD - want []string - }{ - { - name: "outgoing ignores incoming", - direction: Outgoing, - md: metadata.MD{"incoming-key": []string{"incoming-value"}, "outgoing-key": []string{"outgoing-value"}}, - want: []string{"outgoing-key"}, - }, - { - name: "incoming ignores outgoing", - direction: Incoming, - md: metadata.MD{"incoming-key": []string{"incoming-value"}, "outgoing-key": []string{"outgoing-value"}}, - want: []string{"incoming-key"}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctx = metadata.NewIncomingContext(ctx, metadata.MD{"incoming-key": test.md["incoming-key"]}) - ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"outgoing-key": test.md["outgoing-key"]}) - var c *Carrier - if test.direction == Incoming { - c = NewIncomingCarrier(ctx) - } else { - c = NewOutgoingCarrier(ctx) - } - if got := c.Keys(); !cmp.Equal(test.want, got, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { - t.Fatalf("c.Keys() = %v, want %v", got, test.want) - } - }) - } -} diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index fea3e4ec30e8..31a4eebb0fd7 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -45,7 +45,7 @@ func (h *clientStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagI tracer := otel.Tracer("grpc-open-telemetry") ctx, span := tracer.Start(ctx, mn) - carrier := otelinternaltracing.NewIncomingCarrier(ctx) // Use internal custom carrier to inject + carrier := otelinternaltracing.NewOutgoingCarrier(ctx) // Use internal custom carrier to inject otel.GetTextMapPropagator().Inject(ctx, carrier) return carrier.Context(), &attemptInfo{ diff --git a/stream.go b/stream.go index a7e88aa5884f..4929e64bc865 100644 --- a/stream.go +++ b/stream.go @@ -605,8 +605,6 @@ type csAttempt struct { // trInfo.tr is set when created (if EnableTracing is true), // and cleared when the finish method is called. trInfo *traceInfo - // nameResolutionDelayed is true if name resolution is delayed. - nameResolutionDelayed bool statsHandlers []stats.Handler beginTime time.Time