Skip to content

Commit

Permalink
Merge pull request #8 from mraspaud/fix-uid
Browse files Browse the repository at this point in the history
Add `path` to messages when `filesystem` is provided
  • Loading branch information
mraspaud authored Apr 26, 2024
2 parents b38faca + a7d6ad8 commit b330e70
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 18 deletions.
66 changes: 66 additions & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,72 @@ The command-line tool can be used by invoking `pytroll-watcher <config-file>`. A
platform_name:
npp: Suomi-NPP

Published messages
******************

The published messages contain information on how to access the advertised resource. The following parameters will
be present in the message.

uid
---

This is the unique identifier for the resource. In general, it is the basename of the file/object, since we assume
that two files with the same name will have the same content. In some cases it can include the containing directory.

Examples of uids:

- `SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5`
- `S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc`

uri
---

This is the URI that can be used to access the resource. For more complex cases, URIs can be chained, as fsspec allows.

Examples of uris:

- `s3://viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5`
- `zip://sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5::s3://viirs-data/viirs_sdr_npp_d20240408_t1006227_e1007469_b64498.zip`
- `https://someplace.com/files/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc`

filesystem
----------

Sometimes the URI is not enough to gain access to the resource, for example when the hosting service requires
authentication. This is why pytroll-watchers will also provide the filesystem and the path items. The filesystem
parameter is the fsspec json representation of the filesystem. This can be used on the recipient side using e.g.::

fsspec.AbstractFileSystem.from_json(json.dumps(fs_info))

where `fs_info` is the content of the filesystem parameter.

To pass authentication parameters to the filesystem, use the `storage_options` configuration item.


Example of filesystem:

- `{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "profile": "someprofile"}`

.. warning::

Pytroll-watchers tries to prevent publishing of sensitive information such as passwords and secret keys, and will
raise an error in most cases when this is done. However, always double-check your pytroll-watchers configuration so
that secrets are not passed to the library to start with.
Solutions include using ssh-agent for ssh-based filesystems, or storing credentials in .aws config files for s3 filesystems.
For http-based filesystems implemented in pytroll-watchers, the username and password are used to generate a token
prior to publishing, and will thus not be published.

path
----

This parameter is the companion to `filesystem` and gives the path to the resource within the filesystem.

Examples of paths:

- `/viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5`
- `/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5`
- `/files/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiances.nc`


API
***
Expand Down
4 changes: 3 additions & 1 deletion src/pytroll_watchers/local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from upath import UPath

from pytroll_watchers.backends.local import listen_to_local_events
from pytroll_watchers.publisher import file_publisher_from_generator, parse_metadata
from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata

logger = logging.getLogger(__name__)

Expand All @@ -25,6 +25,8 @@ def file_publisher(fs_config, publisher_config, message_config):
with the file metadata, and passed directly to posttroll's Message constructor.
"""
logger.info(f"Starting watch on '{fs_config['directory']}'")
if "password" in fs_config.get("storage_options", []):
raise SecurityError("A password cannot be published safely.")
generator = file_generator(**fs_config)
return file_publisher_from_generator(generator, publisher_config, message_config)

Expand Down
9 changes: 7 additions & 2 deletions src/pytroll_watchers/minio_notification_watcher.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""Publish messages based on Minio bucket notifications."""
"""Publish messages based on Minio bucket notifications.
The published messages will contain filesystem information generated by fsspec.
"""

from logging import getLogger

from upath import UPath

from pytroll_watchers.publisher import file_publisher_from_generator, parse_metadata
from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata

logger = getLogger(__name__)

