Skip to content

Commit

Permalink
feat: Allow configuring how throttled jobs are pushed back (#150)
Browse files Browse the repository at this point in the history
This adds support for setting the `requeue_strategy` for a job, to
specify what we should do with throttled jobs. The default is
`:enqueue`, which is the current behavior: re-add it to the end of the
queue. The other option is `:schedule`, which schedules the job for a
time in the future when we think we'll have capacity to process it.

It's also possible to set the `default_requeue_strategy` in the
configuration, to set this behavior for all jobs that do not
individually specify a `requeue_strategy`.

This may be relevant to Issue #36.

Unrelatedly, there's a commit in here that changes the format of some
`require_relative` lines, to comply with the new Rubocop rule
`Style/RedundantCurrentDirectoryInPath`. I don't feel strongly about
this commit; I only added it so that rubocop would pass.

---------

Signed-off-by: Diego Marcet <[email protected]>
Co-authored-by: anero <[email protected]>
Co-authored-by: Diego Marcet <[email protected]>
Co-authored-by: Alexey Zapparov <[email protected]>
Co-authored-by: Mauricio Novelo <[email protected]>
  • Loading branch information
5 people authored Nov 17, 2024
1 parent 8f3fcf2 commit 27ec05e
Show file tree
Hide file tree
Showing 18 changed files with 1,176 additions and 41 deletions.
18 changes: 18 additions & 0 deletions lib/sidekiq/throttled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class << self
# @return [Cooldown, nil]
attr_reader :cooldown

# @api internal
#
# @return [Config, nil]
attr_reader :config

# @example
# Sidekiq::Throttled.configure do |config|
# config.cooldown_period = nil # Disable queues cooldown manager
Expand Down Expand Up @@ -87,6 +92,19 @@ def throttled?(message)
rescue StandardError
false
end

# Return throttled job to be executed later, delegating the details of how to do that
# to the Strategy for that job.
#
# @return [void]
def requeue_throttled(work)
message = JSON.parse(work.job)
job_class = Object.const_get(message.fetch("wrapped") { message.fetch("class") { return false } })

Registry.get job_class do |strategy|
strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options)
end
end
end
end

Expand Down
26 changes: 24 additions & 2 deletions lib/sidekiq/throttled/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ class Config
# @return [Integer]
attr_reader :cooldown_threshold

# Specifies how we should return throttled jobs to the queue so they can be executed later.
# Expects a hash with keys that may include :with and :to
# For :with, options are `:enqueue` (put them on the end of the queue) and `:schedule` (schedule for later).
# For :to, the name of a sidekiq queue should be specified. If none is specified, jobs will by default be
# requeued to the same queue they were originally enqueued in.
# Default: {with: `:enqueue`}
#
# @return [Hash]
attr_reader :default_requeue_options

def initialize
@cooldown_period = 1.0
@cooldown_threshold = 100
reset!
end

# @!attribute [w] cooldown_period
Expand All @@ -39,6 +48,19 @@ def cooldown_threshold=(value)

@cooldown_threshold = value
end

# @!attribute [w] default_requeue_options
def default_requeue_options=(options)
requeue_with = options.delete(:with).intern || :enqueue

@default_requeue_options = options.merge({ with: requeue_with })
end

def reset!
@cooldown_period = 1.0
@cooldown_threshold = 100
@default_requeue_options = { with: :enqueue }
end
end
end
end
28 changes: 26 additions & 2 deletions lib/sidekiq/throttled/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ module Throttled
# include Sidekiq::Job
# include Sidekiq::Throttled::Job
#
# sidekiq_options :queue => :my_queue
# sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour }
# sidkiq_options :queue => :my_queue
# sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour },
# :requeue => { :to => :other_queue, :with => :schedule }
#
# def perform
# # ...
Expand All @@ -23,13 +24,16 @@ module Throttled
#
# @see ClassMethods
module Job
VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze

# Extends worker class with {ClassMethods}.
#
# @note Using `included` hook with extending worker with {ClassMethods}
# in order to make API inline with `include Sidekiq::Job`.
#
# @private
def self.included(base)
base.sidekiq_class_attribute :sidekiq_throttled_requeue_options
base.extend(ClassMethods)
end

