Skip to content

Commit

Permalink
Uniquiness functions extracted to separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
take-five committed Apr 1, 2013
1 parent f9c47e6 commit 5a7c801
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 81 deletions.
3 changes: 3 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--color
--format progress
--order random
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class ResqueJobTest
# это название очереди, в которой будет выполняться джою
queue :my_queue

# с помощью lock_on можно указать, какие аргументы определяют уникальность задачи.
# с помощью unique можно указать, что задача является уникальной, и какие аргументы определяют уникальность задачи.
# в данном случае не может быть двух одновременных задач ResqueJobTest с одинаковым первым аргументом
# (второй аргумент может быть любым)
lock_on { |id, description| [id] }
unique { |id, description| [id] }

# В отличие от обычных джобов resque, надо определять метод execute.
#
Expand Down
104 changes: 25 additions & 79 deletions lib/resque/integration.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
# coding: utf-8
require "resque/integration/version"
require 'resque/integration/version'

require "resque"
require 'resque'

require "rails/railtie"
require "rake"
require "resque-rails"
require 'rails/railtie'
require 'rake'
require 'resque-rails'

require "active_record"
require "resque-ensure-connected"
require 'active_record'
require 'resque-ensure-connected'

require "resque/plugins/lock"
require "resque/plugins/progress"
require 'active_support/concern'

require "active_support/concern"
require 'resque/integration/engine'

require "resque/integration/engine"
require "resque/integration/backtrace"

require "active_support/core_ext/module/attribute_accessors"
require 'active_support/core_ext/module/attribute_accessors'

module Resque
# Resque.config is available now
Expand All @@ -32,49 +28,25 @@ module Resque
# include Resque::Integration
#
# queue :my_queue
#
# lock_on do |*args|
# [args.first]
# end
#
# unique ->(*args) { args.first }

# def self.execute(*args)
# end
# end
module Integration
autoload :Application, "resque/integration/application"
autoload :CLI, "resque/integration/cli"
autoload :Configuration, "resque/integration/configuration"
autoload :Supervisor, "resque/integration/supervisor"
autoload :Application, 'resque/integration/application'
autoload :Backtrace, 'resque/integration/backtrace'
autoload :CLI, 'resque/integration/cli'
autoload :Configuration, 'resque/integration/configuration'
autoload :Supervisor, 'resque/integration/supervisor'
autoload :Unique, 'resque/integration/unique'

extend ActiveSupport::Concern

included do
extend Backtrace
extend Resque::Plugins::Progress
extend Resque::Plugins::Lock

@queue ||= :default
@lock_on ||= proc { |*args| args }

# LockID is now independent from MetaID
# @api private
def self.lock(meta_id, *args) #:nodoc:
"lock:#{name}-#{@lock_on[*args].to_s}"
end

# Overriding +meta_id+ here so now it generates the same MetaID for Jobs with same args
# @api private
def self.meta_id(*args) #:nodoc:
Digest::SHA1.hexdigest([ ::Rails.application.config.secret_token, self, @lock_on[*args] ].join)
end

# Overriding +enqueue+ method here so now it returns existing metadata if job already queued
def self.enqueue(*args) #:nodoc:
meta = enqueued?(*args) and return meta

# enqueue job and retrieve its meta
super
end
end

module ClassMethods
Expand All @@ -83,42 +55,16 @@ def queue(name)
@queue = name
end

# Define a proc for lock key generation
def lock_on(&block)
raise ArgumentError, 'Block expected' unless block_given?

@lock_on = block
end

# Is job already in queue or in process?
def enqueued?(*args)
key = lock(nil, *args)
now = Time.now.to_i

# if lock exists and timeout not exceeded
if Resque.redis.exists(key) && now <= Resque.redis.get(key).to_i
get_meta(meta_id(*args))
else
nil
end
end

# get meta object associated with job
def meta
get_meta(@meta_id)
end

