Skip to content

Commit

Permalink
fix(HMS-2785): pass spans properly
Browse files Browse the repository at this point in the history
Otel needs a root spans, if that is not passed from external service,
we should start one ourselves, instead of starting span once we decide there is one needed internally.
Also jobs shall retrieve these spans and not start new one.
  • Loading branch information
ezr-ondrej committed Oct 13, 2023
1 parent c0b85a0 commit d963b9b
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 74 deletions.
4 changes: 3 additions & 1 deletion cmd/pbackend/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func api() {
apiRouter.Use(telemetry.Middleware(apiRouter))
apiRouter.Use(m.VersionMiddleware)
apiRouter.Use(m.CorrelationID)
apiRouter.Use(m.TraceID)
if config.Telemetry.Enabled {
apiRouter.Use(m.Telemetry)
}
apiRouter.Use(m.LoggerMiddleware(&log.Logger))

// Mount paths
Expand Down
1 change: 0 additions & 1 deletion internal/jobs/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
func copyContext(ctx context.Context) context.Context {
nCtx := context.Background()
nCtx = log.Logger.WithContext(nCtx)
nCtx = logging.WithTraceId(nCtx, logging.TraceId(ctx))
nCtx = logging.WithEdgeRequestId(nCtx, logging.EdgeRequestId(ctx))
nCtx = identity.WithAccountId(nCtx, identity.AccountId(ctx))
nCtx = logging.WithReservationId(nCtx, logging.ReservationId(ctx))
Expand Down
9 changes: 1 addition & 8 deletions internal/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"go.opentelemetry.io/otel/trace"
)

type kafkaBroker struct {
Expand Down Expand Up @@ -229,14 +228,8 @@ func (b *kafkaBroker) Consume(ctx context.Context, topic string, since time.Time
} else {
id := identity.Identity(newCtx)

traceId := trace.SpanFromContext(ctx).SpanContext().TraceID()
if !traceId.IsValid() {
traceId = random.TraceID()
}
newCtx = logging.WithTraceId(newCtx, traceId.String())

newLogger = newLogger.
Str("trace_id", traceId.String()).
Str("trace_id", logging.TraceId(newCtx)).
Str("account_number", id.Identity.AccountNumber).
Str("org_id", id.Identity.OrgID)
newCtx = newLogger.Logger().WithContext(newCtx)
Expand Down
14 changes: 5 additions & 9 deletions internal/logging/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package logging

import (
"context"

"go.opentelemetry.io/otel/trace"
)

type commonKeyId int

const (
requestIdCtxKey commonKeyId = iota
edgeRequestIdCtxKey commonKeyId = iota
correlationCtxKey commonKeyId = iota
jobIdCtxKey commonKeyId = iota
Expand Down Expand Up @@ -45,16 +46,11 @@ func WithEdgeRequestId(ctx context.Context, id string) context.Context {

// TraceId returns request id or an empty string when not set.
func TraceId(ctx context.Context) string {
value := ctx.Value(requestIdCtxKey)
if value == nil {
value := trace.SpanContextFromContext(ctx).TraceID()
if !value.IsValid() {
return ""
}
return value.(string)
}

// WithTraceId returns context copy with trace id value.
func WithTraceId(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, requestIdCtxKey, id)
return value.String()
}

// JobId returns request id or an empty string when not set.
Expand Down
6 changes: 6 additions & 0 deletions internal/middleware/correlation_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ func CorrelationID(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Edge request id
edgeId := r.Header.Get("X-Rh-Edge-Request-Id")
if edgeId != "" {
ctx = logging.WithEdgeRequestId(ctx, edgeId)
}

corrId := r.Header.Get("X-Correlation-Id")
if corrId != "" {
ctx = logging.WithCorrelationId(ctx, corrId)
Expand Down
29 changes: 29 additions & 0 deletions internal/middleware/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package middleware

import (
"net/http"

"github.com/RHEnVision/provisioning-backend/internal/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

const TraceName = telemetry.TracePrefix + "internal/middleware"

// Telemetry middleware starts a new telemetry span for this request,
// it tries to find the parent trace in the request,
// if none is found, it starts new root span.
func Telemetry(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
var span trace.Span
ctx := r.Context()

ctx, span = otel.Tracer(TraceName).Start(ctx, r.URL.Path)

// Store TraceID in response headers for easier debugging
w.Header().Set("X-Trace-Id", span.SpanContext().TraceID().String())

next.ServeHTTP(w, r.WithContext(ctx))
}
return http.HandlerFunc(fn)
}
35 changes: 0 additions & 35 deletions internal/middleware/trace_id.go

This file was deleted.

1 change: 0 additions & 1 deletion internal/services/aws_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func CreateAWSReservation(w http.ResponseWriter, r *http.Request) {
launchJob := worker.Job{
Type: jobs.TypeLaunchInstanceAws,
Identity: id,
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
AccountID: accountId,
Args: jobs.LaunchInstanceAWSTaskArgs{
Expand Down
1 change: 0 additions & 1 deletion internal/services/azure_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func CreateAzureReservation(w http.ResponseWriter, r *http.Request) {
launchJob := worker.Job{
Type: jobs.TypeLaunchInstanceAzure,
Identity: identity.Identity(r.Context()),
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
AccountID: identity.AccountId(r.Context()),
Args: jobs.LaunchInstanceAzureTaskArgs{
Expand Down
1 change: 0 additions & 1 deletion internal/services/gcp_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func CreateGCPReservation(w http.ResponseWriter, r *http.Request) {
launchJob := worker.Job{
Type: jobs.TypeLaunchInstanceGcp,
AccountID: accountId,
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
Identity: id,
Args: jobs.LaunchInstanceGCPTaskArgs{
Expand Down
1 change: 0 additions & 1 deletion internal/services/noop_reservation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func CreateNoopReservation(w http.ResponseWriter, r *http.Request) {
Type: jobs.TypeNoop,
AccountID: accountId,
Identity: identity,
TraceID: logging.TraceId(r.Context()),
EdgeID: logging.EdgeRequestId(r.Context()),
Args: jobs.NoopJobArgs{
ReservationID: reservation.ID,
Expand Down
32 changes: 21 additions & 11 deletions pkg/worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"errors"

"github.com/RHEnVision/provisioning-backend/internal/config"
"github.com/RHEnVision/provisioning-backend/internal/identity"
"github.com/RHEnVision/provisioning-backend/internal/logging"
"github.com/RHEnVision/provisioning-backend/internal/ptr"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

func init() {
Expand All @@ -34,7 +37,7 @@ type Job struct {
Identity identity.Principal

// For logging purposes
TraceID string
TraceContext propagation.MapCarrier

// For logging purposes
EdgeID string
Expand Down Expand Up @@ -79,30 +82,37 @@ type Stats struct {
InFlight int64
}

func contextLogger(origCtx context.Context, job *Job) (context.Context, *zerolog.Logger) {
func initJobContext(origCtx context.Context, job *Job) (context.Context, *zerolog.Logger, trace.Span) {
// init with invalid span
span := trace.SpanFromContext(nil)

Check failure on line 87 in pkg/worker/job.go

View workflow job for this annotation

GitHub Actions / 🎯 Go linter

SA1012: do not pass a nil Context, even if a function permits it; pass context.TODO if you are unsure about which Context to use (staticcheck)

if job == nil {
zerolog.Ctx(origCtx).Warn().Err(ErrJobNotFound).Msg("No job, context not changed")
return origCtx, nil
return origCtx, nil, span
}

ctx := logging.WithJobId(origCtx, job.ID.String())
ctx = identity.WithIdentity(ctx, job.Identity)
ctx = logging.WithTraceId(ctx, job.TraceID)
ctx = logging.WithEdgeRequestId(ctx, job.EdgeID)
ctx = identity.WithAccountId(ctx, job.AccountID)
ctx = logging.WithJobId(ctx, job.ID.String())
ctx = logging.WithJobType(ctx, job.Type.String())

logger := zerolog.Ctx(ctx)
logger = ptr.To(logger.With().
logCtx := zerolog.Ctx(ctx).With().
Int64("account_id", job.AccountID).
Str("org_id", job.Identity.Identity.OrgID).
Str("account_number", job.Identity.Identity.AccountNumber).
Str("trace_id", job.TraceID).
Str("request_id", job.EdgeID).
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Logger())
return logger.WithContext(ctx), logger
Interface("job_args", job.Args)

if config.Telemetry.Enabled {
ctx = otel.GetTextMapPropagator().Extract(ctx, job.TraceContext)
ctx, span = otel.Tracer("any").Start(ctx, job.Type.String())
logCtx = logCtx.Str("trace_id", span.SpanContext().TraceID().String())
}
logger := logCtx.Logger()

return logger.WithContext(ctx), &logger, span
}
18 changes: 14 additions & 4 deletions pkg/worker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/RHEnVision/provisioning-backend/internal/config"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

type MemoryWorker struct {
Expand Down Expand Up @@ -38,6 +40,11 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error {
}
}

if config.Telemetry.Enabled {
job.TraceContext = make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, job.TraceContext)
}

w.todo <- job
return nil
}
Expand All @@ -57,19 +64,22 @@ func (w *MemoryWorker) dequeueLoop(ctx context.Context) {
}
}

func (w *MemoryWorker) processJob(ctx context.Context, job *Job) {
func (w *MemoryWorker) processJob(origCtx context.Context, job *Job) {
if job == nil {
zerolog.Ctx(ctx).Error().Err(ErrJobNotFound).Msg("No job to process")
zerolog.Ctx(origCtx).Error().Err(ErrJobNotFound).Msg("No job to process")
return
}

ctx, logger, span := initJobContext(origCtx, job)
defer span.End()

if h, ok := w.handlers[job.Type]; ok {
ctx, _ = contextLogger(ctx, job)
cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)
defer cFunc()
h(cCtx, job)
} else {
zerolog.Ctx(ctx).Warn().Msgf("Memory worker handler not found for job type: %s", job.Type)
span.SetStatus(codes.Error, "worker has not found handler for a job type")
logger.Warn().Msgf("Memory worker handler not found for job type: %s", job.Type)
}
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

type RedisWorker struct {
Expand Down Expand Up @@ -92,6 +94,12 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error {

var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)

if config.Telemetry.Enabled {
job.TraceContext = make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, job.TraceContext)
}

err = enc.Encode(&job)
if err != nil {
return fmt.Errorf("unable to encode args: %w", err)
Expand Down Expand Up @@ -199,7 +207,8 @@ func (w *RedisWorker) processJob(origCtx context.Context, job *Job) {
}
defer atomic.AddInt64(&w.inFlight, -1)

ctx, logger := contextLogger(origCtx, job)
ctx, logger, span := initJobContext(origCtx, job)
defer span.End()
defer recoverAndLog(ctx)
logger.Info().Msgf("Dequeued job %s %s from Redis", job.Type.String(), job.ID)

Expand All @@ -216,6 +225,7 @@ func (w *RedisWorker) processJob(origCtx context.Context, job *Job) {
})
} else {
// handler not found
span.SetStatus(codes.Error, "worker has not found handler for a job type")
zerolog.Ctx(ctx).Warn().Msgf("Redis worker handler not found for job type: %s", job.Type)
}
}
Expand Down

0 comments on commit d963b9b

Please sign in to comment.