Skip to content

Commit

Permalink
Merge pull request #63 from bibendi/master
Browse files Browse the repository at this point in the history
feature: add ordered jobs
  • Loading branch information
bibendi committed Sep 11, 2015
2 parents 29e3085 + 1daa167 commit 501f8df
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 2 deletions.
12 changes: 11 additions & 1 deletion lib/resque/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ module Integration
autoload :Configuration, 'resque/integration/configuration'
autoload :Continuous, 'resque/integration/continuous'
autoload :Unique, 'resque/integration/unique'
autoload :Ordered, 'resque/integration/ordered'
autoload :LogsRotator, 'resque/integration/logs_rotator'

extend ActiveSupport::Concern
Expand All @@ -72,7 +73,7 @@ def queue(name = nil)
end

# Mark Job as unique and set given +callback+ or +block+ as Unique Arguments procedure
def unique(callback=nil, &block)
def unique(callback = nil, &block)
extend Unique

lock_on(&(callback || block))
Expand Down Expand Up @@ -104,6 +105,15 @@ def retrys(options = {})
@retry_limit = options.fetch(:limit, 2)
@retry_delay = options.fetch(:delay, 60)
end

# Mark Job as ordered
def ordered(options = {})
unique unless unique?
continuous
extend Ordered

self.max_iterations = options.fetch(:max_iterations, 20)
end
end
end # module Integration
end # module Resque
67 changes: 67 additions & 0 deletions lib/resque/integration/ordered.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Ordered Job
#
# Ensures that only one job for a given queue
# will be running on any worker at a given time
#
# Examples:
#
# class TestJob
# include Resque::Integration
#
# unique { |company_id, param1| [company_id] }
# ordered max_iterations: 10
#
# def self.execute(company_id, param1)
# heavy_lifting_work
# end
# end
#
module Resque
module Integration
module Ordered
ARGS_EXPIRATION = 1.week

def self.extended(base)
base.extend ClassMethods

base.singleton_class.class_eval do
attr_accessor :max_iterations

alias_method_chain :enqueue, :ordered
end
end

module ClassMethods
def enqueue_with_ordered(*args)
meta = enqueue_without_ordered(*args)

encoded_args = Resque.encode(args)
args_key = ordered_queue_key(meta.meta_id)
Resque.redis.rpush(args_key, encoded_args)
Resque.redis.expire(args_key, ARGS_EXPIRATION)

meta
end

def perform(meta_id, *)
args_key = ordered_queue_key(meta_id)
i = 1
while job_args = Resque.redis.lpop(args_key)
execute(*Resque.decode(job_args))

i += 1
return continue if max_iterations && i > max_iterations
end
end

def ordered_queue_size(meta_id)
Resque.redis.llen(ordered_queue_key(meta_id)).to_i
end

def ordered_queue_key(meta_id)
"ordered:#{meta_id}"
end
end
end
end
end
41 changes: 41 additions & 0 deletions spec/resque/integration/ordered_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
require "spec_helper"

describe Resque::Integration::Ordered do
class TestJob
include Resque::Integration

unique { |company_id, param1| [company_id] }
ordered max_iterations: 2
end

it "push args to separate queue" do
meta = TestJob.enqueue(1, 10)
TestJob.enqueue(1, 20)

args_key = TestJob.ordered_queue_key(meta.meta_id)
expect(TestJob).to be_enqueued(1)
expect(TestJob.ordered_queue_size(meta.meta_id)).to eq 2
end

it "execute jobs by each args" do
meta = TestJob.enqueue(1, 10)
TestJob.enqueue(1, 20)

expect(TestJob).to receive(:execute).with(1, 10).ordered
expect(TestJob).to receive(:execute).with(1, 20).ordered

TestJob.perform(meta.meta_id)
end

it "reenqueue job after max iterations reached" do
meta = TestJob.enqueue(1, 10)
TestJob.enqueue(1, 20)
TestJob.enqueue(1, 30)

expect(TestJob).to receive(:execute).with(1, 10).ordered
expect(TestJob).to receive(:execute).with(1, 20).ordered
expect(TestJob).to_not receive(:execute).with(1, 30).ordered

TestJob.perform(meta.meta_id)
end
end
8 changes: 7 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@

SimpleCov.start

require 'resque/integration'
require 'resque/integration'

RSpec.configure do |config|
config.before do
Resque.redis.flushdb
end
end

0 comments on commit 501f8df

Please sign in to comment.