diff --git a/kq/queue.go b/kq/queue.go index b7f23ac..f89d0a0 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -167,7 +167,9 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue } if c.CommitInOrder { q.commitRunner = threading.NewStableRunner(func(msg kafka.Message) kafka.Message { - if err := q.consumeOne(context.Background(), string(msg.Key), string(msg.Value)); err != nil { + ctx := extractCtxFromMsg(msg) + + if err := q.consumeOne(ctx, string(msg.Key), string(msg.Value)); err != nil { if q.errorHandler != nil { q.errorHandler(context.Background(), msg, err) } @@ -208,6 +210,11 @@ func (q *kafkaQueue) Stop() { } func (q *kafkaQueue) consumeOne(ctx context.Context, key, val string) error { + defer func() { + if err := recover(); err != nil { + logc.Errorf(ctx, "consumeOne failed recover, error: %v", err) + } + }() startTime := timex.Now() err := q.handler.Consume(ctx, key, val) q.metrics.Add(stat.Task{ @@ -220,12 +227,7 @@ func (q *kafkaQueue) startConsumers() { for i := 0; i < q.c.Processors; i++ { q.consumerRoutines.Run(func() { for msg := range q.channel { - // wrap message into message carrier - mc := internal.NewMessageCarrier(internal.NewMessage(&msg)) - // extract trace context from message - ctx := otel.GetTextMapPropagator().Extract(context.Background(), mc) - // remove deadline and error control - ctx = contextx.ValueOnlyFrom(ctx) + ctx := extractCtxFromMsg(msg) if err := q.consumeOne(ctx, string(msg.Key), string(msg.Value)); err != nil { if q.errorHandler != nil { @@ -365,3 +367,13 @@ func ensureQueueOptions(c KqConf, options *queueOptions) { } } } + +func extractCtxFromMsg(msg kafka.Message) context.Context { + // wrap message into message carrier + mc := internal.NewMessageCarrier(internal.NewMessage(&msg)) + // extract trace context from message + ctx := otel.GetTextMapPropagator().Extract(context.Background(), mc) + // remove deadline and error control + ctx = contextx.ValueOnlyFrom(ctx) + return ctx +}