Expand All @@ -19,6 +22,8 @@ def file_publisher(fs_config, publisher_config, message_config):
with the file metadata, and passed directly to posttroll's Message constructor.
"""
logger.info(f"Starting watch on '{fs_config['bucket_name']}'")
if "secret_key" in fs_config.get("storage_options", []):
raise SecurityError("A secret key cannot be published safely.")
generator = file_generator(**fs_config)
return file_publisher_from_generator(generator, publisher_config, message_config)

Expand Down
23 changes: 21 additions & 2 deletions src/pytroll_watchers/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
logger = logging.getLogger(__name__)


class SecurityError(Exception):
    """An exception for breaking security rules.

    Raised by the watchers' `file_publisher` functions when the `storage_options` of the filesystem configuration
    contain a password or a secret key, since these cannot be published safely.
    """


def file_publisher_from_generator(generator, publisher_config, message_config):
"""Publish files coming from local filesystem events.
Expand All @@ -22,7 +26,21 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
publisher_config: The configuration dictionary to pass to the posttroll publishing functions.
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
"""
Side effect:
Publishes posttroll messages containing the location of the file with the following fields:
- The "uid" which is the unique filename of the file.
- The "uri" which provides an fsspec-style uri to the file. This does not however contain the connection
parameters, or storage options of the filesystems referred to.
- The "filesystem" which is the json-serialized filesystem information that can then be fed to fsspec.
- The "path" which is the path of the file inside the filesystem.
For example, the file
- uid: `S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiance.nc`
- uri: `s3:///eodata/Sentinel-3/OLCI/OL_1_EFR___/2024/04/15/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiance.nc`
- filesystem: `{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "profile": "my_profile"}`
- path: `/eodata/Sentinel-3/OLCI/OL_1_EFR___/2024/04/15/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3/Oa02_radiance.nc`
""" # noqa
publisher = create_publisher_from_dict_config(publisher_config)
publisher.start()
with closing(publisher):
Expand All @@ -31,7 +49,8 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
amended_message_config["data"]["uri"] = file_item.as_uri()
amended_message_config["data"]["uid"] = file_item.name
with suppress(AttributeError):
amended_message_config["data"]["fs"] = json.loads(file_item.fs.to_json())
amended_message_config["data"]["filesystem"] = json.loads(file_item.fs.to_json())
amended_message_config["data"]["path"] = file_item.path
aliases = amended_message_config.pop("aliases", {})
apply_aliases(aliases, file_metadata)
amended_message_config["data"].update(file_metadata)
Expand Down
31 changes: 25 additions & 6 deletions tests/test_bucket_notification_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import datetime
from unittest import mock

import pytest
from posttroll.message import Message
from posttroll.testing import patched_publisher
from pytroll_watchers import minio_notification_watcher
from pytroll_watchers.publisher import fix_times
from pytroll_watchers.publisher import SecurityError, fix_times
from pytroll_watchers.testing import patched_bucket_listener # noqa
from upath import UPath

