From 2c7157a3d5e8b56453495676ef91d3ef7a82b6f0 Mon Sep 17 00:00:00 2001 From: anero Date: Mon, 9 Dec 2024 18:02:58 -0300 Subject: [PATCH] fix: Adjust queue name handling in puash backs (#201) --- lib/sidekiq/throttled/strategy.rb | 19 +++++--- lib/sidekiq/throttled/strategy_collection.rb | 2 +- .../throttled/strategy_collection_spec.rb | 20 ++++++++ spec/lib/sidekiq/throttled/strategy_spec.rb | 48 +++++++++++++------ 4 files changed, 68 insertions(+), 21 deletions(-) diff --git a/lib/sidekiq/throttled/strategy.rb b/lib/sidekiq/throttled/strategy.rb index 971549a8..ab494247 100644 --- a/lib/sidekiq/throttled/strategy.rb +++ b/lib/sidekiq/throttled/strategy.rb @@ -143,24 +143,27 @@ def calc_target_queue(work) # rubocop:disable Metrics/MethodLength when NilClass work.queue when String, Symbol - requeue_to.to_s + requeue_to else raise ArgumentError, "Invalid argument for `to`" end target = work.queue if target.nil? || target.empty? - target.start_with?("queue:") ? target : "queue:#{target}" + target.to_s end # Push the job back to the head of the queue. + # The queue name is expected to include the "queue:" prefix, so we add it if it's missing. def re_enqueue_throttled(work, target_queue) + target_queue = "queue:#{target_queue}" unless target_queue.start_with?("queue:") + case work.class.name when "Sidekiq::Pro::SuperFetch::UnitOfWork" # Calls SuperFetch UnitOfWork's requeue to remove the job from the # temporary queue and push job back to the head of the target queue, so that # the job won't be tried immediately after it was requeued (in most cases). - work.queue = target_queue if target_queue + work.queue = target_queue work.requeue else # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call. @@ -168,10 +171,13 @@ def re_enqueue_throttled(work, target_queue) end end + # Reschedule the job to be executed later in the target queue. + # The queue name should NOT include the "queue:" prefix, so we remove it if it's present. def reschedule_throttled(work, target_queue) - message = JSON.parse(work.job) - job_class = message.fetch("wrapped") { message.fetch("class") { return false } } - job_args = message["args"] + target_queue = target_queue.delete_prefix("queue:") + message = JSON.parse(work.job) + job_class = message.fetch("wrapped") { message.fetch("class") { return false } } + job_args = message["args"] # Re-enqueue the job to the target queue at another time as a NEW unit of work # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned @@ -179,6 +185,7 @@ def reschedule_throttled(work, target_queue) # but your job should be idempotent anyway, right? # The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk. Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args) + work.acknowledge end diff --git a/lib/sidekiq/throttled/strategy_collection.rb b/lib/sidekiq/throttled/strategy_collection.rb index d6fd0bc4..b1cbaeda 100644 --- a/lib/sidekiq/throttled/strategy_collection.rb +++ b/lib/sidekiq/throttled/strategy_collection.rb @@ -43,7 +43,7 @@ def throttled?(...) # @return [Float] How long, in seconds, before we'll next be able to take on jobs def retry_in(*args) - max { |s| s.retry_in(*args) } + map { |s| s.retry_in(*args) }.max end # Marks job as being processed. diff --git a/spec/lib/sidekiq/throttled/strategy_collection_spec.rb b/spec/lib/sidekiq/throttled/strategy_collection_spec.rb index 8807fc69..5ae158e9 100644 --- a/spec/lib/sidekiq/throttled/strategy_collection_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy_collection_spec.rb @@ -90,6 +90,26 @@ end end + describe "#retry_in" do + let(:strategies) do + [ + { limit: 1, key_suffix: ->(_project_id, user_id) { user_id } }, + { limit: 10 }, + { limit: 30 } + ] + end + let(:job_id) { jid } + + it do + collection.each_with_index do |s, i| + allow(s).to receive(:retry_in).with(job_id, 11, 22).and_return(i * 10) + end + + # Returns the max value from the strategies' retry_in returned values + expect(collection.retry_in(job_id, 11, 22)).to eq 20 + end + end + describe "#finalize!" do subject(:finalize!) { collection.finalize!(job_id, *job_args) } diff --git a/spec/lib/sidekiq/throttled/strategy_spec.rb b/spec/lib/sidekiq/throttled/strategy_spec.rb index b168bcdc..40fd38f6 100644 --- a/spec/lib/sidekiq/throttled/strategy_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy_spec.rb @@ -401,7 +401,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) # Ensure that the job is no longer in the private queue @@ -425,7 +425,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:other_queue") + "queue" => "other_queue") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) # Ensure that the job is no longer in the private queue @@ -449,7 +449,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) # Ensure that the job is no longer in the private queue @@ -475,7 +475,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:other_queue") + "queue" => "other_queue") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) # Ensure that the job is no longer in the private queue @@ -503,7 +503,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) # Ensure that the job is no longer in the private queue @@ -531,7 +531,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0) # Ensure that the job is no longer in the private queue @@ -666,7 +666,17 @@ def scheduled_redis_item_and_score it "returns false and does not reschedule the job" do expect(Sidekiq::Client).not_to receive(:enqueue_to_in) expect(work).not_to receive(:acknowledge) - expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey + expect(subject.send(:reschedule_throttled, work, "queue:default")).to be_falsey + end + end + + context "when target_queue has the 'queue:' prefix" do + let(:target_queue) { "queue:default" } + + it "reschedules the job to the specified queue" do + expect(Sidekiq::Client).to receive(:enqueue_to_in).with("default", anything, anything, anything) + expect(work).to receive(:acknowledge) + subject.send(:reschedule_throttled, work, target_queue) end end end @@ -807,7 +817,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) end end @@ -823,7 +833,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:other_queue") + "queue" => "other_queue") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) end end @@ -839,7 +849,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) end end @@ -859,7 +869,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:other_queue") + "queue" => "other_queue") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) end end @@ -879,7 +889,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) end end @@ -899,7 +909,7 @@ def scheduled_redis_item_and_score item, score = scheduled_redis_item_and_score expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], - "queue" => "queue:default") + "queue" => "default") expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0) end end @@ -998,7 +1008,17 @@ def scheduled_redis_item_and_score it "returns false and does not reschedule the job" do expect(Sidekiq::Client).not_to receive(:enqueue_to_in) expect(work).not_to receive(:acknowledge) - expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey + expect(subject.send(:reschedule_throttled, work, "queue:default")).to be_falsey + end + end + + context "when target_queue has the 'queue:' prefix" do + let(:target_queue) { "queue:default" } + + it "reschedules the job to the specified queue" do + expect(Sidekiq::Client).to receive(:enqueue_to_in).with("default", anything, anything, anything) + expect(work).to receive(:acknowledge) + subject.send(:reschedule_throttled, work, target_queue) end end end