diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..fa3fde7 --- /dev/null +++ b/.rspec @@ -0,0 +1,3 @@ +--color +--format progress +--order random \ No newline at end of file diff --git a/README.md b/README.md index c1352e8..7b9c197 100644 --- a/README.md +++ b/README.md @@ -40,10 +40,10 @@ class ResqueJobTest # это название очереди, в которой будет выполняться джою queue :my_queue - # с помощью lock_on можно указать, какие аргументы определяют уникальность задачи. + # с помощью unique можно указать, что задача является уникальной, и какие аргументы определяют уникальность задачи. # в данном случае не может быть двух одновременных задач ResqueJobTest с одинаковым первым аргументом # (второй аргумент может быть любым) - lock_on { |id, description| [id] } + unique { |id, description| [id] } # В отличие от обычных джобов resque, надо определять метод execute. # diff --git a/lib/resque/integration.rb b/lib/resque/integration.rb index f2837fc..0763361 100644 --- a/lib/resque/integration.rb +++ b/lib/resque/integration.rb @@ -1,24 +1,20 @@ # coding: utf-8 -require "resque/integration/version" +require 'resque/integration/version' -require "resque" +require 'resque' -require "rails/railtie" -require "rake" -require "resque-rails" +require 'rails/railtie' +require 'rake' +require 'resque-rails' -require "active_record" -require "resque-ensure-connected" +require 'active_record' +require 'resque-ensure-connected' -require "resque/plugins/lock" -require "resque/plugins/progress" +require 'active_support/concern' -require "active_support/concern" +require 'resque/integration/engine' -require "resque/integration/engine" -require "resque/integration/backtrace" - -require "active_support/core_ext/module/attribute_accessors" +require 'active_support/core_ext/module/attribute_accessors' module Resque # Resque.config is available now @@ -32,49 +28,25 @@ module Resque # include Resque::Integration # # queue :my_queue - # - # lock_on do |*args| - # [args.first] - # end - # + # unique ->(*args) { args.first } + # def self.execute(*args) # end # end module Integration - autoload :Application, "resque/integration/application" - autoload :CLI, "resque/integration/cli" - autoload :Configuration, "resque/integration/configuration" - autoload :Supervisor, "resque/integration/supervisor" + autoload :Application, 'resque/integration/application' + autoload :Backtrace, 'resque/integration/backtrace' + autoload :CLI, 'resque/integration/cli' + autoload :Configuration, 'resque/integration/configuration' + autoload :Supervisor, 'resque/integration/supervisor' + autoload :Unique, 'resque/integration/unique' extend ActiveSupport::Concern included do extend Backtrace - extend Resque::Plugins::Progress - extend Resque::Plugins::Lock @queue ||= :default - @lock_on ||= proc { |*args| args } - - # LockID is now independent from MetaID - # @api private - def self.lock(meta_id, *args) #:nodoc: - "lock:#{name}-#{@lock_on[*args].to_s}" - end - - # Overriding +meta_id+ here so now it generates the same MetaID for Jobs with same args - # @api private - def self.meta_id(*args) #:nodoc: - Digest::SHA1.hexdigest([ ::Rails.application.config.secret_token, self, @lock_on[*args] ].join) - end - - # Overriding +enqueue+ method here so now it returns existing metadata if job already queued - def self.enqueue(*args) #:nodoc: - meta = enqueued?(*args) and return meta - - # enqueue job and retrieve its meta - super - end end module ClassMethods @@ -83,42 +55,16 @@ def queue(name) @queue = name end - # Define a proc for lock key generation - def lock_on(&block) - raise ArgumentError, 'Block expected' unless block_given? - - @lock_on = block - end - - # Is job already in queue or in process? - def enqueued?(*args) - key = lock(nil, *args) - now = Time.now.to_i - - # if lock exists and timeout not exceeded - if Resque.redis.exists(key) && now <= Resque.redis.get(key).to_i - get_meta(meta_id(*args)) - else - nil - end - end - - # get meta object associated with job - def meta - get_meta(@meta_id) - end - - def perform(meta_id, *args) - execute(*args) - end + # Mark Job as unique and set given +callback+ or +block+ as Unique Arguments procedure + def unique(callback=nil, &block) + extend Unique - def execute(*) - raise NotImplementedError, "You should implement `execute' method" + lock_on(&(callback || block)) end - def on_failure_lock(e, *args) - Resque.redis.del(lock(*args)) + def unique? + false end end end # module Integration -end # module Resque +end # module Resque \ No newline at end of file diff --git a/lib/resque/integration/unique.rb b/lib/resque/integration/unique.rb new file mode 100644 index 0000000..7211c23 --- /dev/null +++ b/lib/resque/integration/unique.rb @@ -0,0 +1,140 @@ +# coding: utf-8 + +require 'active_support/core_ext/module/aliasing' + +require 'resque/plugins/lock' +require 'resque/plugins/progress' + +module Resque + module Integration + # Unique job + # + # @example + # class MyJob + # extend Resque::Integration::Unique + # + # # jobs are considered as equal if their first argument is the same + # lock_on { |*args| args.first } + # + # def self.execute(image_id) + # # do it + # end + # end + # + # MyJob.enqueue(11) + module Unique + SALT = 'f5db195354e682fc3389c086beed4f70'.freeze + + def self.extended(base) + base.extend(Resque::Plugins::Progress) + base.extend(Resque::Plugins::Lock) + base.extend(ClassMethods) + base.singleton_class.class_eval do + alias_method_chain :enqueue, :check + end + end + + module ClassMethods + # Returns true because job is unique now + def unique? + true + end + + # Get or set proc returning unique arguments + def lock_on(&block) + if block_given? + @unique = block + else + @unique ||= proc { |*args| args } + end + end + + # LockID should be independent from MetaID + # @api private + def lock(meta_id, *args) + "lock:#{name}-#{lock_on[*args].to_s}" + end + + # Overriding +meta_id+ here so now it generates the same MetaID for Jobs with same args + # @api private + def meta_id(*args) + Digest::SHA1.hexdigest([ secret_token, self, lock_on[*args] ].join) + end + + # get meta object associated with job + def meta + get_meta(@meta_id) + end + + # default `perform` method override + def perform(meta_id, *args) + execute(*args) + end + + def execute(*) + raise NotImplementedError, "You should implement `execute' method" + end + + # When job is failed we should remove lock + def on_failure_lock(e, *args) + Resque.redis.del(lock(*args)) + end + + # Before dequeue check if job is running + def before_dequeue_lock(meta_id, *args) + (meta = get_meta(meta_id)) && !meta.working? + end + + # When job is dequeued we should remove lock + def after_dequeue_lock(*args) + Resque.redis.del(lock(*args)) + end + + # Fail metadata if dequeue succeed + def after_dequeue_meta(meta_id, *args) + if (meta = get_meta(meta_id)) + meta.fail! + end + end + + # Is job already in queue or in process? + def enqueued?(*args) + # if lock exists and timeout not exceeded + if locked?(*args) + get_meta(meta_id(*args)) + else + nil + end + end + + # Returns true if resque job is in locked state + def locked?(*args) + key = lock(nil, *args) + now = Time.now.to_i + + Resque.redis.exists(key) && now <= Resque.redis.get(key).to_i + end + + # Dequeue unique job + def dequeue(*args) + Resque.dequeue(self, meta_id(*args), *args) + end + + # Overriding +enqueue+ method here so now it returns existing metadata if job already queued + def enqueue_with_check(*args) #:nodoc: + meta = enqueued?(*args) and return meta + + # enqueue job and retrieve its meta + enqueue_without_check(*args) + end + + private + def secret_token + ::Rails.respond_to?(:application) && + ::Rails.application && + ::Rails.application.config.secret_token + end + end + end # module Unique + end # module Integration +end # module Resque \ No newline at end of file diff --git a/resque-integration.gemspec b/resque-integration.gemspec index 201c3a4..96b003e 100644 --- a/resque-integration.gemspec +++ b/resque-integration.gemspec @@ -28,4 +28,10 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency 'multi_json' gem.add_runtime_dependency 'rake' gem.add_runtime_dependency 'sinatra' + + gem.add_development_dependency 'rake' + gem.add_development_dependency 'bundler' + gem.add_development_dependency 'rspec' + gem.add_development_dependency 'simplecov' + gem.add_development_dependency 'mock_redis' end \ No newline at end of file diff --git a/spec/resque/integration/unique_spec.rb b/spec/resque/integration/unique_spec.rb new file mode 100644 index 0000000..08c556c --- /dev/null +++ b/spec/resque/integration/unique_spec.rb @@ -0,0 +1,86 @@ +# coding: utf-8 + +require 'spec_helper' + +describe Resque::Integration::Unique, '#meta_id, #lock' do + let(:job) { Class.new } + before { job.extend Resque::Integration::Unique } + + context 'when unique arguments are not set' do + it 'returns same results when equal arguments given' do + job.meta_id('a', 'b').should eq job.meta_id('a', 'b') + job.lock(nil, 'a', 'b').should eq job.lock(nil, 'a', 'b') + end + + it 'returns different results when different arguments given' do + job.meta_id('a', 'b').should_not eq job.meta_id('a', 'c') + job.lock(nil, 'a', 'b').should_not eq job.lock(nil, 'a', 'c') + end + end + + context 'when unique arguments are set' do + # mark second argument as unique + before { job.lock_on &->(a, b) { b } } + + it 'returns same results when equal arguments given' do + job.meta_id('a', 'b').should eq job.meta_id('a', 'b') + job.lock(nil, 'a', 'b').should eq job.lock(nil, 'a', 'b') + + job.meta_id('a', 'b').should eq job.meta_id('c', 'b') + job.lock(nil, 'a', 'b').should eq job.lock(nil, 'c', 'b') + end + + it 'returns different results when different arguments given' do + job.meta_id('a', 'b').should_not eq job.meta_id('a', 'c') + job.lock(nil, 'a', 'b').should_not eq job.lock(nil, 'a', 'c') + end + end +end + +describe Resque::Integration::Unique, '#enqueue, #enqueued?' do + class JobEnqueueTest + extend Resque::Integration::Unique + + @queue = :queue1 + end + + it 'returns false when job is not enqueued' do + JobEnqueueTest.should_not be_enqueued(0) + end + + it 'returns new meta when job is enqueued' do + JobEnqueueTest.should_receive(:enqueue_without_check).and_call_original + + meta = JobEnqueueTest.enqueue(1) + meta.should be_a Resque::Plugins::Meta::Metadata + + JobEnqueueTest.should be_enqueued(1) + end + + it 'returns the same meta if job already in queue' do + meta1 = JobEnqueueTest.enqueue(2) + meta2 = JobEnqueueTest.enqueue(2) + + meta1.meta_id.should eq meta2.meta_id + meta1.enqueued_at.should eq meta2.enqueued_at + + JobEnqueueTest.should be_enqueued(2) + end +end + +describe Resque::Integration::Unique, '#dequeue' do + class JobDequeueTest + extend Resque::Integration::Unique + + @queue = :queue2 + end + + it 'dequeues and unlocks job if job is not in work now' do + JobDequeueTest.enqueue(1) + JobDequeueTest.should be_enqueued(1) + + JobDequeueTest.dequeue(1) + JobDequeueTest.should_not be_enqueued(1) + JobDequeueTest.should_not be_locked(1) + end +end \ No newline at end of file diff --git a/spec/resque/integration_spec.rb b/spec/resque/integration_spec.rb new file mode 100644 index 0000000..e45b015 --- /dev/null +++ b/spec/resque/integration_spec.rb @@ -0,0 +1,20 @@ +# coding: utf-8 + +require 'spec_helper' + +describe Resque::Integration, '#unique?' do + let(:job) { Class.new } + before { job.send :include, Resque::Integration } + + subject { job } + + context 'when #unique is not called' do + it { should_not be_unique } + end + + context 'when #unique is called' do + before { job.unique } + + it { should be_unique } + end +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..8d5d6b4 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,12 @@ +# coding: utf-8 +require 'bundler/setup' +require 'rspec' +require 'resque' +require 'simplecov' +require 'mock_redis' + +Resque.redis = MockRedis.new + +SimpleCov.start + +require 'resque/integration' \ No newline at end of file