Expand Down Expand Up @@ -71,9 +75,29 @@ module ClassMethods
# })
# end
#
# @example Allow max 123 MyJob jobs per hour; when jobs are throttled, schedule them for later in :other_queue
#
# class MyJob
# include Sidekiq::Job
# include Sidekiq::Throttled::Job
#
# sidekiq_throttle({
# :threshold => { :limit => 123, :period => 1.hour },
# :requeue => { :to => :other_queue, :with => :schedule }
# })
# end
#
# @param [Hash] requeue What to do with jobs that are throttled
# @see Registry.add
# @return [void]
def sidekiq_throttle(**kwargs)
requeue_options = Throttled.config.default_requeue_options.merge(kwargs.delete(:requeue) || {})
unless VALID_VALUES_FOR_REQUEUE_WITH.include?(requeue_options[:with])
raise ArgumentError, "requeue: #{requeue_options[:with]} is not a valid value for :with"
end

self.sidekiq_throttled_requeue_options = requeue_options

Registry.add(self, **kwargs)
end

Expand Down
11 changes: 0 additions & 11 deletions lib/sidekiq/throttled/patches/basic_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@ def self.prepended(base)

private

# Pushes job back to the head of the queue, so that job won't be tried
# immediately after it was requeued (in most cases).
#
# @note This is triggered when job is throttled. So it is same operation
# Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
#
# @return [void]
def requeue_throttled(work)
redis { |conn| conn.lpush(work.queue, work.job) }
end

# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
Expand Down
13 changes: 0 additions & 13 deletions lib/sidekiq/throttled/patches/super_fetch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ def self.prepended(base)

private

# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
#
# @note This is triggered when job is throttled.
#
# @return [void]
def requeue_throttled(work)
# SuperFetch UnitOfWork's requeue will remove it from the temporary
# queue and then requeue it, so no acknowledgement call is needed.
work.requeue
end

# Returns list of non-paused queues to try to fetch jobs from.
#
# @note It may return an empty array.
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/throttled/patches/throttled_retriever.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def retrieve_work

if work && Throttled.throttled?(work.job)
Throttled.cooldown&.notify_throttled(work.queue)
requeue_throttled(work)
Throttled.requeue_throttled(work)
return nil
end

Expand Down
101 changes: 96 additions & 5 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Strategy
# See keyword args of {Strategy::Threshold#initialize} for details.
# @param [#call] key_suffix Dynamic key suffix generator.
# @param [#call] observer Process called after throttled.
def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) # rubocop:disable Metrics/MethodLength
def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
@observer = observer

@concurrency = StrategyCollection.new(concurrency,
Expand All @@ -44,9 +44,7 @@ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer
name: name,
key_suffix: key_suffix)

return if @concurrency.any? || @threshold.any?

raise ArgumentError, "Neither :concurrency nor :threshold given"
raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
end

# @return [Boolean] whenever strategy has dynamic config
Expand Down Expand Up @@ -74,18 +72,111 @@ def throttled?(jid, *job_args)
false
end

# Return throttled job to be executed later. Implementation depends on the value of `with`:
# :enqueue means put the job back at the end of the queue immediately
# :schedule means schedule enqueueing the job for a later time when we expect to have capacity
#
# @param [#to_s, #call] with How to handle the throttled job
# @param [#to_s, #call] to Name of the queue to re-queue the job to.
# If not specified, will use the job's original queue.
# @return [void]
def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
# Resolve :with and :to arguments, calling them if they are Procs
job_args = JSON.parse(work.job)["args"]
requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
target_queue = calc_target_queue(work, to)

case requeue_with
when :enqueue
re_enqueue_throttled(work, target_queue)
when :schedule
# Find out when we will next be able to execute this job, and reschedule for then.
reschedule_throttled(work, requeue_to: target_queue)
else
raise "unrecognized :with option #{with}"
end
end

