Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the datastore watcher #5

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/pytroll_watchers/dataspace_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def generate_download_links_since(filter_string, dataspace_auth, last_publicatio
pub_limit = f"PublicationDate gt {last_publication_date.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}"
filter_string_with_pub_limit = f"{filter_string} and {pub_limit}"

return generate_download_links(filter_string_with_pub_limit, dataspace_auth, storage_options)
yield from generate_download_links(filter_string_with_pub_limit, dataspace_auth, storage_options)


def generate_download_links(filter_string, dataspace_auth, storage_options):
Expand Down Expand Up @@ -166,11 +166,11 @@ def __init__(self, dataspace_auth):
"""Set up the session."""
dataspace_credentials = _get_credentials(dataspace_auth)
self._oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
def sentinelhub_compliance_hook(response):
def compliance_hook(response):
response.raise_for_status()
return response

self._oauth.register_compliance_hook("access_token_response", sentinelhub_compliance_hook)
self._oauth.register_compliance_hook("access_token_response", compliance_hook)
self._token_user, self._token_pass = dataspace_credentials

def get(self, filter_string):
Expand All @@ -182,7 +182,9 @@ def get(self, filter_string):
def fetch_token(self):
"""Fetch the token."""
if not self._oauth.token or self._oauth.token["expires_at"] <= time.time():
self._oauth.fetch_token(token_url=token_url, username=self._token_user, password=self._token_pass)
self._oauth.fetch_token(token_url=token_url,
username=self._token_user,
password=self._token_pass)


def _get_credentials(ds_auth):
Expand Down
143 changes: 143 additions & 0 deletions src/pytroll_watchers/datastore_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Module to provide file generator and publisher for the EUMETSAT datastore contents.

It polls the catalogue using OpenSearch for new data and generates HTTPS locations for the data.

Note:
The links produced can only be downloaded with a valid token. A token comes with the links, but
has only a limited validity time.

"""

import datetime
import logging
import netrc
import time
from contextlib import suppress

from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
from upath import UPath

from pytroll_watchers.dataspace_watcher import run_every
from pytroll_watchers.publisher import file_publisher_from_generator

logger = logging.getLogger(__name__)

token_url = "https://api.eumetsat.int/token" # noqa
data_url = "https://api.eumetsat.int/data"



def file_publisher(fs_config, publisher_config, message_config):
    """Publish download links for new items found by polling the datastore.

    Args:
        fs_config: the configuration for the datastore polling, will be passed as keyword arguments to
            `file_generator`. It must contain a `search_params` entry, which is logged at startup.
        publisher_config: The configuration dictionary to pass to the posttroll publishing functions.
        message_config: The information needed to complete the posttroll message generation. Will be amended
            with the file metadata, and passed directly to posttroll's Message constructor.

    Returns:
        The result of `file_publisher_from_generator` applied to the generator created here.
    """
    watched_search = fs_config["search_params"]
    logger.info(f"Starting watch on datastore for '{watched_search}'")
    links = file_generator(**fs_config)
    return file_publisher_from_generator(links, publisher_config, message_config)


def file_generator(search_params, polling_interval, ds_auth, start_from=None):
    """Generate download links for new datastore items, polling at regular intervals.

    Args:
        search_params: dictionary of search parameters, must contain at least `collection`.
        polling_interval: a `datetime.timedelta`, or a mapping of keyword arguments to build one from
            (eg `{"minutes": 10}`). Passed to `run_every` after conversion.
        ds_auth: the authentication configuration, passed on to `generate_download_links_since`.
        start_from: how far back in time the first poll should look, as a `datetime.timedelta` or a
            mapping of keyword arguments to build one from. Defaults to no look-back.

    Yields:
        The items generated by `generate_download_links_since` for each polling round.

    Raises:
        TypeError: if a mapping passed as `polling_interval` or `start_from` contains keyword arguments
            that `datetime.timedelta` does not accept.
    """
    polling_interval = _as_timedelta(polling_interval)
    start_from = _as_timedelta(start_from)
    if start_from is None:
        start_from = datetime.timedelta(0)

    last_pub_date = datetime.datetime.now(datetime.timezone.utc) - start_from
    for next_check in run_every(polling_interval):
        # Record the time before querying, so that items published while the query runs fall into
        # the window of the next round.
        new_pub_date = datetime.datetime.now(datetime.timezone.utc)
        yield from generate_download_links_since(search_params, ds_auth, last_pub_date)
        logger.info("Finished polling.")
        if next_check > datetime.datetime.now(datetime.timezone.utc):
            logger.info(f"Next iteration at {next_check}")
        last_pub_date = new_pub_date


def _as_timedelta(value):
    """Build a timedelta from a mapping of keyword arguments, return any other value unchanged."""
    # Only mapping-like values are expanded, so that a misspelled keyword raises a TypeError here
    # instead of being silently ignored and failing later in date arithmetic.
    if hasattr(value, "keys"):
        return datetime.timedelta(**value)
    return value

Check warning on line 61 in src/pytroll_watchers/datastore_watcher.py

View check run for this annotation

Codecov / codecov/patch

src/pytroll_watchers/datastore_watcher.py#L60-L61

Added lines #L60 - L61 were not covered by tests

def generate_download_links_since(search_params, ds_auth, start_from):
    """Generate download links for data that was published since `start_from`.

    Args:
        search_params: dictionary of search parameters. It is copied, not modified.
        ds_auth: the authentication configuration, passed on to `generate_download_links`.
        start_from: the datetime from which publications are of interest.

    Yields:
        The items generated by `generate_download_links`.
    """
    publication_start = start_from.isoformat(timespec="milliseconds")
    # Add the publication limit to a copy, leaving the caller's dictionary untouched.
    # NOTE(review): the leading "[" presumably denotes an open-ended interval start - confirm.
    params_with_limit = search_params.copy()
    params_with_limit.update({"publication": f"[{publication_start}"})
    yield from generate_download_links(params_with_limit, ds_auth)


def generate_download_links(search_params, ds_auth):
    """Generate download links for the items matching the search parameters.

    Args:
        search_params: dictionary of search parameters. Must contain `collection`; all the other entries
            are added as is to the search request. The dictionary is not modified.
        ds_auth: the authentication configuration, passed to `DatastoreOAuth2Session`.

    Yields:
        Tuples of (path, feature), where path is a UPath pointing to the data link of the item (created
        with an authorization header holding the session's access token) and feature is the
        corresponding entry of the search response.

    Raises:
        KeyError: if `collection` is missing from `search_params`.
        ValueError: if an item does not have exactly one data link.
    """
    # Work on a copy: popping from the caller's dictionary would make it unusable for another call.
    extra_params = dict(search_params)
    collection = extra_params.pop("collection")
    request_params = {
        "format": "json",
        "pi": str(collection),
        "si": 0,
        "c": 100,  # items per page
    }
    request_params.update(extra_params)

    session = DatastoreOAuth2Session(ds_auth)
    # NOTE(review): a single request is made with si=0 and c=100. Presumably `si` is the start index,
    # in which case matches beyond the first 100 are never retrieved - confirm whether paging is needed.
    jres = session.get(request_params)
    client_args = {"headers": {"Authorization": f"Bearer {session.token['access_token']}"}}
    for feature in jres["features"]:
        links = feature["properties"]["links"]["data"]
        if len(links) != 1:
            raise ValueError("Don't know how to generate multiple files at the time.")
        path = UPath(links[0]["href"], encoded=True, client_kwargs=client_args)
        # In the future, it might be interesting to generate items from the sip-entries, as
        # they contain individual files for zip archives.

        yield path, feature


class DatastoreOAuth2Session:
    """An oauth2 session for searching the eumetsat datastore."""

    def __init__(self, datastore_auth):
        """Set up the session.

        Args:
            datastore_auth: the authentication configuration, resolved to a (client id, client secret)
                pair by `_get_credentials`.
        """
        client_id, secret = _get_credentials(datastore_auth)

        def raise_on_error(response):
            # Make a failed token request raise instead of passing the error response on.
            response.raise_for_status()
            return response

        backend_client = BackendApplicationClient(client_id=client_id)
        self._oauth = OAuth2Session(client=backend_client)
        self._oauth.register_compliance_hook("access_token_response", raise_on_error)
        self._token_secret = secret

    def get(self, params):
        """Run a search request with `params` and return the decoded json response."""
        self.fetch_token()
        # NOTE(review): the status of the search response is not checked before decoding it.
        response = self._oauth.get(
            f"{data_url}/search-products/1.0.0/os",
            params=params,
            headers={
                "referer": "https://github.com/pytroll/pytroll-watchers",
                "User-Agent": "pytroll-watchers / 0.1.0",
            },
        )
        return response.json()

    @property
    def token(self):
        """Return the current token, fetching a new one first if needed."""
        return self.fetch_token()

    def fetch_token(self):
        """Fetch a new token if there is none yet or if it has expired, and return the token."""
        current = self._oauth.token
        needs_refresh = not current or current["expires_at"] <= time.time()
        if needs_refresh:
            self._oauth.fetch_token(
                token_url=token_url,
                client_secret=self._token_secret,
                include_client_id=True,
            )
        return self._oauth.token


def _get_credentials(ds_auth):
    """Get credentials from the ds_auth dictionary.

    Args:
        ds_auth: dictionary containing either `username` and `password`, or `netrc_host` (and optionally
            `netrc_file`) to read the credentials from a netrc file instead.

    Returns:
        A (username, password) tuple.

    Raises:
        KeyError: if `ds_auth` contains neither `username` and `password` nor `netrc_host`.
        ValueError: if the netrc file has no entry for `netrc_host`.
    """
    try:
        return ds_auth["username"], ds_auth["password"]
    except KeyError:
        # No explicit credentials, fall back to the netrc file.
        netrc_db = netrc.netrc(ds_auth.get("netrc_file"))
        host = ds_auth["netrc_host"]
        entry = netrc_db.authenticators(host)
    if entry is None:
        # `authenticators` returns None for an unknown host; unpacking it would fail with an
        # unhelpful "cannot unpack" error.
        raise ValueError(f"No credentials found for host '{host}' in the netrc file.")
    username, _, password = entry
    return username, password
Loading
Loading