From 528feabfbfaf3feb14120c8b0c74acd3ae3b6225 Mon Sep 17 00:00:00 2001 From: terentev Date: Thu, 2 Aug 2018 20:14:43 +0500 Subject: [PATCH] feat: adds ability for retry jobs after temporary errors https://jira.railsc.ru/browse/GOODS-1418 --- .drone.yml | 2 +- lib/resque/integration.rb | 37 ++++++++++--------- lib/resque/integration/engine.rb | 7 +++- .../app/jobs/retries_argument_error_job.rb | 10 +++++ spec/internal/app/jobs/retries_job.rb | 10 +++++ .../app/jobs/retries_standard_error_job.rb | 10 +++++ .../app/services/dummy_for_retries_service.rb | 5 +++ spec/resque/integration_spec.rb | 32 +++++++++++++++- 8 files changed, 92 insertions(+), 21 deletions(-) create mode 100644 spec/internal/app/jobs/retries_argument_error_job.rb create mode 100644 spec/internal/app/jobs/retries_job.rb create mode 100644 spec/internal/app/jobs/retries_standard_error_job.rb create mode 100644 spec/internal/app/services/dummy_for_retries_service.rb diff --git a/.drone.yml b/.drone.yml index 2cba2e3..ee6de3a 100644 --- a/.drone.yml +++ b/.drone.yml @@ -10,7 +10,7 @@ build: - COMPOSE_FILE_EXT=drone - RUBY_IMAGE_TAG=2.2-latest commands: - - wrapdocker docker -v + - prepare-build - fetch-images --image abakpress/ruby-app:$RUBY_IMAGE_TAG - dip provision diff --git a/lib/resque/integration.rb b/lib/resque/integration.rb index 1bf4f94..0771f42 100644 --- a/lib/resque/integration.rb +++ b/lib/resque/integration.rb @@ -100,33 +100,34 @@ def priority? # Extend resque-retry. # - # options - Hash + # options - Hash of retry options (default: {}): + # :limit - Integer max number of retry attempts (default: 2) + # :delay - Integer seconds between retry attempts (default: 60) + # :exceptions - Array or Hash of specific exceptions to retry (optional) + # :temporary - boolean retry on temporary exceptions list (default: false) + # :expire_retry_key_after - Integer expire of retry key in redis (default: 3200) # - # :limit - Integer (default: 2) - # :delay - Integer (default: 60) - # :expire_retry_key_after - Integer (default: 3200), истечение ключа в секундах. Если - # - # t - среднее время выполнения одного джоба - # n - текущее кол-во джобов в очереди - # k - кол-во воркеров - # - # то expire_retry_key_after >= t * n / k - # - # Иначе ключ истечет, прежде чем джоб отработает. - # - # - # Returns nothing + # Returns nothing. def retrys(options = {}) - if unique? - raise '`retrys` should be declared higher in code than `unique`' - end + raise '`retries` should be declared higher in code than `unique`' if unique? extend Resque::Plugins::Retry @retry_limit = options.fetch(:limit, 2) @retry_delay = options.fetch(:delay, 60) + + @retry_exceptions = options[:exceptions] if options.key? :exceptions + + if options[:temporary] + @retry_exceptions = @retry_exceptions && @retry_exceptions.dup || {} + @retry_exceptions = @retry_exceptions.product([@retry_delay]).to_h if @retry_exceptions.is_a? Array + + @retry_exceptions.reverse_merge!(Rails.application.config.temporary_exceptions) + end + @expire_retry_key_after = options.fetch(:expire_retry_key_after, 1.hour.seconds) end + alias retries retrys # Mark Job as ordered def ordered(options = {}) diff --git a/lib/resque/integration/engine.rb b/lib/resque/integration/engine.rb index dedf0e9..76c6af2 100644 --- a/lib/resque/integration/engine.rb +++ b/lib/resque/integration/engine.rb @@ -73,7 +73,7 @@ class Engine < Rails::Engine end # Глушим ошибки, по которым происходит автоматический перезапуск - initializer 'resque-integration.retrys' do + initializer 'resque-integration.retrys' do |app| require 'resque/failure' require 'resque/failure/redis' @@ -89,6 +89,11 @@ class Engine < Rails::Engine end Resque::Failure.backend = Resque::Failure::MultipleWithRetrySuppression + # Hash - temporary exceptions + # + # example: + # {PG::TRDeadlockDetected => 20, Net::OpenTimeout => 123} + app.config.temporary_exceptions = {} end initializer "resque-integration.extensions" do diff --git a/spec/internal/app/jobs/retries_argument_error_job.rb b/spec/internal/app/jobs/retries_argument_error_job.rb new file mode 100644 index 0000000..5d7b00f --- /dev/null +++ b/spec/internal/app/jobs/retries_argument_error_job.rb @@ -0,0 +1,10 @@ +class RetriesArgumentErrorJob + include Resque::Integration + + retries temporary: true, exceptions: [ArgumentError] + queue :test_retries + + def self.execute + DummyForRetriesService.call + end +end diff --git a/spec/internal/app/jobs/retries_job.rb b/spec/internal/app/jobs/retries_job.rb new file mode 100644 index 0000000..a78cf7a --- /dev/null +++ b/spec/internal/app/jobs/retries_job.rb @@ -0,0 +1,10 @@ +class RetriesJob + include Resque::Integration + + retries + queue :test_retries + + def self.execute + DummyForRetriesService.call + end +end diff --git a/spec/internal/app/jobs/retries_standard_error_job.rb b/spec/internal/app/jobs/retries_standard_error_job.rb new file mode 100644 index 0000000..8d137c2 --- /dev/null +++ b/spec/internal/app/jobs/retries_standard_error_job.rb @@ -0,0 +1,10 @@ +class RetriesStandardErrorJob + include Resque::Integration + + retries temporary: true, exceptions: [StandardError] + queue :test_retries + + def self.execute + DummyForRetriesService.call + end +end diff --git a/spec/internal/app/services/dummy_for_retries_service.rb b/spec/internal/app/services/dummy_for_retries_service.rb new file mode 100644 index 0000000..6589245 --- /dev/null +++ b/spec/internal/app/services/dummy_for_retries_service.rb @@ -0,0 +1,5 @@ +class DummyForRetriesService + def self.call + raise StandardError.new('test') + end +end diff --git a/spec/resque/integration_spec.rb b/spec/resque/integration_spec.rb index 5e4be14..bc98b87 100644 --- a/spec/resque/integration_spec.rb +++ b/spec/resque/integration_spec.rb @@ -1,4 +1,3 @@ -# coding: utf-8 require 'spec_helper' RSpec.describe Resque::Integration do @@ -102,4 +101,35 @@ def self.execute(id, params) end end end + + describe 'retries' do + context 'with default params' do + let(:job) { Resque::Job.new(:test_retries, 'class' => 'RetriesJob', 'args' => ['meta']) } + + it do + expect { job.perform }.to raise_error StandardError + expect(Resque.delayed_queue_schedule_size).to eq 1 + end + end + + context 'with custom params' do + context 'with StandardError in exceptions' do + let(:job) { Resque::Job.new(:test_retries, 'class' => 'RetriesStandardErrorJob', 'args' => ['meta']) } + + it do + expect { job.perform }.to raise_error StandardError + expect(Resque.delayed_queue_schedule_size).to eq 1 + end + end + + context 'with ArgumentError in exceptions' do + let(:job) { Resque::Job.new(:test_retries, 'class' => 'RetriesArgumentErrorJob', 'args' => ['meta']) } + + it do + expect { job.perform }.to raise_error StandardError + expect(Resque.delayed_queue_schedule_size).to eq 0 + end + end + end + end end