From d48b974403461efb330058f0ee80027b65453f9f Mon Sep 17 00:00:00 2001 From: Ondrej Ezr Date: Wed, 11 Oct 2023 23:17:19 +0200 Subject: [PATCH] fix(HMS-2785): pass spans properly Otel needs a root span; if one is not passed from an external service, we should start one ourselves, instead of starting a span only once we decide one is needed internally. Jobs shall also retrieve these spans and not start new ones. --- cmd/pbackend/api.go | 4 ++- internal/jobs/ctx.go | 1 - internal/kafka/kafka.go | 9 +---- internal/logging/ctx.go | 13 +++---- internal/middleware/correlation_id.go | 6 ++++ internal/middleware/telemetry.go | 29 +++++++++++++++ internal/middleware/trace_id.go | 35 ------------------- internal/services/aws_reservation_service.go | 1 - .../services/azure_reservation_service.go | 1 - internal/services/gcp_reservation_service.go | 1 - internal/services/noop_reservation_service.go | 1 - pkg/worker/job.go | 32 +++++++++++------ pkg/worker/memory.go | 18 +++++++--- pkg/worker/redis.go | 12 ++++++- 14 files changed, 90 insertions(+), 73 deletions(-) create mode 100644 internal/middleware/telemetry.go delete mode 100644 internal/middleware/trace_id.go diff --git a/cmd/pbackend/api.go b/cmd/pbackend/api.go index a293ac83..5c19fcbd 100644 --- a/cmd/pbackend/api.go +++ b/cmd/pbackend/api.go @@ -100,7 +100,9 @@ func api() { apiRouter.Use(telemetry.Middleware(apiRouter)) apiRouter.Use(m.VersionMiddleware) apiRouter.Use(m.CorrelationID) - apiRouter.Use(m.TraceID) + if config.Telemetry.Enabled { + apiRouter.Use(m.Telemetry) + } apiRouter.Use(m.LoggerMiddleware(&log.Logger)) // Mount paths diff --git a/internal/jobs/ctx.go b/internal/jobs/ctx.go index 2274c537..ed955797 100644 --- a/internal/jobs/ctx.go +++ b/internal/jobs/ctx.go @@ -15,7 +15,6 @@ import ( func copyContext(ctx context.Context) context.Context { nCtx := context.Background() nCtx = log.Logger.WithContext(nCtx) - nCtx = logging.WithTraceId(nCtx, logging.TraceId(ctx)) nCtx = logging.WithEdgeRequestId(nCtx, 
logging.EdgeRequestId(ctx)) nCtx = identity.WithAccountId(nCtx, identity.AccountId(ctx)) nCtx = logging.WithReservationId(nCtx, logging.ReservationId(ctx)) diff --git a/internal/kafka/kafka.go b/internal/kafka/kafka.go index 5a8c3400..6bebd2de 100644 --- a/internal/kafka/kafka.go +++ b/internal/kafka/kafka.go @@ -22,7 +22,6 @@ import ( "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" - "go.opentelemetry.io/otel/trace" ) type kafkaBroker struct { @@ -229,14 +228,8 @@ func (b *kafkaBroker) Consume(ctx context.Context, topic string, since time.Time } else { id := identity.Identity(newCtx) - traceId := trace.SpanFromContext(ctx).SpanContext().TraceID() - if !traceId.IsValid() { - traceId = random.TraceID() - } - newCtx = logging.WithTraceId(newCtx, traceId.String()) - newLogger = newLogger. - Str("trace_id", traceId.String()). + Str("trace_id", logging.TraceId(newCtx)). Str("account_number", id.Identity.AccountNumber). Str("org_id", id.Identity.OrgID) newCtx = newLogger.Logger().WithContext(newCtx) diff --git a/internal/logging/ctx.go b/internal/logging/ctx.go index 470a4ad2..34ab8ba7 100644 --- a/internal/logging/ctx.go +++ b/internal/logging/ctx.go @@ -2,6 +2,8 @@ package logging import ( "context" + + "go.opentelemetry.io/otel/trace" ) type commonKeyId int @@ -45,16 +47,11 @@ func WithEdgeRequestId(ctx context.Context, id string) context.Context { // TraceId returns request id or an empty string when not set. func TraceId(ctx context.Context) string { - value := ctx.Value(requestIdCtxKey) - if value == nil { + value := trace.SpanContextFromContext(ctx).TraceID() + if !value.IsValid() { return "" } - return value.(string) -} - -// WithTraceId returns context copy with trace id value. -func WithTraceId(ctx context.Context, id string) context.Context { - return context.WithValue(ctx, requestIdCtxKey, id) + return value.String() } // JobId returns request id or an empty string when not set. 
diff --git a/internal/middleware/correlation_id.go b/internal/middleware/correlation_id.go index d62c8569..d8c2b8bb 100644 --- a/internal/middleware/correlation_id.go +++ b/internal/middleware/correlation_id.go @@ -11,6 +11,12 @@ func CorrelationID(next http.Handler) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + // Edge request id + edgeId := r.Header.Get("X-Rh-Edge-Request-Id") + if edgeId != "" { + ctx = logging.WithEdgeRequestId(ctx, edgeId) + } + corrId := r.Header.Get("X-Correlation-Id") if corrId != "" { ctx = logging.WithCorrelationId(ctx, corrId) diff --git a/internal/middleware/telemetry.go b/internal/middleware/telemetry.go new file mode 100644 index 00000000..f3c8e69a --- /dev/null +++ b/internal/middleware/telemetry.go @@ -0,0 +1,29 @@ +package middleware + +import ( + "net/http" + + "github.com/RHEnVision/provisioning-backend/internal/telemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +const TraceName = telemetry.TracePrefix + "internal/middleware" + +// Telemetry middleware starts a new telemetry span for this request, +// it tries to find the parent trace in the request, +// if none is found, it starts new root span. 
+func Telemetry(next http.Handler) http.Handler { + fn := func(w http.ResponseWriter, r *http.Request) { + var span trace.Span + ctx := r.Context() + + ctx, span = otel.Tracer(TraceName).Start(ctx, r.URL.Path) + + // Store TraceID in response headers for easier debugging + w.Header().Set("X-Trace-Id", span.SpanContext().TraceID().String()) + + next.ServeHTTP(w, r.WithContext(ctx)) + } + return http.HandlerFunc(fn) +} diff --git a/internal/middleware/trace_id.go b/internal/middleware/trace_id.go deleted file mode 100644 index 3b1f941c..00000000 --- a/internal/middleware/trace_id.go +++ /dev/null @@ -1,35 +0,0 @@ -package middleware - -import ( - "net/http" - - "github.com/RHEnVision/provisioning-backend/internal/logging" - "github.com/RHEnVision/provisioning-backend/internal/random" - "go.opentelemetry.io/otel/trace" -) - -func TraceID(next http.Handler) http.Handler { - fn := func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - // Edge request id - edgeId := r.Header.Get("X-Rh-Edge-Request-Id") - if edgeId != "" { - ctx = logging.WithEdgeRequestId(ctx, edgeId) - } - - // OpenTelemetry trace id - traceId := trace.SpanFromContext(ctx).SpanContext().TraceID() - if !traceId.IsValid() { - // OpenTelemetry library does not provide a public interface to create new IDs - traceId = random.TraceID() - } - - // Store in response headers for easier debugging - w.Header().Set("X-Trace-Id", traceId.String()) - - ctx = logging.WithTraceId(ctx, traceId.String()) - next.ServeHTTP(w, r.WithContext(ctx)) - } - return http.HandlerFunc(fn) -} diff --git a/internal/services/aws_reservation_service.go b/internal/services/aws_reservation_service.go index 38bbb7d4..a84d6ab3 100644 --- a/internal/services/aws_reservation_service.go +++ b/internal/services/aws_reservation_service.go @@ -150,7 +150,6 @@ func CreateAWSReservation(w http.ResponseWriter, r *http.Request) { launchJob := worker.Job{ Type: jobs.TypeLaunchInstanceAws, Identity: id, - TraceID: 
logging.TraceId(r.Context()), EdgeID: logging.EdgeRequestId(r.Context()), AccountID: accountId, Args: jobs.LaunchInstanceAWSTaskArgs{ diff --git a/internal/services/azure_reservation_service.go b/internal/services/azure_reservation_service.go index d445ab81..5d5c7772 100644 --- a/internal/services/azure_reservation_service.go +++ b/internal/services/azure_reservation_service.go @@ -144,7 +144,6 @@ func CreateAzureReservation(w http.ResponseWriter, r *http.Request) { launchJob := worker.Job{ Type: jobs.TypeLaunchInstanceAzure, Identity: identity.Identity(r.Context()), - TraceID: logging.TraceId(r.Context()), EdgeID: logging.EdgeRequestId(r.Context()), AccountID: identity.AccountId(r.Context()), Args: jobs.LaunchInstanceAzureTaskArgs{ diff --git a/internal/services/gcp_reservation_service.go b/internal/services/gcp_reservation_service.go index 7e0f6a38..5a89c668 100644 --- a/internal/services/gcp_reservation_service.go +++ b/internal/services/gcp_reservation_service.go @@ -142,7 +142,6 @@ func CreateGCPReservation(w http.ResponseWriter, r *http.Request) { launchJob := worker.Job{ Type: jobs.TypeLaunchInstanceGcp, AccountID: accountId, - TraceID: logging.TraceId(r.Context()), EdgeID: logging.EdgeRequestId(r.Context()), Identity: id, Args: jobs.LaunchInstanceGCPTaskArgs{ diff --git a/internal/services/noop_reservation_service.go b/internal/services/noop_reservation_service.go index 20dba764..3d389cd8 100644 --- a/internal/services/noop_reservation_service.go +++ b/internal/services/noop_reservation_service.go @@ -46,7 +46,6 @@ func CreateNoopReservation(w http.ResponseWriter, r *http.Request) { Type: jobs.TypeNoop, AccountID: accountId, Identity: identity, - TraceID: logging.TraceId(r.Context()), EdgeID: logging.EdgeRequestId(r.Context()), Args: jobs.NoopJobArgs{ ReservationID: reservation.ID, diff --git a/pkg/worker/job.go b/pkg/worker/job.go index 40f6d257..80c8b5fe 100644 --- a/pkg/worker/job.go +++ b/pkg/worker/job.go @@ -4,11 +4,14 @@ import ( "context" "errors" + 
"github.com/RHEnVision/provisioning-backend/internal/config" "github.com/RHEnVision/provisioning-backend/internal/identity" "github.com/RHEnVision/provisioning-backend/internal/logging" - "github.com/RHEnVision/provisioning-backend/internal/ptr" "github.com/google/uuid" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" ) func init() { @@ -34,7 +37,7 @@ type Job struct { Identity identity.Principal // For logging purposes - TraceID string + TraceContext propagation.MapCarrier // For logging purposes EdgeID string @@ -79,30 +82,37 @@ type Stats struct { InFlight int64 } -func contextLogger(origCtx context.Context, job *Job) (context.Context, *zerolog.Logger) { +func initJobContext(origCtx context.Context, job *Job) (context.Context, *zerolog.Logger, trace.Span) { + // init with invalid span + span := trace.SpanFromContext(nil) + if job == nil { zerolog.Ctx(origCtx).Warn().Err(ErrJobNotFound).Msg("No job, context not changed") - return origCtx, nil + return origCtx, nil, span } ctx := logging.WithJobId(origCtx, job.ID.String()) ctx = identity.WithIdentity(ctx, job.Identity) - ctx = logging.WithTraceId(ctx, job.TraceID) ctx = logging.WithEdgeRequestId(ctx, job.EdgeID) ctx = identity.WithAccountId(ctx, job.AccountID) ctx = logging.WithJobId(ctx, job.ID.String()) ctx = logging.WithJobType(ctx, job.Type.String()) - logger := zerolog.Ctx(ctx) - logger = ptr.To(logger.With(). + logCtx := zerolog.Ctx(ctx).With(). Int64("account_id", job.AccountID). Str("org_id", job.Identity.Identity.OrgID). Str("account_number", job.Identity.Identity.AccountNumber). - Str("trace_id", job.TraceID). Str("request_id", job.EdgeID). Str("job_id", job.ID.String()). Str("job_type", job.Type.String()). - Interface("job_args", job.Args). 
- Logger()) - return logger.WithContext(ctx), logger + Interface("job_args", job.Args) + + if config.Telemetry.Enabled { + ctx = otel.GetTextMapPropagator().Extract(ctx, job.TraceContext) + ctx, span = otel.Tracer("any").Start(ctx, job.Type.String()) + logCtx = logCtx.Str("trace_id", span.SpanContext().TraceID().String()) + } + logger := logCtx.Logger() + + return logger.WithContext(ctx), &logger, span } diff --git a/pkg/worker/memory.go b/pkg/worker/memory.go index 0be8cb2b..3167520b 100644 --- a/pkg/worker/memory.go +++ b/pkg/worker/memory.go @@ -7,6 +7,8 @@ import ( "github.com/RHEnVision/provisioning-backend/internal/config" "github.com/google/uuid" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" ) type MemoryWorker struct { @@ -38,6 +40,11 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error { } } + if config.Telemetry.Enabled { + job.TraceContext = make(map[string]string) + otel.GetTextMapPropagator().Inject(ctx, job.TraceContext) + } + w.todo <- job return nil } @@ -57,19 +64,22 @@ func (w *MemoryWorker) dequeueLoop(ctx context.Context) { } } -func (w *MemoryWorker) processJob(ctx context.Context, job *Job) { +func (w *MemoryWorker) processJob(origCtx context.Context, job *Job) { if job == nil { - zerolog.Ctx(ctx).Error().Err(ErrJobNotFound).Msg("No job to process") + zerolog.Ctx(origCtx).Error().Err(ErrJobNotFound).Msg("No job to process") return } + ctx, logger, span := initJobContext(origCtx, job) + defer span.End() + if h, ok := w.handlers[job.Type]; ok { - ctx, _ = contextLogger(ctx, job) cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout) defer cFunc() h(cCtx, job) } else { - zerolog.Ctx(ctx).Warn().Msgf("Memory worker handler not found for job type: %s", job.Type) + span.SetStatus(codes.Error, "worker has not found handler for a job type") + logger.Warn().Msgf("Memory worker handler not found for job type: %s", job.Type) } } diff --git a/pkg/worker/redis.go b/pkg/worker/redis.go 
index 133a5f26..20ed9f29 100644 --- a/pkg/worker/redis.go +++ b/pkg/worker/redis.go @@ -18,6 +18,8 @@ import ( "github.com/google/uuid" "github.com/redis/go-redis/v9" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" ) type RedisWorker struct { @@ -92,6 +94,12 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error { var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) + + if config.Telemetry.Enabled { + job.TraceContext = make(map[string]string) + otel.GetTextMapPropagator().Inject(ctx, job.TraceContext) + } + err = enc.Encode(&job) if err != nil { return fmt.Errorf("unable to encode args: %w", err) @@ -199,7 +207,8 @@ func (w *RedisWorker) processJob(origCtx context.Context, job *Job) { } defer atomic.AddInt64(&w.inFlight, -1) - ctx, logger := contextLogger(origCtx, job) + ctx, logger, span := initJobContext(origCtx, job) + defer span.End() defer recoverAndLog(ctx) logger.Info().Msgf("Dequeued job %s %s from Redis", job.Type.String(), job.ID) @@ -216,6 +225,7 @@ func (w *RedisWorker) processJob(origCtx context.Context, job *Job) { }) } else { // handler not found + span.SetStatus(codes.Error, "worker has not found handler for a job type") zerolog.Ctx(ctx).Warn().Msgf("Redis worker handler not found for job type: %s", job.Type) } }