Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SIGTERM handling to segment gatherer #156

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions pytroll_collectors/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@

import datetime as dt
import logging.handlers
import os
import signal
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from enum import Enum
import os

import trollsift
from posttroll import message as pmessage
Expand Down Expand Up @@ -610,6 +611,7 @@
self._group_by_minutes = self._config.get('group_by_minutes', None)

self._loop = False
self._sigterm_caught = False
self._providing_server = self._config.get('providing_server')
self._is_first_message_after_start = True

Expand Down Expand Up @@ -711,9 +713,10 @@
def run(self):
"""Run SegmentGatherer."""
self._setup_messaging()
signal.signal(signal.SIGTERM, self._handle_sigterm)

Check warning on line 716 in pytroll_collectors/segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/segments.py#L716

Added line #L716 was not covered by tests

self._loop = True
while self._loop:
while self._keep_running():

Check warning on line 719 in pytroll_collectors/segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/segments.py#L719

Added line #L719 was not covered by tests
self.triage_slots()

# Check listener for new messages
Expand All @@ -722,8 +725,7 @@
except AttributeError:
msg = self._listener.queue.get(True, 1)
except KeyboardInterrupt:
self.stop()
continue
break

Check warning on line 728 in pytroll_collectors/segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/segments.py#L728

Added line #L728 was not covered by tests
except Empty:
continue

Expand All @@ -733,6 +735,16 @@
continue
logger.info("New message received: %s", str(msg))
self.process(msg)
self.stop()

Check warning on line 738 in pytroll_collectors/segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/segments.py#L738

Added line #L738 was not covered by tests

def _handle_sigterm(self, signum, frame):
logging.info("Caught SIGTERM, shutting down when all collections are finished.")
self._sigterm_caught = True

Check warning on line 742 in pytroll_collectors/segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/segments.py#L741-L742

Added lines #L741 - L742 were not covered by tests

def _keep_running(self):
if not self._loop or (self._sigterm_caught and not self.slots):
return False
return True

Check warning on line 747 in pytroll_collectors/segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/segments.py#L745-L747

Added lines #L745 - L747 were not covered by tests

def triage_slots(self):
"""Check if there are slots ready for publication."""
Expand Down
16 changes: 8 additions & 8 deletions pytroll_collectors/tests/test_fsspec_to_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ def create_files_to_pack(self, tmp_path):
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp_path):
Expand Down Expand Up @@ -153,8 +153,8 @@ def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_local_file_extract(self, packing, create_packfile, filesystem_class, tmp_path):
Expand Down Expand Up @@ -184,8 +184,8 @@ def test_pack_local_file_extract(self, packing, create_packfile, filesystem_clas
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_local_file_extract_filesystem(self, packing, create_packfile, filesystem_class, tmp_path):
Expand All @@ -211,8 +211,8 @@ def check_filesystem_is_understood_by_fsspec(self, filesystem_info):
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_local_file_extract_with_custom_options(self, packing, create_packfile, filesystem_class, tmp_path):
Expand Down
51 changes: 51 additions & 0 deletions pytroll_collectors/tests/test_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,57 @@
self.msg0deg._setup_listener()
assert_messaging(None, None, None, 'localhost', None, ListenerContainer)

def test_sigterm(self):
"""Test that SIGTERM signal is handled."""
import os
import signal
import time
from multiprocessing import Process

with patch('pytroll_collectors.segments.ListenerContainer'):
col = SegmentGatherer(CONFIG_SINGLE)
proc = Process(target=col.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0

def test_sigterm_nonempty_slots(self):
"""Test that SIGTERM signal is handled properly when there are active slots present."""
import os
import signal
import time
from multiprocessing import Process

with patch('pytroll_collectors.segments.ListenerContainer'):
with patch('pytroll_collectors.segments.SegmentGatherer.triage_slots',
new=_fake_triage_slots):
col = SegmentGatherer(CONFIG_SINGLE)
proc = Process(target=col.run)
proc.start()
time.sleep(1)
tic = time.time()
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0
# Triage after the kill signal takes 1 s
assert time.time() - tic > 1.


def _fake_triage_slots(self):
"""Fake the triage_slots() method.

The fake triage adds a new slot if SIGTERM has not been caught, and removes it when the signal comes.
"""
import time
self.slots["foo"] = "bar"
if self._sigterm_caught:
del self.slots["foo"]
time.sleep(1)

Check warning on line 799 in pytroll_collectors/tests/test_segments.py

View check run for this annotation

Codecov / codecov/patch

pytroll_collectors/tests/test_segments.py#L795-L799

Added lines #L795 - L799 were not covered by tests


def _get_message_from_metadata_and_patterns(mda, patterns):
fake_message = FakeMessage(mda)
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ tag_prefix = v
omit =
pytroll_collectors/_version.py
versioneer.py
relative_files = True
Loading