From 6622606e70e1d1afc69a84090f619e95e6dd4ec6 Mon Sep 17 00:00:00 2001 From: zhudi Date: Tue, 27 Aug 2024 18:35:45 +0800 Subject: [PATCH 1/4] feat: kq commitinorder support trace --- kq/queue.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/kq/queue.go b/kq/queue.go index b7f23ac..1b2b267 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -167,7 +167,9 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue } if c.CommitInOrder { q.commitRunner = threading.NewStableRunner(func(msg kafka.Message) kafka.Message { - if err := q.consumeOne(context.Background(), string(msg.Key), string(msg.Value)); err != nil { + ctx := extractCtxFromMsg(msg) + + if err := q.consumeOne(ctx, string(msg.Key), string(msg.Value)); err != nil { if q.errorHandler != nil { q.errorHandler(context.Background(), msg, err) } @@ -220,12 +222,7 @@ func (q *kafkaQueue) startConsumers() { for i := 0; i < q.c.Processors; i++ { q.consumerRoutines.Run(func() { for msg := range q.channel { - // wrap message into message carrier - mc := internal.NewMessageCarrier(internal.NewMessage(&msg)) - // extract trace context from message - ctx := otel.GetTextMapPropagator().Extract(context.Background(), mc) - // remove deadline and error control - ctx = contextx.ValueOnlyFrom(ctx) + ctx := extractCtxFromMsg(msg) if err := q.consumeOne(ctx, string(msg.Key), string(msg.Value)); err != nil { if q.errorHandler != nil { @@ -365,3 +362,13 @@ func ensureQueueOptions(c KqConf, options *queueOptions) { } } } + +func extractCtxFromMsg(msg kafka.Message) context.Context { + // wrap message into message carrier + mc := internal.NewMessageCarrier(internal.NewMessage(&msg)) + // extract trace context from message + ctx := otel.GetTextMapPropagator().Extract(context.Background(), mc) + // remove deadline and error control + ctx = contextx.ValueOnlyFrom(ctx) + return ctx +} From e285d13ae6bde622077992af051fe9b05e6814ec Mon Sep 17 00:00:00 2001 From: zhudi Date: Thu, 19 Dec 2024 14:52:56 +0800 Subject: [PATCH 2/4] feat:consumeone recover --- kq/queue.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kq/queue.go b/kq/queue.go index 1b2b267..f89d0a0 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -210,6 +210,11 @@ func (q *kafkaQueue) Stop() { } func (q *kafkaQueue) consumeOne(ctx context.Context, key, val string) error { + defer func() { + if err := recover(); err != nil { + logc.Errorf(ctx, "consumeOne failed recover, error: %v", err) + } + }() startTime := timex.Now() err := q.handler.Consume(ctx, key, val) q.metrics.Add(stat.Task{ From d934679842f69b2b606f0180ef5bde5d63dba207 Mon Sep 17 00:00:00 2001 From: zhudi Date: Thu, 19 Dec 2024 15:15:46 +0800 Subject: [PATCH 3/4] feat:private --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6b4c053..f647729 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/zeromicro/go-queue +module github.com/zhuud/go-queue go 1.20 From 0aa8fbc21cb2d517dd30f8bad0ab5b95b3fab0ff Mon Sep 17 00:00:00 2001 From: zhudi Date: Thu, 19 Dec 2024 15:16:46 +0800 Subject: [PATCH 4/4] feat:private --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f647729..6b4c053 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/zhuud/go-queue +module github.com/zeromicro/go-queue go 1.20