Expand Down Expand Up @@ -107,11 +108,29 @@ def test_publish_paths(patched_bucket_listener, caplog): # noqa
assert message.data["uri"] == "s3://viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5"
assert message.data["uid"] == "SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5"
assert message.data["sensor"] == "viirs"
assert message.data["fs"] == {"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [],
assert message.data["filesystem"] == {"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [],
"profile": "someprofile"}
assert "Starting watch on 'viirs-data'" in caplog.text


def test_publish_paths_forbids_passing_secret_key(patched_bucket_listener):  # noqa
    """Check that a secret key in the storage options is refused with a SecurityError."""
    forbidden_key = "very secret"  # noqa
    storage_options = {"access_key": "my access key",
                       "secret_key": forbidden_key}
    fs_settings = {"endpoint_url": "someendpoint",
                   "bucket_name": "viirs-data",
                   "file_pattern": sdr_file_pattern,
                   "storage_options": storage_options}
    publisher_settings = {"nameservers": False, "port": 1979}
    message_settings = {"subject": "/segment/viirs/l1b/", "atype": "file", "data": {"sensor": "viirs"}}
    # The publisher and the bucket listener are patched; the call itself is expected to raise.
    with patched_publisher(), patched_bucket_listener(records), pytest.raises(SecurityError):
        minio_notification_watcher.file_publisher(fs_config=fs_settings,
                                                  publisher_config=publisher_settings,
                                                  message_config=message_settings)


def test_publish_paths_with_pattern(patched_bucket_listener): # noqa
"""Test publishing paths."""
s3_config = dict(endpoint_url="someendpoint",
Expand All @@ -121,10 +140,10 @@ def test_publish_paths_with_pattern(patched_bucket_listener): # noqa
publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"))
with patched_publisher() as messages:
with patched_bucket_listener(records):
minio_notification_watcher.file_publisher(fs_config=s3_config,
publisher_config=publisher_settings,
message_config=message_settings)
with patched_bucket_listener(records):
minio_notification_watcher.file_publisher(fs_config=s3_config,
publisher_config=publisher_settings,
message_config=message_settings)
message = Message(rawstr=messages[0])
assert message.data["sensor"] == "viirs"
assert message.data["platform_name"] == "npp"
Expand Down
3 changes: 2 additions & 1 deletion tests/test_copernicus_dataspace_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def test_publish_paths(caplog):
assert message.data["uri"] == "s3:///eodata/Sentinel-3/OLCI/OL_1_EFR___/2024/04/15/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3"
assert message.data["uid"] == "S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3" # noqa
assert message.data["sensor"] == "olci"
assert message.data["fs"] == {"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [],
assert message.data["filesystem"] == {"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [],
"profile": "someprofile"}
assert message.data["path"] == "/eodata/Sentinel-3/OLCI/OL_1_EFR___/2024/04/15/S3B_OL_1_EFR____20240415T074029_20240415T074329_20240415T094236_0179_092_035_1620_PS2_O_NR_003.SEN3" # noqa
assert f"Starting watch on dataspace for '{filter_string}'" in caplog.text
28 changes: 24 additions & 4 deletions tests/test_local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from posttroll.message import Message
from posttroll.testing import patched_publisher
from pytroll_watchers import local_watcher
from pytroll_watchers.publisher import SecurityError
from pytroll_watchers.testing import patched_local_events # noqa


Expand Down Expand Up @@ -34,13 +35,13 @@ def test_watchdog_generator_with_protocol(tmp_path, patched_local_events): # no

protocol = "ssh"
storage_options = {"parameter": "value",
"host": "somehost.pytroll.org"}
"host": "somehost.pytroll.org"}


generator = local_watcher.file_generator(tmp_path,
file_pattern=fname_pattern,
protocol=protocol,
storage_options=storage_options)
file_pattern=fname_pattern,
protocol=protocol,
storage_options=storage_options)
path, metadata = next(generator)

assert path.as_uri().startswith("ssh://")
Expand Down Expand Up @@ -97,3 +98,22 @@ def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa
assert message.data["sensor"] == "viirs"
assert "fs" not in message.data
assert f"Starting watch on '{local_settings['directory']}'" in caplog.text


def test_publish_paths_forbids_passing_password(tmp_path, patched_local_events, caplog):  # noqa
    """Check that a password in the storage options is refused with a SecurityError."""
    watched_file = os.fspath(tmp_path / "foo.txt")
    forbidden_password = "very strong"  # noqa

    storage_options = {"host": "myhost.pytroll.org", "username": "user", "password": forbidden_password}
    fs_settings = {"directory": tmp_path, "protocol": "ssh", "storage_options": storage_options}
    publisher_settings = {"nameservers": False, "port": 1979}
    message_settings = {"subject": "/segment/viirs/l1b/", "atype": "file", "data": {"sensor": "viirs"}}

    caplog.set_level("INFO")
    # The local events and the publisher are patched; the call itself is expected to raise.
    with patched_local_events([watched_file]), patched_publisher(), pytest.raises(SecurityError):
        local_watcher.file_publisher(fs_config=fs_settings,
                                     publisher_config=publisher_settings,
                                     message_config=message_settings)
4 changes: 2 additions & 2 deletions tests/test_main_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Tests for the gathered publisher functions."""

import logging

import pytest
import yaml
from posttroll.testing import patched_publisher
Expand Down Expand Up @@ -146,8 +148,6 @@ def test_cli_with_logging(tmp_path, patched_local_events): # noqa
with open(log_config_file, "w") as fd:
fd.write(yaml.dump(log_config))

import logging

with patched_publisher():
filename = tmp_path / "bla"
with patched_local_events([filename]):
Expand Down

0 comments on commit b330e70

Please sign in to comment.