Skip to content

Commit

Permalink
fix: Adjust queue name handling in puash backs (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
anero authored and ixti committed Jan 12, 2025
1 parent eb2c0ee commit 2c7157a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 21 deletions.
19 changes: 13 additions & 6 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,42 +143,49 @@ def calc_target_queue(work) # rubocop:disable Metrics/MethodLength
when NilClass
work.queue
when String, Symbol
requeue_to.to_s
requeue_to
else
raise ArgumentError, "Invalid argument for `to`"
end

target = work.queue if target.nil? || target.empty?

target.start_with?("queue:") ? target : "queue:#{target}"
target.to_s
end

# Push the job back to the head of the queue.
# The queue name is expected to include the "queue:" prefix, so we add it if it's missing.
def re_enqueue_throttled(work, target_queue)
target_queue = "queue:#{target_queue}" unless target_queue.start_with?("queue:")

case work.class.name
when "Sidekiq::Pro::SuperFetch::UnitOfWork"
# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the target queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
work.queue = target_queue if target_queue
work.queue = target_queue
work.requeue
else
# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) }
end
end

# Reschedule the job to be executed later in the target queue.
# The queue name should NOT include the "queue:" prefix, so we remove it if it's present.
def reschedule_throttled(work, target_queue)
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]
target_queue = target_queue.delete_prefix("queue:")
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]

# Re-enqueue the job to the target queue at another time as a NEW unit of work
# AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned
# Technically, the job could processed twice if the process dies between the two lines,
# but your job should be idempotent anyway, right?
# The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk.
Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args)

work.acknowledge
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/throttled/strategy_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def throttled?(...)

# @return [Float] How long, in seconds, before we'll next be able to take on jobs
def retry_in(*args)
max { |s| s.retry_in(*args) }
map { |s| s.retry_in(*args) }.max
end

# Marks job as being processed.
Expand Down
20 changes: 20 additions & 0 deletions spec/lib/sidekiq/throttled/strategy_collection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@
end
end

describe "#retry_in" do
let(:strategies) do
[
{ limit: 1, key_suffix: ->(_project_id, user_id) { user_id } },
{ limit: 10 },
{ limit: 30 }
]
end
let(:job_id) { jid }

it do
collection.each_with_index do |s, i|
allow(s).to receive(:retry_in).with(job_id, 11, 22).and_return(i * 10)
end

# Returns the max value from the strategies' retry_in returned values
expect(collection.retry_in(job_id, 11, 22)).to eq 20
end
end

describe "#finalize!" do
subject(:finalize!) { collection.finalize!(job_id, *job_args) }

Expand Down
48 changes: 34 additions & 14 deletions spec/lib/sidekiq/throttled/strategy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)

# Ensure that the job is no longer in the private queue
Expand All @@ -425,7 +425,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:other_queue")
"queue" => "other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)

# Ensure that the job is no longer in the private queue
Expand All @@ -449,7 +449,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)

# Ensure that the job is no longer in the private queue
Expand All @@ -475,7 +475,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:other_queue")
"queue" => "other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)

# Ensure that the job is no longer in the private queue
Expand Down Expand Up @@ -503,7 +503,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)

# Ensure that the job is no longer in the private queue
Expand Down Expand Up @@ -531,7 +531,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0)

# Ensure that the job is no longer in the private queue
Expand Down Expand Up @@ -666,7 +666,17 @@ def scheduled_redis_item_and_score
it "returns false and does not reschedule the job" do
expect(Sidekiq::Client).not_to receive(:enqueue_to_in)
expect(work).not_to receive(:acknowledge)
expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey
expect(subject.send(:reschedule_throttled, work, "queue:default")).to be_falsey
end
end

context "when target_queue has the 'queue:' prefix" do
let(:target_queue) { "queue:default" }

it "reschedules the job to the specified queue" do
expect(Sidekiq::Client).to receive(:enqueue_to_in).with("default", anything, anything, anything)
expect(work).to receive(:acknowledge)
subject.send(:reschedule_throttled, work, target_queue)
end
end
end
Expand Down Expand Up @@ -807,7 +817,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end
Expand All @@ -823,7 +833,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:other_queue")
"queue" => "other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end
Expand All @@ -839,7 +849,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end
Expand All @@ -859,7 +869,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:other_queue")
"queue" => "other_queue")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end
Expand All @@ -879,7 +889,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0)
end
end
Expand All @@ -899,7 +909,7 @@ def scheduled_redis_item_and_score

item, score = scheduled_redis_item_and_score
expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1],
"queue" => "queue:default")
"queue" => "default")
expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0)
end
end
Expand Down Expand Up @@ -998,7 +1008,17 @@ def scheduled_redis_item_and_score
it "returns false and does not reschedule the job" do
expect(Sidekiq::Client).not_to receive(:enqueue_to_in)
expect(work).not_to receive(:acknowledge)
expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey
expect(subject.send(:reschedule_throttled, work, "queue:default")).to be_falsey
end
end

context "when target_queue has the 'queue:' prefix" do
let(:target_queue) { "queue:default" }

it "reschedules the job to the specified queue" do
expect(Sidekiq::Client).to receive(:enqueue_to_in).with("default", anything, anything, anything)
expect(work).to receive(:acknowledge)
subject.send(:reschedule_throttled, work, target_queue)
end
end
end
Expand Down

0 comments on commit 2c7157a

Please sign in to comment.