diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 3312d22..f446ac6 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -25,6 +25,7 @@ """Geographic segment gathering.""" import logging +import signal import time from configparser import NoOptionError, ConfigParser @@ -56,6 +57,8 @@ def __init__(self, opts): self.triggers = [] self.return_status = 0 + self._sigterm_caught = False + self._clean_config() self._setup_publisher() try: @@ -103,8 +106,9 @@ def _setup_triggers(self): def run(self): """Run granule triggers.""" + signal.signal(signal.SIGTERM, self._handle_sigterm) try: - while True: + while self._keep_running(): time.sleep(1) for trigger in self.triggers: if not trigger.is_alive(): @@ -119,9 +123,26 @@ def run(self): return self.return_status + def _handle_sigterm(self, signum, frame): + logger.info("Caught SIGTERM, shutting down when all collections are finished.") + self._sigterm_caught = True + + def _keep_running(self): + keep_running = True + if self._sigterm_caught: + keep_running = self._trigger_collectors_have_granules() + return keep_running + + def _trigger_collectors_have_granules(self): + for t in self.triggers: + for c in t.collectors: + if c.granules: + return True + return False + def stop(self): """Stop the gatherer.""" - logger.info('Ending publication the gathering of granules...') + logger.info('Ending the gathering of granules...') for trigger in self.triggers: trigger.stop() self.publisher.stop() diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 077d2bd..44548f4 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -527,3 +527,89 @@ def test_full_pass(self, sub_factory, monkeypatch, tmp_tle): assert snd_msg.data == expected_msg.data finally: gatherer.stop() + + +def test_sigterm(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + # We don't need the triggers here. They also interfere with completing the test (the test never exits) + with patch("pytroll_collectors.geographic_gatherer.TriggerFactory.create"): + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + + +def test_sigterm_with_collection(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled when there is collection ongoing.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "posttroll_section", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + # Use a fake trigger that initially sets some granules and after a while clears them + with patch("pytroll_collectors.geographic_gatherer.PostTrollTrigger", + new=FakeTriggerWithGranules): + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + + +class FakeTriggerWithGranules: + """Fake trigger class used in testing SIGTERM handling. + + At creation, adds "foo" to collector granules. When is_alive() is called the second time, it clears the granules. + """ + + def __init__(self, collectors, *args, **kwargs): + """Initialize the trigger class.""" + self.collectors = collectors + for col in self.collectors: + col.granules.append("foo") + self._args = args + self._kwargs = kwargs + self._counter = 0 + + def is_alive(self): + """Return True for alive thread.""" + if self._counter > 0: + # On the second call clear the granules + for col in self.collectors: + col.granules = [] + self._counter += 1 + return True + + def start(self): + """Start the trigger.""" + pass + + def stop(self): + """Stop the trigger.""" + pass