Skip to content

Commit

Permalink
feature: allow to use priority queues (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
artofhuman authored Jun 21, 2017
1 parent f74ae38 commit 2d6bcef
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/resque/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ module Integration
autoload :QueuesInfo, 'resque/integration/queues_info'
autoload :Extensions, 'resque/integration/extensions'
autoload :FailureBackends, 'resque/integration/failure_backends'
autoload :Priority, 'resque/integration/priority'

extend ActiveSupport::Concern

Expand Down
65 changes: 65 additions & 0 deletions lib/resque/integration/priority.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module Resque
module Integration
# Public: job with priority queues
#
# Examples:
# class MyJob
# include Resque::Integration
# include Resque::Integration::Priority
#
# queue :foo
#
# def self.execute(*args)
# meta = get_meta
#
# heavy_lifting_work do
# meta[:count] += 1
# end
# end
# end
#
# MyJob.enqueue(1, another_param: 2, queue_priority: high) # enqueue job to :foo_high queue
# MyJob.enqueue(1, another_param: 2, queue_priority: low) # enqueue job to :foo_low queue
#
# class MyUniqueJob
# include Resque::Integration
# include Resque::Integration::Priority
#
# queue :foo
# unique
# end
module Priority
def self.included(base)
base.extend(Resque::Plugins::Meta)
base.singleton_class.prepend(Enqueue)
end

module Enqueue
def enqueue(*args)
priority = args.last.delete(:queue_priority) { :normal }.to_sym

priority_queue = priority == :normal ? queue : "#{queue}_#{priority}".to_sym

if unique?
enqueue_to(priority_queue, *args)
else
Resque::Plugins::Meta::Metadata.new('meta_id' => meta_id(args), 'job_class' => to_s).tap do |meta|
meta.save
Resque.enqueue_to(priority_queue, self, meta.meta_id, *args)
end
end
end

def perform(meta_id, *args)
@meta_id = meta_id

execute(*args)
end

def meta
get_meta(@meta_id)
end
end
end
end
end
2 changes: 1 addition & 1 deletion resque-integration.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency 'railties', '>= 3.0.0'
gem.add_runtime_dependency 'activerecord', '>= 3.0.0'
gem.add_runtime_dependency 'actionpack', '>= 3.0.0'
gem.add_runtime_dependency 'activesupport', '>= 3.0.0'
gem.add_runtime_dependency 'activesupport', '>= 3.0.0', '< 5'
gem.add_runtime_dependency 'resque-lock', '~> 1.1.0'
gem.add_runtime_dependency 'resque-meta', '>= 2.0.0'
gem.add_runtime_dependency 'resque-progress', '~> 1.0.1'
Expand Down
74 changes: 74 additions & 0 deletions spec/resque/integration/priority_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
require 'spec_helper'

RSpec.describe Resque::Integration::Priority do
class JobWithPriority
include Resque::Integration
include Resque::Integration::Priority

queue :foo

def self.execute(id, params)
end
end

class UniqueJobWithPriority
include Resque::Integration
include Resque::Integration::Priority

queue :foo
unique

def self.execute(id, params)
end
end

describe '#enqueue' do
it 'enqueue to priority queue' do
meta = JobWithPriority.enqueue(1, param: 'one', queue_priority: :high)

expect(meta.meta_id).to be

expect(Resque.size(:foo)).to eq(0)
expect(Resque.size(:foo_low)).to eq(0)
expect(Resque.size(:foo_high)).to eq(1)
end

it 'enqueue to default queue' do
JobWithPriority.enqueue(1, param: 'one')

expect(Resque.size(:foo)).to eq(1)
expect(Resque.size(:foo_low)).to eq(0)
expect(Resque.size(:foo_high)).to eq(0)
end

it 'enqueue only one job' do
meta1 = UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
meta2 = UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)

expect(meta1.meta_id).to eq(meta2.meta_id)

expect(Resque.size(:foo)).to eq(0)
expect(Resque.size(:foo_low)).to eq(0)
expect(Resque.size(:foo_high)).to eq(1)
end
end

describe '#perform' do
include_context 'resque inline'

it 'executes job' do
expect(JobWithPriority).to receive(:execute).with(1, 'param' => 'one').once.and_call_original
expect(JobWithPriority).to receive(:execute).with(2, 'param' => 'two').once.and_call_original

JobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
JobWithPriority.enqueue(2, param: 'two', queue_priority: :low)
end

it 'executes job' do
expect(UniqueJobWithPriority).to receive(:execute).with(1, 'param' => 'one').twice.and_call_original

UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
end
end
end
11 changes: 2 additions & 9 deletions spec/resque/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def self.execute(id, params)
end

describe 'unlock' do
include_context 'resque inline'

class UniqueJobWithBlock
include Resque::Integration

Expand All @@ -80,15 +82,6 @@ def self.execute(id, params)
end
end

around do |example|
inline = Resque.inline
Resque.inline = true

example.run

Resque.inline = inline
end

it 'unlocks uniq job with args and without block' do
expect(DummyService).to receive(:call).twice

Expand Down
10 changes: 10 additions & 0 deletions spec/shared/resque_inline.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
RSpec.shared_context 'resque inline' do
around do |example|
inline = Resque.inline
Resque.inline = true

example.run

Resque.inline = inline
end
end
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

require 'resque/integration'

Dir["./spec/shared/**/*.rb"].each(&method(:require))

RSpec.configure do |config|
config.before do
Resque.redis.flushdb
Expand Down

0 comments on commit 2d6bcef

Please sign in to comment.