Skip to content

Commit

Permalink
fix: false overall metric + refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
deniskorobicyn authored Sep 22, 2016
1 parent d3be980 commit eb438e4
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 74 deletions.
83 changes: 15 additions & 68 deletions lib/resque/integration/queues_info.rb
Original file line number Diff line number Diff line change
@@ -1,95 +1,42 @@
module Resque
module Integration
class QueuesInfo
autoload :Age, "resque/integration/queues_info/age"
autoload :Size, "resque/integration/queues_info/size"
autoload :Config, "resque/integration/queues_info/config"

def initialize(options)
config = load_config(options.fetch(:config))
@defaults = config['defaults']
@queues = expand_config(config['queues'])
@config = Config.new(options.fetch(:config))
@size = Size.new(@config)
@age = Age.new(@config)
end

def age_for_queue(queue)
job, max_time = find_longest_job { |job| job['queue'] == queue }

return 0 if job.nil? || max_time.nil?

max_time
@age.time(queue)
end

def age_overall
job, max_time = find_longest_job { |job| !job['queue'].nil? }

return 0 if job.nil? || max_time.nil?

max_time >= threshold_age(job['queue']) ? max_time : 0
@age.overall
end

def size_for_queue(queue = nil)
Resque.size(queue) || 0
def size_for_queue(queue)
@size.size(queue)
end

def size_overall
queue, size = Resque.queues.map do |queue|
[queue, Resque.size(queue).to_i]
end.max_by(&:last)

size >= threshold_size(queue) ? size : 0
@size.overall
end

def threshold_size(queue)
(@queues[queue] || @defaults)['max_size']
@config.max_size(queue)
end

def threshold_age(queue)
(@queues[queue] || @defaults)['max_age']
@config.max_age(queue)
end

def data
@data ||= @queues.map do |k, v|
{
"{#QUEUE}" => k
}
end
end

private

def find_longest_job
workers = Resque.workers
jobs = workers.map(&:job)

working_jobs = workers.zip(jobs).select do |w, j|
!w.idle? && yield(j)
end

working_jobs.map do |_, job|
[job, seconds_for(job)]
end.max_by(&:last)
end

def seconds_for(job)
(Time.now.utc - DateTime.strptime(job['run_at'], '%Y-%m-%dT%H:%M:%S').utc).to_i
end

def load_config(path)
input = File.read(path)
input = ERB.new(input).result if defined?(ERB)
YAML.load(input)
end

def expand_config(config)
keys = config.keys.dup

keys.each do |key|
v = config.delete(key)

key.split(',').each do |queue|
queue.chomp!
queue.strip!
config[queue] = v
end
end

config
@config.data
end
end
end
Expand Down
53 changes: 53 additions & 0 deletions lib/resque/integration/queues_info/age.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module Resque
module Integration
class QueuesInfo
class Age
def initialize(config)
@config = config
end

def time(queue)
from_time = Time.now.utc
max_secs = 0

jobs.each do |job|
next unless job['queue'] == queue
job_secs = seconds_for(job, from_time)
max_secs = job_secs if job_secs > max_secs
end

max_secs
end

def overall
from_time = Time.now.utc

max_secs = 0

jobs.each do |job|
next unless job['queue']
job_secs = seconds_for(job, from_time)
next if job_secs < threshold(job['queue'])
max_secs = job_secs if job_secs > max_secs
end

max_secs
end

def threshold(queue)
@config.max_age(queue)
end

private

def jobs
Resque.workers.each_with_object([]) { |worker, memo| memo << worker.job unless worker.idle? }
end

def seconds_for(job, from_time)
(from_time - DateTime.strptime(job['run_at'], '%Y-%m-%dT%H:%M:%S').utc).to_i
end
end
end
end
end
57 changes: 57 additions & 0 deletions lib/resque/integration/queues_info/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module Resque
module Integration
class QueuesInfo
class Config
def initialize(config_path)
config = load_config(config_path)
@defaults = config['defaults']
@queues = expand_config(config['queues'])
end