# Marks job as being processed.
# @return [void]
def finalize!(jid, *job_args)
@concurrency&.finalize!(jid, *job_args)
end

# Resets count of jobs of all avaliable strategies
# Resets count of jobs of all available strategies
# @return [void]
def reset!
@concurrency&.reset!
@threshold&.reset!
end

private

def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength
target = case to
when Proc, Method
to.call(*JSON.parse(work.job)["args"])
when NilClass
work.queue
when String, Symbol
to.to_s
else
raise ArgumentError, "Invalid argument for `to`"
end

target = work.queue if target.nil? || target.empty?

target.start_with?("queue:") ? target : "queue:#{target}"
end

# Push the job back to the head of the queue.
def re_enqueue_throttled(work, requeue_to)
case work.class.name
when "Sidekiq::Pro::SuperFetch::UnitOfWork"
# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the target queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
work.queue = requeue_to if requeue_to
work.requeue
else
# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) }
end
end

def reschedule_throttled(work, requeue_to:)
message = JSON.parse(work.job)
job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
job_args = message["args"]

# Re-enqueue the job to the target queue at another time as a NEW unit of work
# AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned
# Technically, the job could processed twice if the process dies between the two lines,
# but your job should be idempotent anyway, right?
# The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk.
Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args)
work.acknowledge
end

def retry_in(work)
message = JSON.parse(work.job)
jid = message.fetch("jid") { return false }
job_args = message["args"]

# Ask both concurrency and threshold, if relevant, how long minimum until we can retry.
# If we get two answers, take the longer one.
intervals = [@concurrency&.retry_in(jid, *job_args), @threshold&.retry_in(*job_args)].compact

raise "Cannot compute a valid retry interval" if intervals.empty?

interval = intervals.max

# Add a random amount of jitter, proportional to the length of the minimum retry time.
# This helps spread out jobs more evenly and avoid clumps of jobs on the queue.
interval += rand(interval / 5) if interval > 10

interval
end
end
end
end
10 changes: 10 additions & 0 deletions lib/sidekiq/throttled/strategy/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ def throttled?(jid, *job_args)
Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) }
end

# @return [Float] How long, in seconds, before we'll next be able to take on jobs
def retry_in(_jid, *job_args)
job_limit = limit(job_args)
return 0.0 if !job_limit || count(*job_args) < job_limit

oldest_jid_with_score = Sidekiq.redis { |redis| redis.zrange(key(job_args), 0, 0, withscores: true) }.first
expiry_time = oldest_jid_with_score.last.to_f
expiry_time - Time.now.to_f
end

# @return [Integer] Current count of jobs
def count(*job_args)
Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i
Expand Down
19 changes: 19 additions & 0 deletions lib/sidekiq/throttled/strategy/threshold.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,25 @@ def throttled?(*job_args)
Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) }
end

# @return [Float] How long, in seconds, before we'll next be able to take on jobs
def retry_in(*job_args)
job_limit = limit(job_args)
return 0.0 if !job_limit || count(*job_args) < job_limit

job_period = period(job_args)
job_key = key(job_args)
time_since_oldest = Time.now.to_f - Sidekiq.redis { |redis| redis.lindex(job_key, -1) }.to_f
if time_since_oldest > job_period
# The oldest job on our list is from more than the throttling period ago,
# which means we have not hit the limit this period.
0.0
else
# If we can only have X jobs every Y minutes, then wait until Y minutes have elapsed
# since the oldest job on our list.
job_period - time_since_oldest
end
end

# @return [Integer] Current count of jobs
def count(*job_args)
Sidekiq.redis { |conn| conn.llen(key(job_args)) }.to_i
Expand Down
5 changes: 5 additions & 0 deletions lib/sidekiq/throttled/strategy_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def throttled?(...)
any? { |s| s.throttled?(...) }
end

# @return [Float] How long, in seconds, before we'll next be able to take on jobs
def retry_in(*args)
max { |s| s.retry_in(*args) }
end

# Marks job as being processed.
# @return [void]
def finalize!(...)
Expand Down
Loading

0 comments on commit 27ec05e

Please sign in to comment.