def perform(meta_id, *args)
execute(*args)
end
# Mark Job as unique and set given +callback+ or +block+ as Unique Arguments procedure
def unique(callback=nil, &block)
extend Unique

def execute(*)
raise NotImplementedError, "You should implement `execute' method"
lock_on(&(callback || block))
end

def on_failure_lock(e, *args)
Resque.redis.del(lock(*args))
def unique?
false
end
end
end # module Integration
end # module Resque
end # module Resque
140 changes: 140 additions & 0 deletions lib/resque/integration/unique.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# coding: utf-8

require 'active_support/core_ext/module/aliasing'

require 'resque/plugins/lock'
require 'resque/plugins/progress'

module Resque
module Integration
# Unique job
#
# @example
# class MyJob
# extend Resque::Integration::Unique
#
# # jobs are considered as equal if their first argument is the same
# lock_on { |*args| args.first }
#
# def self.execute(image_id)
# # do it
# end
# end
#
# MyJob.enqueue(11)
module Unique
SALT = 'f5db195354e682fc3389c086beed4f70'.freeze

def self.extended(base)
base.extend(Resque::Plugins::Progress)
base.extend(Resque::Plugins::Lock)
base.extend(ClassMethods)
base.singleton_class.class_eval do
alias_method_chain :enqueue, :check
end
end

module ClassMethods
# Returns true because job is unique now
def unique?
true
end

# Get or set proc returning unique arguments
def lock_on(&block)
if block_given?
@unique = block
else
@unique ||= proc { |*args| args }
end
end

# LockID should be independent from MetaID
# @api private
def lock(meta_id, *args)
"lock:#{name}-#{lock_on[*args].to_s}"
end

# Overriding +meta_id+ here so now it generates the same MetaID for Jobs with same args
# @api private
def meta_id(*args)
Digest::SHA1.hexdigest([ secret_token, self, lock_on[*args] ].join)
end

# get meta object associated with job
def meta
get_meta(@meta_id)
end

# default `perform` method override
def perform(meta_id, *args)
execute(*args)
end

def execute(*)
raise NotImplementedError, "You should implement `execute' method"
end

# When job is failed we should remove lock
def on_failure_lock(e, *args)
Resque.redis.del(lock(*args))
end

# Before dequeue check if job is running
def before_dequeue_lock(meta_id, *args)
(meta = get_meta(meta_id)) && !meta.working?
end

# When job is dequeued we should remove lock
def after_dequeue_lock(*args)
Resque.redis.del(lock(*args))
end

# Fail metadata if dequeue succeed
def after_dequeue_meta(meta_id, *args)
if (meta = get_meta(meta_id))
meta.fail!
end
end

# Is job already in queue or in process?
def enqueued?(*args)
# if lock exists and timeout not exceeded
if locked?(*args)
get_meta(meta_id(*args))
else
nil
end
end

# Returns true if resque job is in locked state
def locked?(*args)
key = lock(nil, *args)
now = Time.now.to_i

Resque.redis.exists(key) && now <= Resque.redis.get(key).to_i
end

# Dequeue unique job
def dequeue(*args)
Resque.dequeue(self, meta_id(*args), *args)
end

# Overriding +enqueue+ method here so now it returns existing metadata if job already queued
def enqueue_with_check(*args) #:nodoc:
meta = enqueued?(*args) and return meta

# enqueue job and retrieve its meta
enqueue_without_check(*args)
end

private
def secret_token
::Rails.respond_to?(:application) &&
::Rails.application &&
::Rails.application.config.secret_token
end
end
end # module Unique
end # module Integration
end # module Resque
6 changes: 6 additions & 0 deletions resque-integration.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency 'multi_json'
gem.add_runtime_dependency 'rake'
gem.add_runtime_dependency 'sinatra'

gem.add_development_dependency 'rake'
gem.add_development_dependency 'bundler'
gem.add_development_dependency 'rspec'
gem.add_development_dependency 'simplecov'
gem.add_development_dependency 'mock_redis'
end
Loading

0 comments on commit 5a7c801

Please sign in to comment.