def max_age(queue)
threshold(queue, 'max_age')
end

def max_size(queue)
threshold(queue, 'max_size')
end

def data
@data ||= @queues.map do |k, v|
{
"{#QUEUE}" => k
}
end
end

private

def threshold(queue, param)
(@queues[queue] || @defaults)[param]
end

def load_config(path)
input = File.read(path)
input = ERB.new(input).result if defined?(ERB)
YAML.load(input)
end

def expand_config(config)
keys = config.keys.dup

keys.each do |key|
v = config.delete(key)

key.split(',').each do |queue|
queue.chomp!
queue.strip!
config[queue] = v
end
end

config
end
end
end
end
end
33 changes: 33 additions & 0 deletions lib/resque/integration/queues_info/size.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module Resque
module Integration
class QueuesInfo
class Size
def initialize(config)
@config = config
end

def size(queue)
Resque.size(queue) || 0
end

def overall
max = 0

Resque.queues.each do |queue|
size = Resque.size(queue).to_i
next if size < threshold(queue)
max = size if size > max
end

max
end

private

def threshold(queue)
@config.max_size(queue)
end
end
end
end
end
46 changes: 40 additions & 6 deletions spec/resque/integration/queues_info_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
let(:workers) do
[
double(job: {'run_at' => 100.seconds.ago.utc.iso8601, 'queue' => 'first'}, 'idle?' => false),
double(job: {'run_at' => 20.seconds.ago.utc.iso8601, 'queue' => 'first'}, 'idle?' => false)
double(job: {'run_at' => 20.seconds.ago.utc.iso8601, 'queue' => 'second'}, 'idle?' => false)
]
end

Expand All @@ -87,7 +87,7 @@
let(:workers) do
[
double(job: {'run_at' => 4.seconds.ago.utc.iso8601, 'queue' => 'first'}, 'idle?' => false),
double(job: {'run_at' => 2.seconds.ago.utc.iso8601, 'queue' => 'first'}, 'idle?' => false)
double(job: {'run_at' => 2.seconds.ago.utc.iso8601, 'queue' => 'second'}, 'idle?' => false)
]
end

Expand All @@ -105,6 +105,31 @@
expect(queue_info.age_overall).to eq 11
end
end

context "when one job have problem and other with older age doesn't" do
let(:workers) do
[
double(job: {'run_at' => 16.seconds.ago.utc.iso8601, 'queue' => 'first'}, 'idle?' => false),
double(job: {'run_at' => 14.seconds.ago.utc.iso8601, 'queue' => 'second'}, 'idle?' => false)
]
end

it 'returns size for queue with problem' do
expect(queue_info.age_overall).to eq 14
end
end

context 'when there is no job running' do
let(:workers) do
[
double(job: nil, 'idle?' => true)
]
end

it 'returns 0' do
expect(queue_info.age_overall).to eq 0
end
end
end
end

Expand Down Expand Up @@ -150,16 +175,16 @@
let(:size_first) { 100 }
let(:size_second) { nil }

it 'returns time for job ' do
it 'returns size for queue with problem' do
expect(queue_info.size_overall).to eq 100
end
end

context 'when there is a several old jobs' do
context 'when there is a several problem queues' do
let(:size_first) { 1000 }
let(:size_second) { 200 }

it 'returns time for the lagrest queue ' do
it 'returns size for the lagrest queue ' do
expect(queue_info.size_overall).to eq 1000
end
end
Expand All @@ -177,10 +202,19 @@
let(:size_first) { nil }
let(:size_second) { 11 }

it 'checks time of job with defaults thresholds' do
it 'checks size of queue with defaults thresholds' do
expect(queue_info.size_overall).to eq 11
end
end

context "when one queue have problem and other with bigger size doesn't" do
let(:size_first) { 30 }
let(:size_second) { 20 }

it 'returns size for queue with problem' do
expect(queue_info.size_overall).to eq 20
end
end
end
end

Expand Down

0 comments on commit eb438e4

Please sign in to comment.