From df9f426c8aa1d040d2e4ecc6979824c08f80e205 Mon Sep 17 00:00:00 2001 From: "Jens W. Klein" Date: Tue, 16 Jan 2024 12:59:32 +0100 Subject: [PATCH] Fail and exit worker if there is mappings.json is missing or has a syntax error --- CHANGES.rst | 1 + src/collective/elastic/ingest/celery.py | 13 +++++++++++++ src/collective/elastic/ingest/mapping.py | 9 +++++++-- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 1594d3e..2c23a17 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,6 +5,7 @@ Changelog ------------------ - Fix: Do not fail on `field_remove` or `full_remove` if section or name does not exist. [jensens] +- Fix: Fail and exit worker if there is `mappings.json` is missing or has a syntax error. [jensens] 2.0.0 (2023-12-05) ------------------ diff --git a/src/collective/elastic/ingest/celery.py b/src/collective/elastic/ingest/celery.py index 02acb88..ce33d2f 100644 --- a/src/collective/elastic/ingest/celery.py +++ b/src/collective/elastic/ingest/celery.py @@ -6,6 +6,7 @@ from celery import Celery import os +import sys # sentry integration @@ -38,6 +39,9 @@ def index(path, timestamp, index_name): try: content = fetch_content(path, timestamp) + except RuntimeError: + logger.error("Fatal error, stop worker") + sys.exit(1) except Exception: msg = "Error while fetching content from Plone" # xxx: retry handling! @@ -47,12 +51,18 @@ def index(path, timestamp, index_name): return try: schema = fetch_schema() + except RuntimeError: + logger.error("Fatal error, stop worker") + sys.exit(1) except Exception: msg = "Error while fetching schema from Plone" logger.exception(msg) return msg try: process_ingest(content, schema, index_name) + except RuntimeError: + logger.error("Fatal error, stop worker") + sys.exit(1) except Exception: # xxx: retry handling! msg = "Error while writing data to ElasticSearch" @@ -65,6 +75,9 @@ def index(path, timestamp, index_name): def unindex(uid, index_name): try: remove(uid, index_name) + except RuntimeError: + logger.error("Fatal error, stop worker") + sys.exit(1) except Exception: # xxx: retry handling! msg = "Error while removing data from ElasticSearch" diff --git a/src/collective/elastic/ingest/mapping.py b/src/collective/elastic/ingest/mapping.py index 908a173..ef9f8d7 100644 --- a/src/collective/elastic/ingest/mapping.py +++ b/src/collective/elastic/ingest/mapping.py @@ -42,9 +42,14 @@ def get_field_map() -> dict: if STATE["fieldmap"] == {}: _mappings_file = os.environ.get("MAPPINGS_FILE", None) if not _mappings_file: - raise ValueError("No mappings file configured.") + raise RuntimeError("No mappings file configured.") with open(_mappings_file) as fp: - STATE["fieldmap"] = json.load(fp) + try: + STATE["fieldmap"] = json.load(fp) + except json.decoder.JSONDecodeError as e: + raise RuntimeError( + f"Error while loading mappings file {_mappings_file}: {e}" + ) assert isinstance(STATE["fieldmap"], dict) return STATE["fieldmap"]