
Automated DLQ re-drive #688

Open
Marty-Me opened this issue Dec 22, 2023 · 8 comments

Comments

@Marty-Me

To get all ServiceBus queue DLQ messages re-driven, I currently have to receive them 600 messages at a time using the PeekLock receive mode in the Azure Portal.

The DLQs we use in my team sometimes collect millions of messages when one of the downstream services is down.
Doing this by hand is nearly impossible.

My feature request is an option to re-drive DLQ-ed messages within the ServiceBus service itself, without the need to consume them locally first.

@SeanFeldman
Contributor

I'd call the feature batched retry.

This is a common issue, and ServiceBusExplorer provides some help. But it's not a substitute for a service-provided feature, as it still involves manual work, and when we're talking about hundreds of thousands or millions of messages, it's likely to choke.

A potential workaround is to auto-forward DLQ-ed messages to an intermediate queue, turn the auto-forwarding on the DLQ off, and then auto-forward those messages back to the original queue. This helps when a large number of DLQ-ed messages need to be reprocessed in bulk, without creating an infinite loop; a sketch using the administration client follows.
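For illustration, a minimal sketch of that forwarding dance using ServiceBusAdministrationClient from Azure.Messaging.ServiceBus.Administration (this code is not from the thread; the queue names "orders" and "orders-redrive" are placeholders, and clearing a forward by setting it to null is an assumption):

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

class DlqRedriveViaForwarding
{
    public static async Task RedriveAsync(string connectionString)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        // Step 1: forward the DLQ of the main queue to an intermediate queue.
        QueueProperties main = await admin.GetQueueAsync("orders");
        main.ForwardDeadLetteredMessagesTo = "orders-redrive";
        await admin.UpdateQueueAsync(main);

        // Wait until the DLQ has drained into the intermediate queue.
        QueueRuntimeProperties runtime = await admin.GetQueueRuntimePropertiesAsync("orders");
        while (runtime.DeadLetterMessageCount > 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(5));
            runtime = await admin.GetQueueRuntimePropertiesAsync("orders");
        }

        // Step 2: turn the DLQ forwarding off again, so messages that dead-letter
        // during reprocessing stay in the DLQ instead of looping forever.
        // (Assumption: null clears the forwarding rule.)
        main.ForwardDeadLetteredMessagesTo = null;
        await admin.UpdateQueueAsync(main);

        // Step 3: forward the intermediate queue back into the original queue.
        QueueProperties redrive = await admin.GetQueueAsync("orders-redrive");
        redrive.ForwardTo = "orders";
        await admin.UpdateQueueAsync(redrive);
    }
}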

@EldertGrootenboer
Contributor

Thank you for suggesting this feature. Since we're not considering it in the short term, we've added it to our backlog. To help us give it the right priority, it would be helpful to see others vote for this feature and explain their scenarios.

@EldertGrootenboer
Contributor

We have brought this item into our current planning. We don't yet have a specific date for when development will start; once we have more information, we will update this thread.

@yggdrasil-tynor

Any update on this really valuable feature? 👍

@yggdrasil-tynor

@Marty-Me did you build a workaround?
We are considering doing this ourselves, as the ServiceBus Explorer is not a perfect solution for this scenario.

@EldertGrootenboer could you kindly let us know if this feature is still coming anytime soon?

@Marty-Me
Author

Marty-Me commented Jan 9, 2025

I did. I wrote a small async script in Python, but I can't share it here as it is my employer's property 😔

@yggdrasil-tynor

I found a code snippet on Stack Overflow which I'm posting here for reference. I take no credit for it, as I'm not the author, but it might help other people looking for this feature:

using Azure.Messaging.ServiceBus;
using System.Collections.Generic;
using System.Threading.Tasks;
using NLog;

namespace ServiceBus.Tools
{
    class TransferDeadLetterMessages
    {
        // https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.ServiceBus_7.2.1/sdk/servicebus/Azure.Messaging.ServiceBus/README.md

        private static Logger logger = LogManager.GetCurrentClassLogger();

        private static ServiceBusClient client;
        private static ServiceBusSender sender;
    
        public static async Task ProcessTopicAsync(string connectionString, string topicName, string subscriberName, int fetchCount = 10)
        {
            try
            {
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(topicName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(topicName, subscriberName, new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                });

                await ProcessDeadLetterMessagesAsync($"topic: {topicName} -> subscriber: {subscriberName}", fetchCount, sender, dlqReceiver);
            }
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            {
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                {
                    logger.Error(ex, $"Topic:Subscriber '{topicName}:{subscriberName}' not found. Check that the name provided is correct.");
                }
                else
                {
                    throw;
                }
            }
            finally
            {
                // Guard against a failed client/sender construction leaving these null.
                if (sender != null) await sender.CloseAsync();
                if (client != null) await client.DisposeAsync();
            }
        }

        public static async Task ProcessQueueAsync(string connectionString, string queueName, int fetchCount = 10)
        {         
            try
            {
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(queueName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                });

                await ProcessDeadLetterMessagesAsync($"queue: {queueName}", fetchCount, sender, dlqReceiver);
            }
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            {
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                {
                    logger.Error(ex, $"Queue '{queueName}' not found. Check that the name provided is correct.");
                }
                else
                {
                    throw;
                }
            }
            finally
            {
                // Guard against a failed client/sender construction leaving these null.
                if (sender != null) await sender.CloseAsync();
                if (client != null) await client.DisposeAsync();
            }
        }

        private static async Task ProcessDeadLetterMessagesAsync(string source, int fetchCount, ServiceBusSender sender, ServiceBusReceiver dlqReceiver)
        {
            var wait = System.TimeSpan.FromSeconds(10);

            logger.Info($"fetching messages ({wait.TotalSeconds} seconds retrieval timeout)");
            logger.Info(source);

            IReadOnlyList<ServiceBusReceivedMessage> dlqMessages = await dlqReceiver.ReceiveMessagesAsync(fetchCount, wait);

            logger.Info($"dl-count: {dlqMessages.Count}");

            int i = 1;

            foreach (var dlqMessage in dlqMessages)
            {
                logger.Info($"start processing message {i}");
                logger.Info($"dl-message-dead-letter-message-id: {dlqMessage.MessageId}");
                logger.Info($"dl-message-dead-letter-reason: {dlqMessage.DeadLetterReason}");
                logger.Info($"dl-message-dead-letter-error-description: {dlqMessage.DeadLetterErrorDescription}");

                // The copy constructor clones the body and application properties
                // of the dead-lettered message for resubmission.
                ServiceBusMessage resubmittableMessage = new ServiceBusMessage(dlqMessage);

                await sender.SendMessageAsync(resubmittableMessage);

                await dlqReceiver.CompleteMessageAsync(dlqMessage);

                logger.Info($"finished processing message {i}");
                logger.Info("--------------------------------------------------------------------------------------");

                i++;
            }

            await dlqReceiver.CloseAsync();

            logger.Info($"finished");
        }
    }
}
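For completeness, a hypothetical entry point showing how this helper could be invoked (the environment variable name and the queue name "orders" are assumptions, not part of the original snippet):

using System;
using System.Threading.Tasks;

namespace ServiceBus.Tools
{
    class Program
    {
        static async Task Main()
        {
            // Assumed: connection string kept in an environment variable.
            string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");

            // Each call moves at most fetchCount messages; run it repeatedly
            // (or wrap it in a loop) until the DLQ is drained.
            await TransferDeadLetterMessages.ProcessQueueAsync(connectionString, "orders", fetchCount: 100);
        }
    }
}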

@EldertGrootenboer
Contributor

> @Marty-Me did you build a workaround? We are considering doing this ourselves, as the ServiceBus Explorer is not a perfect solution for this scenario.
>
> @EldertGrootenboer could you kindly let us know if this feature is still coming anytime soon?

Due to other incoming priorities, we have not had a chance yet to start on this work. Once we have more information to share, we will update this thread.
