Skip to content

Commit

Permalink
fix(HMS-2785): pass otel spans properly
Browse files Browse the repository at this point in the history
Otel needs a root spans, if that is not passed from external service,
we should start one ourselves, instead of starting span once we decide there is one needed internally.
Also jobs and kafka shall retrieve these spans and continue, not start new spans.
  • Loading branch information
ezr-ondrej authored and lzap committed Oct 16, 2023
1 parent c806fe9 commit 0f3abc7
Show file tree
Hide file tree
Showing 16 changed files with 167 additions and 113 deletions.
4 changes: 3 additions & 1 deletion cmd/pbackend/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func api() {
apiRouter.Use(telemetry.Middleware(apiRouter))
apiRouter.Use(m.VersionMiddleware)
apiRouter.Use(m.CorrelationID)
apiRouter.Use(m.TraceID)
if config.Telemetry.Enabled {
apiRouter.Use(m.Telemetry)
}
apiRouter.Use(m.LoggerMiddleware(&log.Logger))

// Mount paths
Expand Down
1 change: 0 additions & 1 deletion internal/jobs/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
func copyContext(ctx context.Context) context.Context {
nCtx := context.Background()
nCtx = log.Logger.WithContext(nCtx)
nCtx = logging.WithTraceId(nCtx, logging.TraceId(ctx))
nCtx = logging.WithEdgeRequestId(nCtx, logging.EdgeRequestId(ctx))
nCtx = identity.WithAccountId(nCtx, identity.AccountId(ctx))
nCtx = logging.WithReservationId(nCtx, logging.ReservationId(ctx))
Expand Down
26 changes: 0 additions & 26 deletions internal/kafka/avail_status_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"

"github.com/RHEnVision/provisioning-backend/internal/identity"
)

type AvailabilityStatusMessage struct {
Expand All @@ -25,27 +23,3 @@ func NewAvailabilityStatusMessage(msg *GenericMessage) (*AvailabilityStatusMessa
func (m AvailabilityStatusMessage) GenericMessage(ctx context.Context) (GenericMessage, error) {
return genericMessage(ctx, m, m.SourceID, AvailabilityStatusRequestTopic)
}

func genericMessage(ctx context.Context, m any, key string, topic string) (GenericMessage, error) {
payload, err := json.Marshal(m)
if err != nil {
return GenericMessage{}, fmt.Errorf("unable to marshal message: %w", err)
}

// This will panic when identity was not present in the context (no error handling possible)
id := identity.Identity(ctx)

return GenericMessage{
Topic: topic,
Key: []byte(key),
Value: payload,
// Keep headers written in lowercase to match sources comparison.
Headers: GenericHeaders(
"content-type", "application/json",
"x-rh-identity", identity.IdentityHeader(ctx),
"x-rh-sources-org-id", id.Identity.OrgID,
"x-rh-sources-account-number", id.Identity.AccountNumber,
"event_type", "availability_status",
),
}, nil
}
51 changes: 30 additions & 21 deletions internal/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (

"github.com/RHEnVision/provisioning-backend/internal/config"
"github.com/RHEnVision/provisioning-backend/internal/identity"
"github.com/RHEnVision/provisioning-backend/internal/logging"
"github.com/RHEnVision/provisioning-backend/internal/random"
"github.com/RHEnVision/provisioning-backend/internal/telemetry"
"github.com/RHEnVision/provisioning-backend/internal/version"
"github.com/rs/zerolog"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -200,12 +202,13 @@ func (b *kafkaBroker) Consume(ctx context.Context, topic string, since time.Time
}
}()

err := r.SetOffsetAt(ctx, since)
if err != nil {
logger.Warn().Err(err).Msg("Unable to set initial offset")
offsetErr := r.SetOffsetAt(ctx, since)
if offsetErr != nil {
logger.Warn().Err(offsetErr).Msg("Unable to set initial offset")
}

for {
var span trace.Span
msg, err := r.ReadMessage(ctx)
if err != nil && errors.Is(err, io.EOF) {
logger.Warn().Err(err).Msg("Kafka receiver has been closed")
Expand All @@ -220,29 +223,35 @@ func (b *kafkaBroker) Consume(ctx context.Context, topic string, since time.Time
msg.Key, msg.Topic, msg.Offset, msg.Partition)

// build new context - identity and trace id
newLogger := logger.With().Str("msg_id", random.TraceID().String())
newCtx, err := identity.WithIdentityFrom64(ctx, header("X-RH-Identity", msg.Headers))
if err != nil {
errLogger := newLogger.Logger()
errLogger.Warn().Err(err).Msgf("Could not extract identity from context to Kafka message")
newCtx = errLogger.WithContext(ctx)
logCtx := logger.With().Str("msg_id", random.TraceID().String())
newCtx, msgErr := identity.WithIdentityFrom64(ctx, header("X-RH-Identity", msg.Headers))
if msgErr != nil {
errLogger := logCtx.Logger()
errLogger.Warn().Err(msgErr).Msgf("Could not extract identity from context to Kafka message")
} else {
id := identity.Identity(newCtx)

traceId := trace.SpanFromContext(ctx).SpanContext().TraceID()
if !traceId.IsValid() {
traceId = random.TraceID()
}
newCtx = logging.WithTraceId(newCtx, traceId.String())

newLogger = newLogger.
Str("trace_id", traceId.String()).
logCtx = logCtx.
Str("account_number", id.Identity.AccountNumber).
Str("org_id", id.Identity.OrgID)
newCtx = newLogger.Logger().WithContext(newCtx)
}

handler(newCtx, NewMessageFromKafka(&msg))
gMsg := NewMessageFromKafka(&msg)

if config.Telemetry.Enabled {
newCtx = otel.GetTextMapPropagator().Extract(newCtx, propagation.MapCarrier(headersMap(gMsg.Headers)))
newCtx, span = otel.Tracer(telemetry.TracePrefix+"kafka").Start(newCtx, fmt.Sprintf("Processing message on topic %s", topic))

logCtx.Str("trace_id", span.SpanContext().TraceID().String())
} else {
// noopSpan from empty context
span = trace.SpanFromContext(context.Background())
}

newCtx = logCtx.Logger().WithContext(newCtx)

handler(newCtx, gMsg)

span.End()
}
}
}
Expand Down
47 changes: 47 additions & 0 deletions internal/kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package kafka

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/RHEnVision/provisioning-backend/internal/config"
"github.com/RHEnVision/provisioning-backend/internal/identity"
"github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

// GenericMessage is a platform independent message.
Expand Down Expand Up @@ -98,3 +104,44 @@ func (m GenericMessage) Header(name string) string {
}
return ""
}

func genericMessage(ctx context.Context, m any, key string, topic string) (GenericMessage, error) {
payload, err := json.Marshal(m)
if err != nil {
return GenericMessage{}, fmt.Errorf("unable to marshal message: %w", err)
}

// This will panic when identity was not present in the context (no error handling possible)
id := identity.Identity(ctx)
// Keep headers written in lowercase to match sources comparison.
headers := GenericHeaders(
"content-type", "application/json",
"x-rh-identity", identity.IdentityHeader(ctx),
"x-rh-sources-org-id", id.Identity.OrgID,
"x-rh-sources-account-number", id.Identity.AccountNumber,
"event_type", "availability_status",
)

if config.Telemetry.Enabled {
var traceHeaders propagation.MapCarrier = make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, traceHeaders)
for name, value := range traceHeaders {
headers = append(headers, GenericHeader{Key: name, Value: value})
}
}

return GenericMessage{
Topic: topic,
Key: []byte(key),
Value: payload,
Headers: headers,
}, nil
}

func headersMap(headers []GenericHeader) map[string]string {
hMap := make(map[string]string, len(headers))
for _, genericHeader := range headers {
hMap[genericHeader.Key] = genericHeader.Value
}
return hMap
}
14 changes: 5 additions & 9 deletions internal/logging/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package logging

import (
"context"

"go.opentelemetry.io/otel/trace"
)

type commonKeyId int

const (
requestIdCtxKey commonKeyId = iota
edgeRequestIdCtxKey commonKeyId = iota
correlationCtxKey commonKeyId = iota
jobIdCtxKey commonKeyId = iota
Expand Down Expand Up @@ -45,16 +46,11 @@ func WithEdgeRequestId(ctx context.Context, id string) context.Context {

// TraceId returns request id or an empty string when not set.
func TraceId(ctx context.Context) string {
value := ctx.Value(requestIdCtxKey)
if value == nil {
value := trace.SpanContextFromContext(ctx).TraceID()
if !value.IsValid() {
return ""
}
return value.(string)
}

// WithTraceId returns context copy with trace id value.
func WithTraceId(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, requestIdCtxKey, id)
return value.String()
}

// JobId returns request id or an empty string when not set.
Expand Down
6 changes: 6 additions & 0 deletions internal/middleware/correlation_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ func CorrelationID(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Edge request id
edgeId := r.Header.Get("X-Rh-Edge-Request-Id")
if edgeId != "" {
ctx = logging.WithEdgeRequestId(ctx, edgeId)
}

corrId := r.Header.Get("X-Correlation-Id")
if corrId != "" {
ctx = logging.WithCorrelationId(ctx, corrId)
Expand Down
29 changes: 29 additions & 0 deletions internal/middleware/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package middleware

import (
"net/http"

"github.com/RHEnVision/provisioning-backend/internal/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

const TraceName = telemetry.TracePrefix + "internal/middleware"

// Telemetry middleware starts a new telemetry span for this request,
// it tries to find the parent trace in the request,
// if none is found, it starts new root span.
func Telemetry(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
var span trace.Span
ctx := r.Context()

ctx, span = otel.Tracer(TraceName).Start(ctx, r.URL.Path)

// Store TraceID in response headers for easier debugging
w.Header().Set("X-Trace-Id", span.SpanContext().TraceID().String())

next.ServeHTTP(w, r.WithContext(ctx))
}
return http.HandlerFunc(fn)
}
35 changes: 0 additions & 35 deletions internal/middleware/trace_id.go

This file was deleted.

1 change: 0 additions & 1 deletion internal/services/aws_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func CreateAWSReservation(w http.ResponseWriter, r *http.Request) {
launchJob := worker.Job{
Type: jobs.TypeLaunchInstanceAws,
Identity: id,
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
AccountID: accountId,
Args: jobs.LaunchInstanceAWSTaskArgs{
Expand Down
1 change: 0 additions & 1 deletion internal/services/azure_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func CreateAzureReservation(w http.ResponseWriter, r *http.Request) {
launchJob := worker.Job{
Type: jobs.TypeLaunchInstanceAzure,
Identity: identity.Identity(r.Context()),
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
AccountID: identity.AccountId(r.Context()),
Args: jobs.LaunchInstanceAzureTaskArgs{
Expand Down
1 change: 0 additions & 1 deletion internal/services/gcp_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func CreateGCPReservation(w http.ResponseWriter, r *http.Request) {
launchJob := worker.Job{
Type: jobs.TypeLaunchInstanceGcp,
AccountID: accountId,
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
Identity: id,
Args: jobs.LaunchInstanceGCPTaskArgs{
Expand Down
1 change: 0 additions & 1 deletion internal/services/noop_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func CreateNoopReservation(w http.ResponseWriter, r *http.Request) {
Type: jobs.TypeNoop,
AccountID: accountId,
Identity: identity,
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
Args: jobs.NoopJobArgs{
ReservationID: reservation.ID,
Expand Down
Loading

0 comments on commit 0f3abc7

Please sign in to comment.