diff --git a/.gitignore b/.gitignore index 057afb3..4ce077b 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ test/version_tmp tmp .idea .ruby-version +spec/internal/log/*.log diff --git a/lib/resque/integration.rb b/lib/resque/integration.rb index 613a5d2..e7df255 100644 --- a/lib/resque/integration.rb +++ b/lib/resque/integration.rb @@ -100,6 +100,11 @@ def unique? false end + # Public: job used priority queues + def priority? + false + end + # extend resque-retry # # options - Hash diff --git a/lib/resque/integration/engine.rb b/lib/resque/integration/engine.rb index 4ee942d..1c3beaa 100644 --- a/lib/resque/integration/engine.rb +++ b/lib/resque/integration/engine.rb @@ -86,6 +86,7 @@ class Engine < Rails::Engine initializer "resque-integration.extensions" do ::Resque::Worker.send :include, ::Resque::Integration::Extensions::Worker + ::Resque::Job.singleton_class.prepend(::Resque::Integration::Extensions::Job) end end # class Engine end # module Resque::Integration diff --git a/lib/resque/integration/extensions.rb b/lib/resque/integration/extensions.rb index fc90a76..9045014 100644 --- a/lib/resque/integration/extensions.rb +++ b/lib/resque/integration/extensions.rb @@ -2,6 +2,7 @@ module Resque module Integration module Extensions autoload :Worker, "resque/integration/extensions/worker" + autoload :Job, "resque/integration/extensions/job" end end end diff --git a/lib/resque/integration/extensions/job.rb b/lib/resque/integration/extensions/job.rb new file mode 100644 index 0000000..3a7e673 --- /dev/null +++ b/lib/resque/integration/extensions/job.rb @@ -0,0 +1,17 @@ +module Resque + module Integration + module Extensions + # Public: extension for proper determine queue + # when destroy job with priority + module Job + def destroy(queue, klass, *args) + if klass.respond_to?(:priority?) && klass.priority? + queue = klass.priority_queue(args.last) + end + + super + end + end + end + end +end diff --git a/lib/resque/integration/priority.rb b/lib/resque/integration/priority.rb index 5003afb..466b34d 100644 --- a/lib/resque/integration/priority.rb +++ b/lib/resque/integration/priority.rb @@ -8,18 +8,10 @@ module Integration # include Resque::Integration::Priority # # queue :foo - # - # def self.execute(*args) - # meta = get_meta - # - # heavy_lifting_work do - # meta[:count] += 1 - # end - # end # end # - # MyJob.enqueue(1, another_param: 2, queue_priority: high) # enqueue job to :foo_high queue - # MyJob.enqueue(1, another_param: 2, queue_priority: low) # enqueue job to :foo_low queue + # MyJob.enqueue_with_priority(:high, 1, another_param: 2) # enqueue job to :foo_high queue + # MyJob.enqueue_with_priority(:low, 1, another_param: 2) # enqueue job to :foo_low queue # # class MyUniqueJob # include Resque::Integration @@ -27,37 +19,72 @@ module Integration # # queue :foo # unique + # + # def self.execute(*args) + # meta = get_meta + # + # heavy_lifting_work do + # meta[:count] += 1 + # end + # end # end module Priority def self.included(base) - base.extend(Resque::Plugins::Meta) + base.extend(ClassMethods) base.singleton_class.prepend(Enqueue) end + module ClassMethods + def priority? + true + end + end + module Enqueue + # Public: enqueue job with normal priority + # + # Example: + # MyJob.enqueue(1) def enqueue(*args) - priority = args.last.delete(:queue_priority) { :normal }.to_sym - - priority_queue = priority == :normal ? queue : "#{queue}_#{priority}".to_sym + enqueue_with_priority(:normal, *args) + end + # Public: dequeue job with priority + # + # Example: + # MyJob.dequeue(:high, 1) + def dequeue(priority, *args) if unique? - enqueue_to(priority_queue, *args) + super(*args, priority) else - Resque::Plugins::Meta::Metadata.new('meta_id' => meta_id(args), 'job_class' => to_s).tap do |meta| - meta.save - Resque.enqueue_to(priority_queue, self, meta.meta_id, *args) - end + Resque.dequeue(self, *args, priority) end end - def perform(meta_id, *args) - @meta_id = meta_id + # Public: enqueue job to priority queue + # + # Example: + # MyJob.enqueue_with_priority(:high, 1) + def enqueue_with_priority(priority, *args) + queue = priority_queue(priority) + + if unique? + enqueue_to(queue, *args, priority) + else + Resque.enqueue_to(queue, self, *args, priority) + end + end - execute(*args) + def priority_queue(priority) + priority.to_sym == :normal ? queue : "#{queue}_#{priority}".to_sym end - def meta - get_meta(@meta_id) + def perform(*args, _priority) + if unique? + super(*args) + else + execute(*args) + end end end end diff --git a/lib/resque/integration/unique.rb b/lib/resque/integration/unique.rb index dc534f9..ff9753c 100644 --- a/lib/resque/integration/unique.rb +++ b/lib/resque/integration/unique.rb @@ -83,7 +83,7 @@ def lock_on(&block) # LockID should be independent from MetaID # @api private def lock(meta_id, *args) - args = [*args[0..-2], args.last.with_indifferent_access] if args.last.is_a?(Hash) + args = args.map { |i| i.is_a?(Hash) ? i.with_indifferent_access : i } "lock:#{name}-#{Digest::SHA1.hexdigest(obj_to_string(lock_on[*args]))}" end diff --git a/resque-integration.gemspec b/resque-integration.gemspec index 3d6f708..e568f17 100644 --- a/resque-integration.gemspec +++ b/resque-integration.gemspec @@ -41,5 +41,6 @@ Gem::Specification.new do |gem| gem.add_development_dependency 'simplecov' gem.add_development_dependency 'mock_redis' gem.add_development_dependency 'timecop' + gem.add_development_dependency 'combustion' gem.add_development_dependency 'pry-byebug' end diff --git a/spec/resque/integration/priority_spec.rb b/spec/resque/integration/priority_spec.rb index f1f3b80..0b369d3 100644 --- a/spec/resque/integration/priority_spec.rb +++ b/spec/resque/integration/priority_spec.rb @@ -16,7 +16,9 @@ class UniqueJobWithPriority include Resque::Integration::Priority queue :foo - unique + unique do |id, params| + [id, params["param"]] + end def self.execute(id, params) end @@ -24,9 +26,7 @@ def self.execute(id, params) describe '#enqueue' do it 'enqueue to priority queue' do - meta = JobWithPriority.enqueue(1, param: 'one', queue_priority: :high) - - expect(meta.meta_id).to be + JobWithPriority.enqueue_with_priority(:high, 1, param: 'one') expect(Resque.size(:foo)).to eq(0) expect(Resque.size(:foo_low)).to eq(0) @@ -42,8 +42,8 @@ def self.execute(id, params) end it 'enqueue only one job' do - meta1 = UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high) - meta2 = UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high) + meta1 = UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one') + meta2 = UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one') expect(meta1.meta_id).to eq(meta2.meta_id) @@ -53,6 +53,26 @@ def self.execute(id, params) end end + describe '#dequeue' do + it 'dequeue simple job with high priority' do + JobWithPriority.enqueue_with_priority(:high, 1, param: 'one') + JobWithPriority.enqueue_with_priority(:high, 2, param: 'two') + expect(Resque.size(:foo_high)).to eq(2) + + JobWithPriority.dequeue(:high, 1, param: 'one') + expect(Resque.size(:foo_high)).to eq(1) + end + + it 'dequeue unique job with high priority' do + UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one') + UniqueJobWithPriority.enqueue_with_priority(:high, 2, param: 'two') + expect(Resque.size(:foo_high)).to eq(2) + + UniqueJobWithPriority.dequeue(:high, 1, param: 'one') + expect(Resque.size(:foo_high)).to eq(1) + end + end + describe '#perform' do include_context 'resque inline' @@ -60,15 +80,15 @@ def self.execute(id, params) expect(JobWithPriority).to receive(:execute).with(1, 'param' => 'one').once.and_call_original expect(JobWithPriority).to receive(:execute).with(2, 'param' => 'two').once.and_call_original - JobWithPriority.enqueue(1, param: 'one', queue_priority: :high) - JobWithPriority.enqueue(2, param: 'two', queue_priority: :low) + JobWithPriority.enqueue_with_priority(:high, 1, param: 'one') + JobWithPriority.enqueue_with_priority(:high, 2, param: 'two') end it 'executes job' do - expect(UniqueJobWithPriority).to receive(:execute).with(1, 'param' => 'one').twice.and_call_original + expect(UniqueJobWithPriority).to receive(:execute).twice.and_call_original - UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high) - UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high) + UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one') + UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one') end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9238cf8..61a30fa 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,6 +5,8 @@ require 'simplecov' require 'mock_redis' require 'timecop' +require 'pry-byebug' +require 'combustion' Resque.redis = MockRedis.new @@ -12,6 +14,8 @@ require 'resque/integration' +Combustion.initialize! + Dir["./spec/shared/**/*.rb"].each(&method(:require)) RSpec.configure do |config|