Skip to content

Commit

Permalink
Use NewOutgoingCarrier to inject into GRPCTraceBinPropagator
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Nov 21, 2024
1 parent 2a26fc2 commit 8cb8222
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 124 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
Expand All @@ -32,7 +33,6 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
)
6 changes: 3 additions & 3 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func (s) TestClientCallSpanEvents(t *testing.T) {
// Create a parent span for the client call
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx))
ctx = metadata.NewOutgoingContext(ctx, md)

// Make a unary RPC
Expand Down Expand Up @@ -690,7 +690,7 @@ func (s) TestServerWithMetricsAndTraceOptions(t *testing.T) {
// Create a parent span for the client call
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx))
ctx = metadata.NewOutgoingContext(ctx, md)

// Make two RPC's, a unary RPC and a streaming RPC. These should cause
Expand Down Expand Up @@ -759,7 +759,7 @@ func (s) TestGrpcTraceBinPropagator(t *testing.T) {
// Create a parent span for the client call
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx))
ctx = metadata.NewOutgoingContext(ctx, md)
// Make a unary RPC
if _, err := ss.Client.UnaryCall(
Expand Down
73 changes: 49 additions & 24 deletions stats/opentelemetry/grpc_trace_bin_propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
Expand All @@ -44,19 +45,25 @@ var validSpanContext = oteltrace.SpanContext{}.WithTraceID(
// header is not set in the carrier's context metadata.
func (s) TestInject(t *testing.T) {
tests := []struct {
name string
injectSC oteltrace.SpanContext
wantSC oteltrace.SpanContext
name string
injectSC oteltrace.SpanContext
incomingMd metadata.MD
outgoingMd metadata.MD
wantKeys []string
}{
{
name: "valid OpenTelemetry span context",
injectSC: validSpanContext,
wantSC: validSpanContext,
name: "valid OpenTelemetry span context",
injectSC: validSpanContext,
incomingMd: metadata.MD{"incoming-key": []string{"incoming-value"}},
outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}},
wantKeys: []string{grpcTraceBinHeaderKey, "outgoing-key"},
},
{
name: "invalid OpenTelemetry span context",
injectSC: oteltrace.SpanContext{},
wantSC: oteltrace.SpanContext{},
name: "invalid OpenTelemetry span context",
injectSC: oteltrace.SpanContext{},
incomingMd: metadata.MD{"incoming-key": []string{"incoming-value"}},
outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}},
wantKeys: []string{"outgoing-key"},
},
}

