diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3c1726e..9f4f0c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -5,6 +5,7 @@ on: push: branches: - main + - sidekiq-7 jobs: lint: @@ -33,13 +34,14 @@ jobs: services: redis: - image: redis + image: ${{ matrix.redis }} ports: - 6379:6379 strategy: matrix: - ruby: ["3.1", "3.2", "3.3"] + ruby: ["3.2", "3.3"] + redis: ["redis", "valkey/valkey"] steps: - uses: actions/checkout@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 7372c31..df54bf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +# next + +- Remove support for Sidekiq < 7 +- Update minimum Ruby version to 3.2 + +# 0.17.0 - 2024-08-06 + +- Add `MiniScheduler::Manager.discover_running_scheduled_jobs` API to allow running scheduled jobs to easily be discovered on the + current host. + # 0.16.0 - 2023-05-17 - Support Redis gem version 5 @@ -14,7 +24,7 @@ # 0.13.0 - 2020-11-30 - Fix exception code so it has parity with Sidekiq 4.2.3 and up, version bump cause -minimum version of Sikekiq changed. + minimum version of Sikekiq changed. # 0.12.3 - 2020-10-15 @@ -35,7 +45,7 @@ minimum version of Sikekiq changed. # 0.11.0 - 2019-06-24 - Correct situation where distributed mutex could end in a tight loop when - redis could not be contacted + redis could not be contacted # 0.9.2 - 2019-04-26 diff --git a/lib/mini_scheduler.rb b/lib/mini_scheduler.rb index fbf217b..2e3bde6 100644 --- a/lib/mini_scheduler.rb +++ b/lib/mini_scheduler.rb @@ -16,16 +16,6 @@ def self.configure yield self end - class SidekiqExceptionHandler - if defined?(Sidekiq::ExceptionHandler) - extend Sidekiq::ExceptionHandler - else - def self.handle_exception(exception, context) - Sidekiq.handle_exception(exception, context) - end - end - end - def self.job_exception_handler(&blk) @job_exception_handler = blk if blk @job_exception_handler @@ -35,7 +25,7 @@ def self.handle_job_exception(ex, context = {}) if job_exception_handler job_exception_handler.call(ex, context) else - SidekiqExceptionHandler.handle_exception(ex, context) + Sidekiq.default_configuration.handle_exception(ex, context) end end diff --git a/lib/mini_scheduler/manager.rb b/lib/mini_scheduler/manager.rb index 3b4de7b..1b1cd4a 100644 --- a/lib/mini_scheduler/manager.rb +++ b/lib/mini_scheduler/manager.rb @@ -106,6 +106,7 @@ def worker_thread_ids def process_queue klass = @queue.deq + # hack alert, I need to both deq and set @running atomically. @running = true @@ -249,12 +250,7 @@ def self.current end def hostname - @hostname ||= - begin - `hostname`.strip - rescue StandardError - "unknown" - end + @hostname ||= self.class.hostname end def schedule_info(klass) @@ -393,6 +389,73 @@ def self.discover_schedules schedules end + def self.hostname + @hostname ||= + begin + require "socket" + Socket.gethostname + rescue => e + begin + `hostname`.strip + rescue => e + "unknown_host" + end + end + end + + # Discover running scheduled jobs on the current host. + # + # @example + # + # MiniScheduler::Manager.discover_running_scheduled_jobs + # + # @return [Array] an array of hashes representing the running scheduled jobs. + # @option job [Class] :class The class of the scheduled job. + # @option job [Time] :started_at The time when the scheduled job started. + # @option job [String] :thread_id The ID of the worker thread running the job. + # The thread can be identified by matching the `:mini_scheduler_worker_thread_id` thread variable with the ID. + def self.discover_running_scheduled_jobs + hostname = self.hostname + + schedule_keys = + discover_schedules.reduce({}) do |acc, klass| + acc[klass] = if klass.is_per_host + self.schedule_key(klass, hostname) + else + self.schedule_key(klass) + end + + acc + end + + running_scheduled_jobs = [] + + schedule_keys + .keys + .zip(MiniScheduler.redis.mget(*schedule_keys.values)) + .each do |scheduled_job_class, scheduled_job_info| + next if scheduled_job_info.nil? + + parsed = + begin + JSON.parse(scheduled_job_info, symbolize_names: true) + rescue JSON::ParserError + nil + end + + next if parsed.nil? + next if parsed[:prev_result] != "RUNNING" + + running_scheduled_jobs << { + class: scheduled_job_class, + started_at: Time.at(parsed[:prev_run]), + thread_id: parsed[:current_owner], + } + end + + running_scheduled_jobs + end + @class_mutex = Mutex.new def self.seq @class_mutex.synchronize do diff --git a/lib/mini_scheduler/version.rb b/lib/mini_scheduler/version.rb index d251a82..bc91acd 100644 --- a/lib/mini_scheduler/version.rb +++ b/lib/mini_scheduler/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module MiniScheduler - VERSION = "0.16.0" + VERSION = "0.17.0" end diff --git a/mini_scheduler.gemspec b/mini_scheduler.gemspec index 67e7423..ed97d55 100644 --- a/mini_scheduler.gemspec +++ b/mini_scheduler.gemspec @@ -15,12 +15,12 @@ Gem::Specification.new do |spec| spec.homepage = "https://github.com/discourse/mini_scheduler" spec.license = "MIT" - spec.required_ruby_version = ">= 2.7.0" + spec.required_ruby_version = ">= 3.2.0" spec.files = `git ls-files`.split($/).reject { |s| s =~ /^(spec|\.)/ } spec.require_paths = ["lib"] - spec.add_runtime_dependency "sidekiq", ">= 4.2.3", "< 7.0" + spec.add_runtime_dependency "sidekiq", ">= 7.0", "< 8.0" spec.add_development_dependency "pg", "~> 1.0" spec.add_development_dependency "activesupport", "~> 7.0" diff --git a/spec/mini_scheduler/manager_spec.rb b/spec/mini_scheduler/manager_spec.rb index cf73b9d..5491a99 100644 --- a/spec/mini_scheduler/manager_spec.rb +++ b/spec/mini_scheduler/manager_spec.rb @@ -32,6 +32,17 @@ def perform end end + class SuperLongPerHostJob + extend ::MiniScheduler::Schedule + + per_host + every 20.minutes + + def perform + sleep 1000 + end + end + class PerHostJob extend ::MiniScheduler::Schedule @@ -77,11 +88,16 @@ def perform end after do - manager.stop! - manager.remove(Testing::RandomJob) - manager.remove(Testing::SuperLongJob) - manager.remove(Testing::PerHostJob) - manager.remove(Testing::FailingJob) + ObjectSpace + .each_object(described_class) + .each do |manager| + manager.stop! + manager.remove(Testing::RandomJob) + manager.remove(Testing::SuperLongJob) + manager.remove(Testing::PerHostJob) + manager.remove(Testing::FailingJob) + manager.remove(Testing::SuperLongPerHostJob) + end # connections that are not in use must be removed # otherwise active record gets super confused @@ -312,6 +328,67 @@ def queued_jobs(manager, with_hostname:) end end + describe ".discover_running_scheduled_jobs" do + let(:manager_1) { MiniScheduler::Manager.new(enable_stats: false) } + let(:manager_2) { MiniScheduler::Manager.new(enable_stats: false) } + + before do + freeze_time + + info = manager_1.schedule_info(Testing::SuperLongJob) + info.next_run = Time.now.to_i - 1 + info.write! + + manager_1.tick + + info = manager_2.schedule_info(Testing::SuperLongPerHostJob) + info.next_run = Time.now.to_i - 1 + info.write! + + manager_2.tick + + wait_for do + manager_1.schedule_info(Testing::SuperLongJob).prev_result == "RUNNING" && + manager_2.schedule_info(Testing::SuperLongPerHostJob).prev_result == "RUNNING" + end + end + + after do + manager_1.stop! + manager_2.stop! + end + + it "returns running jobs on current host" do + jobs = described_class.discover_running_scheduled_jobs + + expect(jobs.size).to eq(2) + + super_long_job = jobs.find { |job| job[:class] == Testing::SuperLongJob } + + expect(super_long_job.keys).to eq(%i[class started_at thread_id]) + expect(super_long_job[:started_at]).to be_within(1).of(Time.now) + expect(super_long_job[:thread_id]).to start_with("_scheduler_#{manager.hostname}") + + expect( + Thread.list.find do |thread| + thread[:mini_scheduler_worker_thread_id] == super_long_job[:thread_id] + end, + ).to be_truthy + + super_long_per_host_job = jobs.find { |job| job[:class] == Testing::SuperLongPerHostJob } + + expect(super_long_per_host_job.keys).to eq(%i[class started_at thread_id]) + expect(super_long_per_host_job[:started_at]).to be_within(1).of(Time.now) + expect(super_long_per_host_job[:thread_id]).to start_with("_scheduler_#{manager.hostname}") + + expect( + Thread.list.find do |thread| + thread[:mini_scheduler_worker_thread_id] == super_long_per_host_job[:thread_id] + end, + ).to be_truthy + end + end + describe "#next_run" do it "should be within the next 5 mins if it never ran" do manager.remove(Testing::RandomJob)