Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Sanitize job logging #718

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func initJobContext(origCtx context.Context, job *Job) (context.Context, *zerolo
Str("account_number", job.Identity.Identity.AccountNumber).
Str("request_id", job.EdgeID).
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args)
Str("job_type", job.Type.String())
lzap marked this conversation as resolved.
Show resolved Hide resolved

if config.Telemetry.Enabled {
ctx = otel.GetTextMapPropagator().Extract(ctx, job.TraceContext)
Expand Down
8 changes: 8 additions & 0 deletions pkg/worker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ func (w *MemoryWorker) Enqueue(ctx context.Context, job *Job) error {
return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound)
}

logger := zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Logger()
logger.Info().Interface("job_args", job.Args).Msg("Enqueuing job via memory")

if job.ID == uuid.Nil {
job.ID, err = uuid.NewRandom()
if err != nil {
logger.Error().Err(err).Msg("Unable to generate a job id")
return fmt.Errorf("unable to generate UUID: %w", err)
}
}
Expand Down Expand Up @@ -72,6 +79,7 @@ func (w *MemoryWorker) processJob(origCtx context.Context, job *Job) {

ctx, logger, span := initJobContext(origCtx, job)
defer span.End()
logger.Info().Interface("job_args", job.Args).Msgf("Dequeued job from memory")

if h, ok := w.handlers[job.Type]; ok {
cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)
Expand Down
39 changes: 21 additions & 18 deletions pkg/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/RHEnVision/provisioning-backend/internal/config"
"github.com/RHEnVision/provisioning-backend/internal/metrics"
"github.com/RHEnVision/provisioning-backend/internal/ptr"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -78,20 +77,20 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error {
return fmt.Errorf("unable to enqueue job: %w", ErrJobNotFound)
}

logger := zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Logger()
logger.Info().Interface("job_args", job.Args).Msg("Enqueuing job via Redis")

if job.ID == uuid.Nil {
job.ID, err = uuid.NewRandom()
if err != nil {
logger.Error().Err(err).Msg("Unable to generate a job id")
return fmt.Errorf("unable to generate UUID: %w", err)
}
}

logger := ptr.To(zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Logger())
logger.Info().Msgf("Enqueuing job type %s via Redis", job.Type)

var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)

Expand All @@ -102,20 +101,22 @@ func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error {

err = enc.Encode(&job)
if err != nil {
logger.Error().Err(err).Msg("Unable to encode the job")
return fmt.Errorf("unable to encode args: %w", err)
}

cmd := w.client.LPush(ctx, w.queueName, buffer.Bytes())
if cmd.Err() != nil {
logger.Error().Err(err).Msg("Unable to push job into Redis")
logger.Error().Err(cmd.Err()).Msg("Unable to push job into Redis")
return fmt.Errorf("unable to push job into Redis: %w", cmd.Err())
}

result, err := cmd.Result()
if err != nil {
logger.Error().Err(err).Msg("Unable to process redis push result")
return fmt.Errorf("unable to process result: %w", err)
}
logger.Info().Int64("job_result", result).Msg("Pushed job successfully")
logger.Info().Int64("redis_push_result", result).Msg("Pushed job successfully")
return nil
}

Expand Down Expand Up @@ -188,29 +189,31 @@ func (w *RedisWorker) fetchJob(ctx context.Context) {
var job Job
dec := gob.NewDecoder(strings.NewReader(res[1]))
err = dec.Decode(&job)
logger := ptr.To(zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Logger())
if err != nil {
logger.Error().Err(err).Msg("Unable to unmarshal job payload, skipping")
zerolog.Ctx(ctx).Error().
Err(err).
Str("job_id", job.ID.String()).
Str("job_type", job.Type.String()).
Interface("job_args", job.Args).
Msg("Unable to unmarshal job payload, skipping")
return
}

atomic.AddInt64(&w.inFlight, 1)
defer atomic.AddInt64(&w.inFlight, -1)

w.processJob(ctx, &job)
}

func (w *RedisWorker) processJob(origCtx context.Context, job *Job) {
if job == nil {
return
}
defer atomic.AddInt64(&w.inFlight, -1)
lzap marked this conversation as resolved.
Show resolved Hide resolved

ctx, logger, span := initJobContext(origCtx, job)
defer span.End()
defer recoverAndLog(ctx)
logger.Info().Msgf("Dequeued job %s %s from Redis", job.Type.String(), job.ID)
logger.Info().Interface("job_args", job.Args).Msgf("Dequeued job from Redis")

if h, ok := w.handlers[job.Type]; ok {
cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)
Expand Down