From 80a8e3d9725b663d5f0508c8c1641f745f56bea6 Mon Sep 17 00:00:00 2001 From: Ondrej Ezr Date: Fri, 13 Oct 2023 18:13:41 +0200 Subject: [PATCH] chore: Sanitize job logging Cleans up job logs. Removes job_args from most of the job logs except Enqueue and Dequeue. Removes double logging of job id and type. --- pkg/worker/job.go | 3 +-- pkg/worker/memory.go | 8 ++++++++ pkg/worker/redis.go | 39 +++++++++++++++++++++------------------ 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/pkg/worker/job.go b/pkg/worker/job.go index 4c17474d..8e9f4815 100644 --- a/pkg/worker/job.go +++ b/pkg/worker/job.go @@ -105,8 +105,7 @@ func initJobContext(origCtx context.Context, job *Job) (context.Context, *zerolo Str("account_number", job.Identity.Identity.AccountNumber). Str("request_id", job.EdgeID). Str("job_id", job.ID.String()). - Str("job_type", job.Type.String()). - Interface("job_args", job.Args) + Str("job_type", job.Type.String()) if config.Telemetry.Enabled { ctx = otel.GetTextMapPropagator().Extract(ctx, job.TraceContext) diff --git a/pkg/worker/memory.go b/pkg/worker/memory.go index 3167520b..55613629 100644 --- a/pkg/worker/memory.go +++ b/pkg/worker/memory.go @@ -33,9 +33,16 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error { return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound) } + logger := zerolog.Ctx(ctx).With(). + Str("job_id", job.ID.String()). + Str("job_type", job.Type.String()). + Logger() + logger.Info().Interface("job_args", job.Args).Msg("Enqueuing job via memory") + if job.ID == uuid.Nil { job.ID, err = uuid.NewRandom() if err != nil { + logger.Error().Err(err).Msg("Unable to generate a job id") return fmt.Errorf("unable to generate UUID: %w", err) } } @@ -72,6 +79,7 @@ func (w *MemoryWorker) processJob(origCtx context.Context, job *Job) { ctx, logger, span := initJobContext(origCtx, job) defer span.End() + logger.Info().Interface("job_args", job.Args).Msgf("Dequeued job from memory") if h, ok := w.handlers[job.Type]; ok { cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout) diff --git a/pkg/worker/redis.go b/pkg/worker/redis.go index 20ed9f29..031a67f2 100644 --- a/pkg/worker/redis.go +++ b/pkg/worker/redis.go @@ -14,7 +14,6 @@ import ( "github.com/RHEnVision/provisioning-backend/internal/config" "github.com/RHEnVision/provisioning-backend/internal/metrics" - "github.com/RHEnVision/provisioning-backend/internal/ptr" "github.com/google/uuid" "github.com/redis/go-redis/v9" "github.com/rs/zerolog" @@ -78,20 +77,20 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error { return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound) } + logger := zerolog.Ctx(ctx).With(). + Str("job_id", job.ID.String()). + Str("job_type", job.Type.String()). + Logger() + logger.Info().Interface("job_args", job.Args).Msg("Enqueuing job via Redis") + if job.ID == uuid.Nil { job.ID, err = uuid.NewRandom() if err != nil { + logger.Error().Err(err).Msg("Unable to generate a job id") return fmt.Errorf("unable to generate UUID: %w", err) } } - logger := ptr.To(zerolog.Ctx(ctx).With(). - Str("job_id", job.ID.String()). - Str("job_type", job.Type.String()). - Interface("job_args", job.Args). - Logger()) - logger.Info().Msgf("Enqueuing job type %s via Redis", job.Type) - var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) @@ -102,20 +101,22 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error { err = enc.Encode(&job) if err != nil { + logger.Error().Err(err).Msg("Unable to encode the job") return fmt.Errorf("unable to encode args: %w", err) } cmd := w.client.LPush(ctx, w.queueName, buffer.Bytes()) if cmd.Err() != nil { - logger.Error().Err(err).Msg("Unable to push job into Redis") + logger.Error().Err(cmd.Err()).Msg("Unable to push job into Redis") return fmt.Errorf("unable to push job into Redis: %w", cmd.Err()) } result, err := cmd.Result() if err != nil { + logger.Error().Err(err).Msg("Unable to process redis push result") return fmt.Errorf("unable to process result: %w", err) } - logger.Info().Int64("job_result", result).Msg("Pushed job successfully") + logger.Info().Int64("redis_push_result", result).Msg("Pushed job successfully") return nil } @@ -188,16 +189,19 @@ func (w *RedisWorker) fetchJob(ctx context.Context) { var job Job dec := gob.NewDecoder(strings.NewReader(res[1])) err = dec.Decode(&job) - logger := ptr.To(zerolog.Ctx(ctx).With(). - Str("job_id", job.ID.String()). - Str("job_type", job.Type.String()). - Interface("job_args", job.Args). - Logger()) if err != nil { - logger.Error().Err(err).Msg("Unable to unmarshal job payload, skipping") + zerolog.Ctx(ctx).Error(). + Err(err). + Str("job_id", job.ID.String()). + Str("job_type", job.Type.String()). + Interface("job_args", job.Args). + Msg("Unable to unmarshal job payload, skipping") + return } atomic.AddInt64(&w.inFlight, 1) + defer atomic.AddInt64(&w.inFlight, -1) + w.processJob(ctx, &job) } @@ -205,12 +209,11 @@ func (w *RedisWorker) processJob(origCtx context.Context, job *Job) { if job == nil { return } - defer atomic.AddInt64(&w.inFlight, -1) ctx, logger, span := initJobContext(origCtx, job) defer span.End() defer recoverAndLog(ctx) - logger.Info().Msgf("Dequeued job %s %s from Redis", job.Type.String(), job.ID) + logger.Info().Interface("job_args", job.Args).Msgf("Dequeued job from Redis") if h, ok := w.handlers[job.Type]; ok { cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)