Skip to content

Commit

Permalink
FEATURE: Add MiniScheduler::Manager.discover_running_scheduled_jobs
Browse files Browse the repository at this point in the history
… API (discourse#56)

This commit adds the `MiniScheduler::Manager.discover_running_scheduled_jobs`
method which returns an array of all currently running scheduled jobs
for the current host.
  • Loading branch information
tgxworld authored and dmke committed Nov 13, 2024
1 parent 5aecec3 commit 8bf7c5a
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 29 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- main
- sidekiq-7

jobs:
lint:
Expand Down Expand Up @@ -33,13 +34,14 @@ jobs:

services:
redis:
image: redis
image: ${{ matrix.redis }}
ports:
- 6379:6379

strategy:
matrix:
ruby: ["3.1", "3.2", "3.3"]
ruby: ["3.2", "3.3"]
redis: ["redis", "valkey/valkey"]

steps:
- uses: actions/checkout@v4
Expand Down
14 changes: 12 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# next

- Remove support for Sidekiq < 7
- Update minimum Ruby version to 3.2

# 0.17.0 - 2024-08-06

- Add `MiniScheduler::Manager.discover_running_scheduled_jobs` API to allow running scheduled jobs to easily be discovered on the
current host.

# 0.16.0 - 2023-05-17

- Support Redis gem version 5
Expand All @@ -14,7 +24,7 @@
# 0.13.0 - 2020-11-30

- Fix exception code so it has parity with Sidekiq 4.2.3 and up, version bump cause
minimum version of Sidekiq changed.
minimum version of Sidekiq changed.

# 0.12.3 - 2020-10-15

Expand All @@ -35,7 +45,7 @@ minimum version of Sikekiq changed.
# 0.11.0 - 2019-06-24

- Correct situation where distributed mutex could end in a tight loop when
redis could not be contacted
redis could not be contacted

# 0.9.2 - 2019-04-26

Expand Down
12 changes: 1 addition & 11 deletions lib/mini_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@ def self.configure
yield self
end

# Adapter that exposes a class-level `handle_exception(exception, context)` for
# reporting errors through Sidekiq, regardless of which of the two Sidekiq
# entry points checked below is available.
class SidekiqExceptionHandler
# Prefer the Sidekiq::ExceptionHandler mixin when that constant is defined;
# `extend` turns the mixin's instance methods into class methods here
# (presumably including `handle_exception` -- confirm against the Sidekiq version in use).
if defined?(Sidekiq::ExceptionHandler)
extend Sidekiq::ExceptionHandler
else
# Mixin not available: forward directly to Sidekiq.handle_exception.
def self.handle_exception(exception, context)
Sidekiq.handle_exception(exception, context)
end
end
end

def self.job_exception_handler(&blk)
@job_exception_handler = blk if blk
@job_exception_handler
Expand All @@ -35,7 +25,7 @@ def self.handle_job_exception(ex, context = {})
if job_exception_handler
job_exception_handler.call(ex, context)
else
SidekiqExceptionHandler.handle_exception(ex, context)
Sidekiq.default_configuration.handle_exception(ex, context)
end
end

Expand Down
75 changes: 69 additions & 6 deletions lib/mini_scheduler/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def worker_thread_ids

def process_queue
klass = @queue.deq

# hack alert, I need to both deq and set @running atomically.
@running = true

Expand Down Expand Up @@ -249,12 +250,7 @@ def self.current
end

def hostname
@hostname ||=
begin
`hostname`.strip
rescue StandardError
"unknown"
end
@hostname ||= self.class.hostname
end

def schedule_info(klass)
Expand Down Expand Up @@ -393,6 +389,73 @@ def self.discover_schedules
schedules
end

# Returns the name of the current host, computed once and cached in @hostname.
#
# Lookup order:
#   1. Socket.gethostname
#   2. the output of the `hostname` shell command, stripped of whitespace
#   3. the literal "unknown_host" when both of the above raise
#
# @return [String] the resolved host name
def self.hostname
  return @hostname if @hostname

  @hostname =
    begin
      # Loaded lazily so the socket library is only pulled in on first use.
      require "socket"
      Socket.gethostname
    rescue StandardError
      begin
        `hostname`.strip
      rescue StandardError
        "unknown_host"
      end
    end
end

# Discover running scheduled jobs on the current host.
#
# For every class returned by `discover_schedules` the schedule key is built
# (scoped by the current hostname when the class is per-host), all keys are
# fetched from Redis in a single MGET, and every entry whose stored
# `prev_result` is "RUNNING" is reported.
#
# NOTE(review): keys of jobs that are not per-host are built without a
# hostname, so such an entry presumably reflects whichever host last wrote
# it -- confirm whether callers need to filter on `:thread_id`.
#
# @example
#
#   MiniScheduler::Manager.discover_running_scheduled_jobs
#
# @return [Array<Hash>] one hash per running scheduled job, with the keys:
#   - `:class` [Class] the class of the scheduled job.
#   - `:started_at` [Time] built from the stored `prev_run` timestamp.
#   - `:thread_id` [String] the stored `current_owner`. The thread can be
#     identified by matching the `:mini_scheduler_worker_thread_id` thread
#     variable with the ID.
#   Returns an empty array when no schedules are discovered.
def self.discover_running_scheduled_jobs
  current_host = self.hostname

  schedule_keys =
    discover_schedules.to_h do |klass|
      key = klass.is_per_host ? schedule_key(klass, current_host) : schedule_key(klass)
      [klass, key]
    end

  # MGET needs at least one key; calling it with none makes Redis reply with
  # an arity error, so short-circuit when there is nothing to look up.
  return [] if schedule_keys.empty?

  stored_infos = MiniScheduler.redis.mget(*schedule_keys.values)

  schedule_keys.keys.zip(stored_infos).filter_map do |scheduled_job_class, scheduled_job_info|
    next if scheduled_job_info.nil?

    parsed =
      begin
        JSON.parse(scheduled_job_info, symbolize_names: true)
      rescue JSON::ParserError
        nil
      end

    # Skip unparsable payloads as well as valid JSON that is not an object
    # (e.g. "[]"), which cannot carry the fields read below.
    next unless parsed.is_a?(Hash)
    next if parsed[:prev_result] != "RUNNING"

    {
      class: scheduled_job_class,
      started_at: Time.at(parsed[:prev_run]),
      thread_id: parsed[:current_owner],
    }
  end
end

@class_mutex = Mutex.new
def self.seq
@class_mutex.synchronize do
Expand Down
2 changes: 1 addition & 1 deletion lib/mini_scheduler/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module MiniScheduler
VERSION = "0.16.0"
VERSION = "0.17.0"
end
4 changes: 2 additions & 2 deletions mini_scheduler.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ Gem::Specification.new do |spec|
spec.homepage = "https://github.com/discourse/mini_scheduler"
spec.license = "MIT"

spec.required_ruby_version = ">= 2.7.0"
spec.required_ruby_version = ">= 3.2.0"

spec.files = `git ls-files`.split($/).reject { |s| s =~ /^(spec|\.)/ }
spec.require_paths = ["lib"]

spec.add_runtime_dependency "sidekiq", ">= 4.2.3", "< 7.0"
spec.add_runtime_dependency "sidekiq", ">= 7.0", "< 8.0"

spec.add_development_dependency "pg", "~> 1.0"
spec.add_development_dependency "activesupport", "~> 7.0"
Expand Down
87 changes: 82 additions & 5 deletions spec/mini_scheduler/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ def perform
end
end

# Spec fixture: a per-host scheduled job whose `perform` blocks for a long
# time, so that it is still in progress while the specs inspect it.
class SuperLongPerHostJob
extend ::MiniScheduler::Schedule

# Schedule DSL calls made available by the `extend` above
# (presumably: run on each host independently, every 20 minutes -- confirm in MiniScheduler::Schedule).
per_host
every 20.minutes

# Sleeps for 1000 seconds; the job is never expected to finish during a spec run.
def perform
sleep 1000
end
end

class PerHostJob
extend ::MiniScheduler::Schedule

Expand Down Expand Up @@ -77,11 +88,16 @@ def perform
end

after do
manager.stop!
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
manager.remove(Testing::PerHostJob)
manager.remove(Testing::FailingJob)
ObjectSpace
.each_object(described_class)
.each do |manager|
manager.stop!
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
manager.remove(Testing::PerHostJob)
manager.remove(Testing::FailingJob)
manager.remove(Testing::SuperLongPerHostJob)
end

# connections that are not in use must be removed
# otherwise active record gets super confused
Expand Down Expand Up @@ -312,6 +328,67 @@ def queued_jobs(manager, with_hostname:)
end
end

# Exercises the class-level `discover_running_scheduled_jobs` API with two
# managers, one running a regular job and one running a per-host job.
describe ".discover_running_scheduled_jobs" do
# Two independent managers, both created with stats collection disabled.
let(:manager_1) { MiniScheduler::Manager.new(enable_stats: false) }
let(:manager_2) { MiniScheduler::Manager.new(enable_stats: false) }

before do
# Helper defined outside this block (presumably pins Time.now for the example -- confirm).
freeze_time

# Make SuperLongJob due by moving its next_run one second into the past, then tick manager_1.
info = manager_1.schedule_info(Testing::SuperLongJob)
info.next_run = Time.now.to_i - 1
info.write!

manager_1.tick

# Same for the per-host job on manager_2.
info = manager_2.schedule_info(Testing::SuperLongPerHostJob)
info.next_run = Time.now.to_i - 1
info.write!

manager_2.tick

# Block until both jobs have been picked up and report themselves as RUNNING.
wait_for do
manager_1.schedule_info(Testing::SuperLongJob).prev_result == "RUNNING" &&
manager_2.schedule_info(Testing::SuperLongPerHostJob).prev_result == "RUNNING"
end
end

# Stop both managers created for this group.
after do
manager_1.stop!
manager_2.stop!
end

it "returns running jobs on current host" do
jobs = described_class.discover_running_scheduled_jobs

# One entry for each of the two jobs started in the `before` block.
expect(jobs.size).to eq(2)

super_long_job = jobs.find { |job| job[:class] == Testing::SuperLongJob }

# The result hash exposes exactly these keys, in this order.
expect(super_long_job.keys).to eq(%i[class started_at thread_id])
expect(super_long_job[:started_at]).to be_within(1).of(Time.now)
# NOTE(review): `manager` is not defined in this group -- presumably an outer `let`; confirm.
expect(super_long_job[:thread_id]).to start_with("_scheduler_#{manager.hostname}")

# A live thread in this process must carry the reported thread id.
expect(
Thread.list.find do |thread|
thread[:mini_scheduler_worker_thread_id] == super_long_job[:thread_id]
end,
).to be_truthy

super_long_per_host_job = jobs.find { |job| job[:class] == Testing::SuperLongPerHostJob }

# Same assertions for the per-host job.
expect(super_long_per_host_job.keys).to eq(%i[class started_at thread_id])
expect(super_long_per_host_job[:started_at]).to be_within(1).of(Time.now)
expect(super_long_per_host_job[:thread_id]).to start_with("_scheduler_#{manager.hostname}")

expect(
Thread.list.find do |thread|
thread[:mini_scheduler_worker_thread_id] == super_long_per_host_job[:thread_id]
end,
).to be_truthy
end
end

describe "#next_run" do
it "should be within the next 5 mins if it never ran" do
manager.remove(Testing::RandomJob)
Expand Down

0 comments on commit 8bf7c5a

Please sign in to comment.