Expand All @@ -66,27 +73,32 @@ func (s) TestInject(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = oteltrace.ContextWithSpanContext(ctx, test.injectSC)
ctx = metadata.NewIncomingContext(ctx, test.incomingMd)
ctx = metadata.NewOutgoingContext(ctx, test.outgoingMd)

c := itracing.NewIncomingCarrier(ctx)
c := itracing.NewOutgoingCarrier(ctx)
p.Inject(ctx, c)

if gotKeys := c.Keys(); !cmp.Equal(test.wantKeys, gotKeys, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
t.Errorf("c.Keys() = keys %v, want %v", gotKeys, test.wantKeys)
}
md, _ := metadata.FromOutgoingContext(c.Context())
gotH := md.Get(grpcTraceBinHeaderKey)
if !test.wantSC.IsValid() {
if !test.injectSC.IsValid() {
if len(gotH) > 0 {
t.Fatalf("got non-empty value from Carrier's context metadata grpc-trace-bin header, want empty")
t.Fatalf("got %v value from Carrier's context metadata grpc-trace-bin header, want empty", gotH)
}
return
}
if gotH[len(gotH)-1] == "" {
t.Fatalf("got empty value from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", test.wantSC)
t.Fatalf("got empty value from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", test.injectSC)
}
gotSC, ok := fromBinary([]byte(gotH[len(gotH)-1]))
if !ok {
t.Fatalf("got invalid span context from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", test.wantSC)
t.Fatalf("got invalid span context %v from Carrier's context metadata grpc-trace-bin header, want valid span context: %v", gotSC, test.injectSC)
}
if test.wantSC.TraceID() != gotSC.TraceID() && test.wantSC.SpanID() != gotSC.SpanID() && test.wantSC.TraceFlags() != gotSC.TraceFlags() {
t.Fatalf("got span context = %v, want span contexts %v", gotSC, test.wantSC)
if test.injectSC.TraceID() != gotSC.TraceID() && test.injectSC.SpanID() != gotSC.SpanID() && test.injectSC.TraceFlags() != gotSC.TraceFlags() {
t.Fatalf("got span context = %v, want span contexts %v", gotSC, test.injectSC)
}
})
}
Expand All @@ -102,28 +114,41 @@ func (s) TestInject(t *testing.T) {
// context is not extracted.
func (s) TestExtract(t *testing.T) {
tests := []struct {
name string
wantSC oteltrace.SpanContext // expected span context from carrier
name string
wantSC oteltrace.SpanContext // expected span context from carrier
incomingMd metadata.MD
outgoingMd metadata.MD
wantKeys []string
}{
{
name: "valid OpenTelemetry span context",
wantSC: validSpanContext.WithRemote(true),
name: "valid OpenTelemetry span context",
wantSC: validSpanContext.WithRemote(true),
incomingMd: metadata.MD{grpcTraceBinHeaderKey: []string{string(toBinary(validSpanContext.WithRemote(true)))}, "incoming-key": []string{"incoming-value"}},
outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}},
wantKeys: []string{grpcTraceBinHeaderKey, "incoming-key"},
},
{
name: "invalid OpenTelemetry span context",
wantSC: oteltrace.SpanContext{},
name: "invalid OpenTelemetry span context",
wantSC: oteltrace.SpanContext{},
incomingMd: metadata.MD{grpcTraceBinHeaderKey: []string{string(toBinary(oteltrace.SpanContext{}))}, "incoming-key": []string{"incoming-value"}},
outgoingMd: metadata.MD{"outgoing-key": []string{"outgoing-value"}},
wantKeys: []string{grpcTraceBinHeaderKey, "incoming-key"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := GRPCTraceBinPropagator{}
bd := toBinary(test.wantSC)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = metadata.NewIncomingContext(ctx, test.incomingMd)
ctx = metadata.NewOutgoingContext(ctx, test.outgoingMd)

c := itracing.NewOutgoingCarrier(metadata.NewIncomingContext(ctx, metadata.MD{grpcTraceBinHeaderKey: []string{string(bd)}}))
c := itracing.NewIncomingCarrier(ctx)

if gotKeys := c.Keys(); !cmp.Equal(test.wantKeys, gotKeys, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
t.Errorf("c.Keys() = keys %v, want %v", gotKeys, test.wantKeys)
}
tCtx := p.Extract(ctx, c)
got := oteltrace.SpanContextFromContext(tCtx)
if !got.Equal(test.wantSC) {
Expand Down
111 changes: 67 additions & 44 deletions stats/opentelemetry/internal/tracing/custom_map_carrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,75 +23,97 @@ package tracing
import (
"context"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
)

// propagationDirection specifies whether the propagation is incoming or
// outgoing.
type propagationDirection int
var logger = grpclog.Component("otel-plugin")

const (
Incoming propagationDirection = iota // Incoming propagation direction
Outgoing // Outgoing propagation direction
)

// Carrier is a TextMapCarrier that uses `context.Context` to store and
// retrieve any propagated key-value pairs in text format. The propagation
// direction (incoming or outgoing) determines which keys should the `Keys()`
// method returns.
type Carrier struct {
ctx context.Context
direction propagationDirection
}

// NewIncomingCarrier creates a new Carrier with the given context and
// incoming propagation direction.
func NewIncomingCarrier(ctx context.Context) *Carrier {
return &Carrier{ctx: ctx, direction: Incoming}
// IncomingCarrier is a TextMapCarrier that uses incoming `context.Context` to
// retrieve any propagated key-value pairs in text format.
type IncomingCarrier struct {
ctx context.Context
}

// NewOutgoingCarrier creates a new Carrier with the given context and
// outgoing propagation direction.
func NewOutgoingCarrier(ctx context.Context) *Carrier {
return &Carrier{ctx: ctx, direction: Outgoing}
// NewIncomingCarrier creates a new `IncomingCarrier` with the given context.
// The incoming carrier should be used with propagator's `Extract()` method in
// the incoming rpc path.
func NewIncomingCarrier(ctx context.Context) *IncomingCarrier {
return &IncomingCarrier{ctx: ctx}
}

// Get returns the string value associated with the passed key from the
// carrier's context metadata.
// carrier's incoming context metadata.
//
// It returns an empty string if the key is not present in the carrier's
// context or if the value associated with the key is empty.
//
// If multiple values are present for a key, it returns the last one.
func (c *Carrier) Get(key string) string {
func (c *IncomingCarrier) Get(key string) string {
values := metadata.ValueFromIncomingContext(c.ctx, key)
if len(values) == 0 {
return ""
}
return values[len(values)-1]
}

// Set stores the key-value pair in the carrier's context metadata.
// Set just logs an error. It implements the `TextMapCarrier` interface but
// should not be used with `IncomingCarrier`.
func (c *IncomingCarrier) Set(string, string) {
logger.Error("Set() should not be used with IncomingCarrier.")

Check warning on line 63 in stats/opentelemetry/internal/tracing/custom_map_carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/custom_map_carrier.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

// Keys returns the keys stored in the carrier's context metadata. It returns
// keys from incoming context metadata.
func (c *IncomingCarrier) Keys() []string {
md, ok := metadata.FromIncomingContext(c.ctx)
if !ok {
return nil
}

Check warning on line 72 in stats/opentelemetry/internal/tracing/custom_map_carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/custom_map_carrier.go#L71-L72

Added lines #L71 - L72 were not covered by tests
keys := make([]string, 0, len(md))
for key := range md {
keys = append(keys, key)
}
return keys
}

// Context returns the underlying context associated with the
// `IncomingCarrier“.
func (c *IncomingCarrier) Context() context.Context {
return c.ctx

Check warning on line 83 in stats/opentelemetry/internal/tracing/custom_map_carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/custom_map_carrier.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}

// OutgoingCarrier is a TextMapCarrier that uses outgoing `context.Context` to
// store any propagated key-value pairs in text format.
type OutgoingCarrier struct {
ctx context.Context
}

// NewOutgoingCarrier creates a new Carrier with the given context. The
// outgoing carrier should be used with propagator's `Inject()` method in the
// outgoing rpc path.
func NewOutgoingCarrier(ctx context.Context) *OutgoingCarrier {
return &OutgoingCarrier{ctx: ctx}
}

// Get just logs an error and returns an empty string. It implements the
// `TextMapCarrier` interface but should not be used with `OutgoingCarrier`.
func (c *OutgoingCarrier) Get(string) string {
logger.Error("Get() should not be used with `OutgoingCarrier`")
return ""

Check warning on line 103 in stats/opentelemetry/internal/tracing/custom_map_carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/custom_map_carrier.go#L101-L103

Added lines #L101 - L103 were not covered by tests
}

// Set stores the key-value pair in the carrier's outgoing context metadata.
//
// If the key already exists, given value is appended to the last.
func (c *Carrier) Set(key, value string) {
func (c *OutgoingCarrier) Set(key, value string) {
c.ctx = metadata.AppendToOutgoingContext(c.ctx, key, value)
}

// Keys returns the keys stored in the carrier's context metadata. It returns
// keys from outgoing context metadata if propagation direction is outgoing,
// otherwise it returns keys from incoming context metadata.
func (c *Carrier) Keys() []string {
var md metadata.MD
var ok bool

switch c.direction {
case Outgoing:
md, ok = metadata.FromOutgoingContext(c.ctx)
case Incoming:
md, ok = metadata.FromIncomingContext(c.ctx)
}

// keys from outgoing context metadata.
func (c *OutgoingCarrier) Keys() []string {
md, ok := metadata.FromOutgoingContext(c.ctx)
if !ok {
return nil
}

Check warning on line 119 in stats/opentelemetry/internal/tracing/custom_map_carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/custom_map_carrier.go#L118-L119

Added lines #L118 - L119 were not covered by tests
Expand All @@ -102,7 +124,8 @@ func (c *Carrier) Keys() []string {
return keys
}

// Context returns the underlying context associated with the Carrier.
func (c *Carrier) Context() context.Context {
// Context returns the underlying context associated with the
// `OutgoingCarrier“.
func (c *OutgoingCarrier) Context() context.Context {
return c.ctx
}
Loading

0 comments on commit 8cb8222

Please sign in to comment.