diff --git a/.github/workflows/galaxy_framework.yaml b/.github/workflows/galaxy_framework.yaml index 775d48b2..998a7a0c 100644 --- a/.github/workflows/galaxy_framework.yaml +++ b/.github/workflows/galaxy_framework.yaml @@ -51,7 +51,7 @@ jobs: env: GALAXY_TEST_JOB_CONFIG_FILE: ../pulsar/test_data/test_job_conf.yaml GALAXY_CONFIG_OVERRIDE_METADATA_STRATEGY: ${{ matrix.metadata-strategy }} - continue-on-error: true + continue-on-error: ${{ matrix.galaxy-branch == 'master' }} - uses: actions/upload-artifact@v2 with: name: Framework test results (${{ matrix.python-version }}) diff --git a/.github/workflows/pulsar.yaml b/.github/workflows/pulsar.yaml index 49eff7f8..92e85d32 100644 --- a/.github/workflows/pulsar.yaml +++ b/.github/workflows/pulsar.yaml @@ -24,10 +24,8 @@ jobs: include: - tox-env: py36-mypy python: 3.6 - - tox-env: py37-mypy - python: 3.7 - - tox-env: py38-mypy - python: 3.8 + - tox-env: py311-mypy + python: 3.11 steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v3 @@ -36,7 +34,7 @@ jobs: - name: Install tox run: pip install tox - name: Setup pycurl - run: sudo apt update; sudo apt install -y libxml2-dev libxslt1-dev libcurl4-openssl-dev python-pycurl openssh-server + run: sudo apt update; sudo apt install -y libxml2-dev libxslt1-dev libcurl4-openssl-dev openssh-server - name: Run tox run: tox -e ${{ matrix.tox-env }} test: @@ -49,21 +47,13 @@ jobs: python: 3.6 - tox-env: py36-test-unit python: 3.6 - - tox-env: py37-test-ci - python: 3.7 - - tox-env: py37-test-unit - python: 3.7 - - tox-env: py38-test-ci - python: 3.8 - - tox-env: py38-test-unit - python: 3.8 - - tox-env: py39-test-ci - python: 3.9 - - tox-env: py39-test-unit - python: 3.9 - - tox-env: py39-install-wheel-no-conda + - tox-env: py311-test-ci + python: 3.11 + - tox-env: py311-test-unit + python: 3.11 + - tox-env: py39-install_wheel-no_conda python: 3.9 - - tox-env: py37-install-wheel + - tox-env: py37-install_wheel python: 3.7 services: job-files: @@ -94,7 +84,7 @@ jobs: ## connect to the host running the Pulsar server for file transfers in stage out/in. # tes-test: # name: Run Tests - # runs-on: ubuntu-20.04 + # runs-on: ubuntu-20.04 # strategy: # matrix: # include: diff --git a/.gitignore b/.gitignore index a4956890..3ee584c8 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ dependencies dependency_resolvers_conf.xml job_metrics_conf.xml .DS_Store +.idea diff --git a/HISTORY.rst b/HISTORY.rst index 07ac1575..226c2c2a 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,19 +6,34 @@ History .. to_doc --------------------- -0.15.0.dev1 +0.15.2 (2023-05-02) --------------------- +* Fix Pulsar and Pulsar client reconnection to AMQP server. `Pull Request 324`_ +* Reduce verbosity of timeout exception catching. `Pull Request 325`_ -* Updated Galaxy+Pulsar container. +--------------------- +0.15.1 (2023-04-13) +--------------------- +* No changes, working around pypi isssue. + +--------------------- +0.15.0 (2023-04-13) +--------------------- + +* Updated Galaxy+Pulsar container. `Pull Request 306`_ * Rework container execution - generalize Kubernetes execution to allow it to work without a - message queue and to allow TES execution based on pydantic-tes (https://github.com/jmchilton/pydantic-tes). -* Add documentation and diagrams for container execution scenarios. + message queue and to allow TES execution based on pydantic-tes (https://github.com/jmchilton/pydantic-tes). `Pull Request 302`_ +* Add documentation and diagrams for container execution scenarios. `Pull Request 302`_ * Rework integration tests to use pytest more aggressively. * Fixes to CI to run more tests that weren't being executed because Tox was not sending environment variables through to pytest. * Add option ``amqp_key_prefix`` to direct task queue naming while retaining simple - default manager names and such in container scheduling deployments. -* Various typing and CI fixes. + default manager names and such in container scheduling deployments. `Pull Request 315`_ +* Various typing and CI fixes. `Pull Request 312`_, `Pull Request 319`_ +* Fixes for extra_file handling. `Pull Request 318`_ +* Separate tool_stdio and job_stdio handling. `Pull Request 318`_ +* Re-import MEMORY_STATEMENT.sh from Galaxy. `Pull Request 297`_ +* Add support for logging to sentry. `Pull Request 322`_ --------------------- 0.14.16 (2022-10-04) @@ -451,6 +466,17 @@ History .. github_links + +.. _Pull Request 325: https://github.com/galaxyproject/pulsar/pull/325 +.. _Pull Request 324: https://github.com/galaxyproject/pulsar/pull/324 +.. _Pull Request 322: https://github.com/galaxyproject/pulsar/pull/322 +.. _Pull Request 318: https://github.com/galaxyproject/pulsar/pull/318 +.. _Pull Request 319: https://github.com/galaxyproject/pulsar/pull/319 +.. _Pull Request 312: https://github.com/galaxyproject/pulsar/pull/312 +.. _Pull Request 315: https://github.com/galaxyproject/pulsar/pull/315 +.. _Pull Request 306: https://github.com/galaxyproject/pulsar/pull/306 +.. _Pull Request 297: https://github.com/galaxyproject/pulsar/pull/297 +.. _Pull Request 302: https://github.com/galaxyproject/pulsar/pull/302 .. _Pull Request 303: https://github.com/galaxyproject/pulsar/pull/303 .. _Pull Request 301: https://github.com/galaxyproject/pulsar/pull/301 .. _Pull Request 299: https://github.com/galaxyproject/pulsar/pull/299 diff --git a/app.yml.sample b/app.yml.sample index 85d68ad6..a4bcb3b3 100644 --- a/app.yml.sample +++ b/app.yml.sample @@ -89,6 +89,10 @@ ## kombu.Connection's drain_events() method. #amqp_consumer_timeout: 0.2 +## publishing messages to the queue may hang if the connection becomes invalid. +## this value is used as the timeout argument to the producer.publish function. +#amqp_publish_timeout: 2.0 + # AMQP does not guarantee that a published message is received by the AMQP # server, so Pulsar can request that the consumer acknowledge messages and will # resend them if acknowledgement is not received after a configurable timeout @@ -120,9 +124,36 @@ ## Maximum number of seconds to sleep between each retry. #amqp_publish_retry_interval_max: 60 + +## configure user authentication/authorization plugins +## parameters depend on auth type. Authentication plugin should return a username +## and authorization plugin should authorize this user +#user_auth: +# authentication: +# - type: oidc +# oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys +# oidc_provider: azure +# oidc_username_in_token: preferred_username +# oidc_username_template: *. +# authorization: +# - type: userlist +# userlist_allowed_users: +# - xxx + ## *Experimental*. Enable file caching by specifing a directory here. ## Directory used to store incoming file cache. It works fine for HTTP ## transfer, have not tested with staging by coping. Also there is no ## mechanism for expiring cache so it will grow unbounded without ## external clean up. #file_cache_dir: cache + +## Log to Sentry Sentry is an open source logging and error aggregation +## platform. Setting sentry_dsn will enable the Sentry middleware and +## errors will be sent to the indicated sentry instance. This +## connection string is available in your sentry instance under +## -> Settings -> API Keys. +##sentry_dsn: null + +## Determines the minimum log level that will be sent as an event to +## Sentry. Possible values are DEBUG, INFO, WARNING, ERROR or CRITICAL. +##sentry_event_level: WARNING diff --git a/dev-requirements.txt b/dev-requirements.txt index 1215fd0a..f163c94d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -18,13 +18,14 @@ sphinx==1.2 pyflakes flake8 -mypy +mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192 types-paramiko types-pkg-resources types-PyYAML types-pycurl types-requests types-psutil +sentry-sdk # For release wheel diff --git a/docs/configure.rst b/docs/configure.rst index a2a8dbe4..fb1dc748 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -112,6 +112,34 @@ You can consult the `Kombu documentation `__ for even more information. +User Authentication/Authorization +````````````` + +You can configure Pulsar to authenticate user during request processing and check +if this user is allowed to run a job. + +Various authentication/authorization plugins can be configured in `app.yml` to +do that and plugin parameters depend on auth type. For example, the following +configuration uses `oidc` plugin for authentication and `userlist` for +authorization:: + + user_auth: + authentication: + - type: oidc + oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys + oidc_provider: azure + oidc_username_in_token: preferred_username + oidc_username_template: *. + authorization: + - type: userlist + userlist_allowed_users: + - xxx + + +see `plugins folder +`_ +for available plugins and their parameters. + Customizing the Pulsar Environment (\*nix only) ----------------------------------------------- diff --git a/docs/pulsar.client.rst b/docs/pulsar.client.rst index dd006a59..e99a7171 100644 --- a/docs/pulsar.client.rst +++ b/docs/pulsar.client.rst @@ -77,14 +77,6 @@ pulsar.client.exceptions module :undoc-members: :show-inheritance: -pulsar.client.interface module ------------------------------- - -.. automodule:: pulsar.client.interface - :members: - :undoc-members: - :show-inheritance: - pulsar.client.job\_directory module ----------------------------------- @@ -117,6 +109,14 @@ pulsar.client.path\_mapper module :undoc-members: :show-inheritance: +pulsar.client.server\_interface module +-------------------------------------- + +.. automodule:: pulsar.client.server_interface + :members: + :undoc-members: + :show-inheritance: + pulsar.client.setup\_handler module ----------------------------------- diff --git a/docs/pulsar.managers.util.cli.job.rst b/docs/pulsar.managers.util.cli.job.rst index 45383ff5..6db7c001 100644 --- a/docs/pulsar.managers.util.cli.job.rst +++ b/docs/pulsar.managers.util.cli.job.rst @@ -4,6 +4,22 @@ pulsar.managers.util.cli.job package Submodules ---------- +pulsar.managers.util.cli.job.lsf module +--------------------------------------- + +.. automodule:: pulsar.managers.util.cli.job.lsf + :members: + :undoc-members: + :show-inheritance: + +pulsar.managers.util.cli.job.pbs module +--------------------------------------- + +.. automodule:: pulsar.managers.util.cli.job.pbs + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.cli.job.slurm module ----------------------------------------- diff --git a/docs/pulsar.managers.util.rst b/docs/pulsar.managers.util.rst index d970ed3d..310cde4d 100644 --- a/docs/pulsar.managers.util.rst +++ b/docs/pulsar.managers.util.rst @@ -14,6 +14,14 @@ Subpackages Submodules ---------- +pulsar.managers.util.aws\_batch module +-------------------------------------- + +.. automodule:: pulsar.managers.util.aws_batch + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.env module ------------------------------- @@ -38,6 +46,22 @@ pulsar.managers.util.kill module :undoc-members: :show-inheritance: +pulsar.managers.util.process\_groups module +------------------------------------------- + +.. automodule:: pulsar.managers.util.process_groups + :members: + :undoc-members: + :show-inheritance: + +pulsar.managers.util.pykube\_util module +---------------------------------------- + +.. automodule:: pulsar.managers.util.pykube_util + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.retry module --------------------------------- @@ -54,6 +78,14 @@ pulsar.managers.util.sudo module :undoc-members: :show-inheritance: +pulsar.managers.util.tes module +------------------------------- + +.. automodule:: pulsar.managers.util.tes + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/docs/pulsar.scripts.rst b/docs/pulsar.scripts.rst index 06d50230..08740e57 100644 --- a/docs/pulsar.scripts.rst +++ b/docs/pulsar.scripts.rst @@ -44,6 +44,14 @@ pulsar.scripts.drmaa\_launch module :undoc-members: :show-inheritance: +pulsar.scripts.finish module +---------------------------- + +.. automodule:: pulsar.scripts.finish + :members: + :undoc-members: + :show-inheritance: + pulsar.scripts.mesos\_executor module ------------------------------------- diff --git a/install_test/common_functions.bash b/install_test/common_functions.bash index e590ca8a..97760eb8 100644 --- a/install_test/common_functions.bash +++ b/install_test/common_functions.bash @@ -5,7 +5,7 @@ shopt -s nullglob : ${PULSAR_TARGET_PORT:=8913} : ${PULSAR_INSTALL_TARGET:=pulsar-app} : ${PULSAR_TEST_DEBUG:=false} -: ${PLANEMO_INSTALL_TARGET:=planemo==0.75.3} +: ${PLANEMO_INSTALL_TARGET:=planemo} init_temp_dir() { case $(uname -s) in diff --git a/mypy.ini b/mypy.ini index b10a4e64..e8722c14 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,3 +1,7 @@ +[mypy] +show_error_codes = True +pretty = True + [mypy-galaxy.*] ignore_missing_imports = True @@ -31,6 +35,9 @@ ignore_missing_imports = True [mypy-kombu.*] ignore_missing_imports = True +[mypy-amqp.*] +ignore_missing_imports = True + [mypy-requests_toolbelt.*] ignore_missing_imports = True diff --git a/pulsar/__init__.py b/pulsar/__init__.py index f22f5241..459b4941 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.0.dev2' +__version__ = '0.15.2' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 0de56ced..bde924be 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -7,17 +7,27 @@ sleep, time, ) +from typing import Optional +from packaging.version import parse as parse_version try: import kombu + import kombu.exceptions from kombu import pools except ImportError: kombu = None +try: + import amqp + import amqp.exceptions +except ImportError: + amqp = None + log = logging.getLogger(__name__) KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable" +AMQP_UNAVAILABLE = "Attempting to bind to AMQP message queue, but pyampq dependency unavailable" DEFAULT_EXCHANGE_NAME = "pulsar" DEFAULT_EXCHANGE_TYPE = "direct" @@ -37,6 +47,7 @@ ACK_FORCE_NOACK_KEY = 'force_noack' DEFAULT_ACK_MANAGER_SLEEP = 15 DEFAULT_REPUBLISH_TIME = 30 +MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT = parse_version("5.2.0") class PulsarExchange: @@ -47,7 +58,7 @@ class PulsarExchange: Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar should target each AMQP endpoint or care should be taken that unique - manager names are used across Pulsar servers targetting same AMQP endpoint - + manager names are used across Pulsar servers targeting the same AMQP endpoint - and in particular only one such Pulsar should define an default manager with name _default_. """ @@ -68,6 +79,17 @@ def __init__( """ if not kombu: raise Exception(KOMBU_UNAVAILABLE) + if not amqp: + raise Exception(AMQP_UNAVAILABLE) + # conditional imports and type checking prevent us from doing this at the module level. + self.recoverable_exceptions = ( + socket.timeout, + amqp.exceptions.ConnectionForced, # e.g. connection closed on rabbitmq sigterm + amqp.exceptions.RecoverableConnectionError, # connection closed + amqp.exceptions.RecoverableChannelError, # publish time out + kombu.exceptions.OperationalError, # ConnectionRefusedError, e.g. when getting a new connection while rabbitmq is down + ) + self.__kombu_version = parse_version(kombu.__version__) self.__url = url self.__manager_name = manager_name self.__amqp_key_prefix = amqp_key_prefix @@ -84,7 +106,7 @@ def __init__( self.consume_uuid_store = consume_uuid_store self.publish_ack_lock = threading.Lock() # Ack manager should sleep before checking for - # repbulishes, but if that changes, need to drain the + # republishes, but if that changes, need to drain the # queue once before the ack manager starts doing its # thing self.ack_manager_thread = self.__start_ack_manager() @@ -119,7 +141,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): connection.drain_events(timeout=self.__timeout) except socket.timeout: pass - except OSError as exc: + except self.recoverable_exceptions as exc: self.__handle_io_error(exc, heartbeat_thread) except BaseException: log.exception("Problem consuming queue, consumer quitting in problematic fashion!") @@ -161,7 +183,7 @@ def __ack_callback(self, body, message): log.warning('Cannot remove UUID %s from store, already removed', ack_uuid) message.ack() - def __handle_io_error(self, exc, heartbeat_thread): + def __handle_io_error(self, exc: BaseException, heartbeat_thread: Optional[threading.Thread] = None): # In testing, errno is None log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc) try: @@ -233,8 +255,12 @@ def ack_manager(self): log.debug('UUID %s has not been acknowledged, ' 'republishing original message on queue %s', unack_uuid, resubmit_queue) - self.publish(resubmit_queue, payload) - self.publish_uuid_store.set_time(unack_uuid) + try: + self.publish(resubmit_queue, payload) + self.publish_uuid_store.set_time(unack_uuid) + except self.recoverable_exceptions as e: + self.__handle_io_error(e) + continue except Exception: log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!") raise @@ -271,6 +297,9 @@ def errback(exc, interval): publish_kwds["retry_policy"]["errback"] = errback else: publish_kwds = self.__publish_kwds + if self.__kombu_version < MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT: + log.warning(f"kombu version {kombu.__version__} does not support timeout argument to publish. Consider updating to 5.2.0 or newer") + publish_kwds.pop("timeout", None) return publish_kwds def __publish_log_prefex(self, transaction_uuid=None): diff --git a/pulsar/client/amqp_exchange_factory.py b/pulsar/client/amqp_exchange_factory.py index 6ca8fd11..94f41f57 100644 --- a/pulsar/client/amqp_exchange_factory.py +++ b/pulsar/client/amqp_exchange_factory.py @@ -11,7 +11,7 @@ def get_exchange(url, manager_name, params): manager_name=manager_name, amqp_key_prefix=params.get("amqp_key_prefix"), connect_ssl=connect_ssl, - publish_kwds=parse_amqp_publish_kwds(params) + publish_kwds=parse_amqp_publish_kwds(params), ) if params.get('amqp_acknowledge', False): exchange_kwds.update(parse_ack_kwds(params, manager_name)) diff --git a/pulsar/client/client.py b/pulsar/client/client.py index d7edff63..e28d84ea 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -100,6 +100,7 @@ def __init__(self, destination_params, job_id): setattr(self, attr, destination_params.get(attr, None)) self.env = destination_params.get("env", []) self.files_endpoint = destination_params.get("files_endpoint", None) + self.token_endpoint = destination_params.get("token_endpoint", None) default_file_action = self.destination_params.get("default_file_action", "transfer") if default_file_action not in actions: @@ -166,7 +167,8 @@ def __init__(self, destination_params, job_id, job_manager_interface): super().__init__(destination_params, job_id) self.job_manager_interface = job_manager_interface - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None): + def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): """ Queue up the execution of the supplied `command_line` on the remote server. Called launch for historical reasons, should be renamed to @@ -190,6 +192,8 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s if job_config and 'touch_outputs' in job_config: # message clients pass the entire job config launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']}) + if token_endpoint is not None: + launch_params["token_endpoint"] = json_dumps({'token_endpoint': token_endpoint}) if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from @@ -344,7 +348,8 @@ def __init__(self, destination_params, job_id, client_manager): self.client_manager = client_manager self.amqp_key_prefix = self.destination_params.get("amqp_key_prefix") - def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config, dynamic_file_sources): + def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config, + dynamic_file_sources, token_endpoint): """ """ launch_params = dict(command_line=command_line, job_id=self.job_id) @@ -359,6 +364,8 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params['remote_staging'] = remote_staging launch_params['remote_staging']['ssh_key'] = self.ssh_key launch_params['dynamic_file_sources'] = dynamic_file_sources + launch_params['token_endpoint'] = token_endpoint + if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from # destination arguments. Hence, must have Pulsar setup job @@ -397,7 +404,8 @@ def _build_status_request_message(self): class MessageJobClient(BaseMessageJobClient): - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None): + def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): """ """ launch_params = self._build_setup_message( @@ -407,6 +415,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s remote_staging=remote_staging, job_config=job_config, dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, ) self.client_manager.exchange.publish("setup", launch_params) log.info("Job published to setup message queue: %s", self.job_id) @@ -429,7 +438,8 @@ def __init__(self, destination_params, job_id, client_manager, shell): self.remote_pulsar_path = destination_params["remote_pulsar_path"] self.shell = shell - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None): + def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): """ """ launch_params = self._build_setup_message( @@ -439,6 +449,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s remote_staging=remote_staging, job_config=job_config, dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, ) base64_message = to_base64_json(launch_params) submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash") @@ -479,6 +490,7 @@ def launch( job_config=None, dynamic_file_sources=None, container_info=None, + token_endpoint=None, pulsar_app_config=None ) -> Optional[ExternalId]: """ @@ -490,6 +502,7 @@ def launch( remote_staging=remote_staging, job_config=job_config, dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, ) container = None guest_ports = None diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index 8f2ab74f..9c429734 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -52,6 +52,7 @@ def get_client(self, destination_params: Dict[str, Any], job_id: str, **kwargs: def shutdown(self, ensure_cleanup=False) -> None: """Mark client manager's work as complete and clean up resources it managed.""" + return class ClientManager(ClientManagerInterface): @@ -104,10 +105,6 @@ def get_client(self, destination_params, job_id, **kwargs): job_manager_interface = job_manager_interface_class(**job_manager_interface_args) return self.client_class(destination_params, job_id, job_manager_interface, **self.extra_client_kwds) - def shutdown(self, ensure_cleanup=False): - """Mark client manager's work as complete and clean up resources it managed.""" - pass - try: from galaxy.jobs.runners.util.cli import factory as cli_factory @@ -336,7 +333,7 @@ def __perform_transfer(self, transfer_info): def __init_transfer_threads(self, num_transfer_threads): self.num_transfer_threads = num_transfer_threads self.transfer_queue = Queue() - for i in range(num_transfer_threads): + for _ in range(num_transfer_threads): t = threading.Thread(target=self._transfer_worker) t.daemon = True t.start() diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index c14a9bec..ea8f8aa1 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -135,7 +135,7 @@ def __collect_job_directory_files(self): ) def __realized_dynamic_file_source_references(self): - references = [] + references = {"filename": [], "extra_files": []} def record_references(from_dict): if isinstance(from_dict, list): @@ -143,8 +143,8 @@ def record_references(from_dict): record_references(v) elif isinstance(from_dict, dict): for k, v in from_dict.items(): - if k == "filename": - references.append(v) + if k in references: + references[k].append(v) if isinstance(v, (list, dict)): record_references(v) @@ -182,7 +182,7 @@ def __collect_directory_files(self, directory, contents, output_type): continue if self.client_outputs.dynamic_match(name): collect = True - elif name in dynamic_file_source_references: + elif name in dynamic_file_source_references["filename"] or any(name.startswith(r) for r in dynamic_file_source_references["extra_files"]): collect = True if collect: diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index bfdddc29..9f08bef8 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -70,7 +70,7 @@ def submit_job(client, client_job_description, job_config=None): # potentially duplicated but we don't want to count on remote staging to include this # it needs to be in the response to Pulsar even Pulsar is inititing staging actions launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources - + launch_kwds["token_endpoint"] = client.token_endpoint # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external # id from the submission process (e.g. to TES). launch_response = client.launch(**launch_kwds) diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index 27425837..5ddc00f4 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -226,7 +226,7 @@ def run(options): test_unicode = getattr(options, "test_unicode", False) # TODO Switch this in integration tests legacy_galaxy_json = getattr(options, "legacy_galaxy_json", False) cmd_text = EXAMPLE_UNICODE_TEXT if test_unicode else "Hello World" - command_line_params = ( + command_line_arguments = ( temp_tool_path, temp_config_path, temp_input_path, @@ -246,7 +246,8 @@ def run(options): "1" if legacy_galaxy_json else "0", ) assert os.path.exists(temp_index_path) - command_line = 'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + quoted_args = (f'"{a}"' for a in command_line_arguments) + command_line = f"python {' '.join(quoted_args)} 2> ../metadata/tool_stderr > ../metadata/tool_stdout" config_files = [temp_config_path] client_inputs = [] client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) diff --git a/pulsar/core.py b/pulsar/core.py index 4f124067..c53dd3f2 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -1,5 +1,6 @@ """ """ +import logging import os from logging import getLogger from tempfile import tempdir @@ -9,12 +10,17 @@ from galaxy.tool_util.deps import build_dependency_manager from galaxy.util.bunch import Bunch -from pulsar import messaging +from pulsar import ( + __version__ as pulsar_version, + messaging, +) from pulsar.cache import Cache from pulsar.manager_factory import build_managers from pulsar.tools import ToolBox from pulsar.tools.authorization import get_authorizer +from pulsar.user_auth.manager import UserAuthManager + log = getLogger(__name__) DEFAULT_PRIVATE_TOKEN = None @@ -34,6 +40,7 @@ def __init__(self, **conf): if conf is None: conf = {} self.config_dir = conf.get('config_dir', os.getcwd()) + self.__setup_sentry_integration(conf) self.__setup_staging_directory(conf.get("staging_directory", DEFAULT_STAGING_DIRECTORY)) self.__setup_private_token(conf.get("private_token", DEFAULT_PRIVATE_TOKEN)) self.__setup_persistence_directory(conf.get("persistence_directory", None)) @@ -41,6 +48,7 @@ def __init__(self, **conf): self.__setup_object_store(conf) self.__setup_dependency_manager(conf) self.__setup_job_metrics(conf) + self.__setup_user_auth_manager(conf) self.__setup_managers(conf) self.__setup_file_cache(conf) self.__setup_bind_to_message_queue(conf) @@ -66,6 +74,9 @@ def __setup_bind_to_message_queue(self, conf): queue_state = messaging.bind_app(self, message_queue_url, conf) self.__queue_state = queue_state + def __setup_user_auth_manager(self, conf): + self.user_auth_manager = UserAuthManager(conf) + def __setup_tool_config(self, conf): """ Setups toolbox object and authorization mechanism based @@ -84,6 +95,27 @@ def __setup_tool_config(self, conf): self.toolbox = toolbox self.authorizer = get_authorizer(toolbox) + def __setup_sentry_integration(self, conf): + sentry_dsn = conf.get("sentry_dsn") + if sentry_dsn: + try: + import sentry_sdk + from sentry_sdk.integrations.logging import LoggingIntegration + except ImportError: + log.error("sentry_dsn configured, but sentry-sdk not installed") + sentry_sdk = None + LoggingIntegration = None + if sentry_sdk: + sentry_logging = LoggingIntegration( + level=logging.INFO, # Capture info and above as breadcrumbs + event_level=getattr(logging, conf.get("sentry_event_level", "WARNING")), # Send warnings as events + ) + sentry_sdk.init( + sentry_dsn, + release=pulsar_version, + integrations=[sentry_logging], + ) + def __setup_staging_directory(self, staging_directory): self.staging_directory = os.path.abspath(staging_directory) diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index 257ffed8..bf96dd10 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -38,6 +38,8 @@ def __job_complete_dict(complete_status, manager, job_id): return_code = None stdout_contents = manager.stdout_contents(job_id).decode("utf-8") stderr_contents = manager.stderr_contents(job_id).decode("utf-8") + job_stdout_contents = manager.job_stdout_contents(job_id).decode("utf-8") + job_stderr_contents = manager.job_stderr_contents(job_id).decode("utf-8") job_directory = manager.job_directory(job_id) as_dict = dict( job_id=job_id, @@ -46,6 +48,8 @@ def __job_complete_dict(complete_status, manager, job_id): returncode=return_code, stdout=stdout_contents, stderr=stderr_contents, + job_stdout=job_stdout_contents, + job_stderr=job_stderr_contents, working_directory=job_directory.working_directory(), metadata_directory=job_directory.metadata_directory(), job_directory=job_directory.job_directory, @@ -77,6 +81,8 @@ def submit_job(manager, job_config): submit_params = job_config.get('submit_params', {}) touch_outputs = job_config.get('touch_outputs', []) dynamic_file_sources = job_config.get("dynamic_file_sources", None) + token_endpoint = job_config.get("token_endpoint", None) + job_config = None if setup_params or force_setup: input_job_id = setup_params.get("job_id", job_id) @@ -104,6 +110,7 @@ def submit_job(manager, job_config): "env": env, "setup_params": setup_params, "dynamic_file_sources": dynamic_file_sources, + "token_endpoint": token_endpoint, } manager.preprocess_and_launch(job_id, launch_config) except Exception: diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py index e7af36b2..317e38b9 100644 --- a/pulsar/managers/__init__.py +++ b/pulsar/managers/__init__.py @@ -53,15 +53,25 @@ def return_code(self, job_id): @abstractmethod def stdout_contents(self, job_id): """ - After completion, return contents of stdout associated with specified - job. + After completion, return contents of stdout of the tool script. """ @abstractmethod def stderr_contents(self, job_id): """ - After completion, return contents of stderr associated with specified - job. + After completion, return contents of stderr of the tool script. + """ + + @abstractmethod + def job_stdout_contents(self, job_id): + """ + After completion, return contents of stdout of the job as produced by the job runner. + """ + + @abstractmethod + def job_stderr_contents(self, job_id): + """ + After completion, return contents of stderr of the job as produced by the job runner. """ @abstractmethod @@ -107,6 +117,12 @@ def stdout_contents(self, *args, **kwargs): def stderr_contents(self, *args, **kwargs): return self._proxied_manager.stderr_contents(*args, **kwargs) + def job_stdout_contents(self, *args, **kwargs): + return self._proxied_manager.job_stdout_contents(*args, **kwargs) + + def job_stderr_contents(self, *args, **kwargs): + return self._proxied_manager.job_stderr_contents(*args, **kwargs) + def kill(self, *args, **kwargs): return self._proxied_manager.kill(*args, **kwargs) diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 2bbe0e8e..5c89eb66 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -73,6 +73,7 @@ def __init__(self, name, app, **kwds): self.tmp_dir = kwds.get("tmp_dir", None) self.debug = str(kwds.get("debug", False)).lower() == "true" self.authorizer = app.authorizer + self.user_auth_manager = app.user_auth_manager self.__init_system_properties() self.__init_env_vars(**kwds) self.dependency_manager = app.dependency_manager @@ -179,6 +180,8 @@ def _check_execution(self, job_id, tool_id, command_line): log.debug("job_id: {} - Checking authorization of command_line [{}]".format(job_id, command_line)) authorization = self._get_authorization(job_id, tool_id) job_directory = self._job_directory(job_id) + self.user_auth_manager.authorize(job_id, job_directory) + tool_files_dir = job_directory.tool_files_directory() for file in self._list_dir(tool_files_dir): if os.path.isdir(join(tool_files_dir, file)): diff --git a/pulsar/managers/base/base_drmaa.py b/pulsar/managers/base/base_drmaa.py index e966a1de..d569c432 100644 --- a/pulsar/managers/base/base_drmaa.py +++ b/pulsar/managers/base/base_drmaa.py @@ -50,8 +50,8 @@ def _get_status_external(self, external_id): }[drmaa_state] def _build_template_attributes(self, job_id, command_line, dependencies_description=None, env=[], submit_params={}, setup_params=None): - stdout_path = self._stdout_path(job_id) - stderr_path = self._stderr_path(job_id) + stdout_path = self._job_stdout_path(job_id) + stderr_path = self._job_stderr_path(job_id) working_directory = self.job_directory(job_id).working_directory() attributes = { diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py index 96a0055e..470e5285 100644 --- a/pulsar/managers/base/directory.py +++ b/pulsar/managers/base/directory.py @@ -15,8 +15,10 @@ # should be able to replace metadata backing with non-file stuff now that # the abstractions are fairly well utilized. JOB_FILE_RETURN_CODE = "return_code" -JOB_FILE_STANDARD_OUTPUT = "stdout" -JOB_FILE_STANDARD_ERROR = "stderr" +TOOL_FILE_STANDARD_OUTPUT = os.path.join("metadata", "tool_stdout") +TOOL_FILE_STANDARD_ERROR = os.path.join("metadata", "tool_stderr") +JOB_FILE_STANDARD_OUTPUT = os.path.join("metadata", "job_stdout") +JOB_FILE_STANDARD_ERROR = os.path.join("metadata", "job_stderr") JOB_FILE_TOOL_ID = "tool_id" JOB_FILE_TOOL_VERSION = "tool_version" JOB_FILE_CANCELLED = "cancelled" @@ -34,9 +36,23 @@ def return_code(self, job_id): return int(return_code_str) if return_code_str and return_code_str != PULSAR_UNKNOWN_RETURN_CODE else return_code_str def stdout_contents(self, job_id): - return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size, default=b"") + try: + return self._read_job_file(job_id, TOOL_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size) + except FileNotFoundError: + # Could be old job finishing up, drop in 2024? + return self._read_job_file(job_id, "tool_stdout", size=self.maximum_stream_size, default=b"") def stderr_contents(self, job_id): + try: + return self._read_job_file(job_id, TOOL_FILE_STANDARD_ERROR, size=self.maximum_stream_size) + except FileNotFoundError: + # Could be old job finishing up, drop in 2024? + return self._read_job_file(job_id, "tool_stderr", size=self.maximum_stream_size, default=b"") + + def job_stdout_contents(self, job_id): + return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size, default=b"") + + def job_stderr_contents(self, job_id): return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, size=self.maximum_stream_size, default=b"") def read_command_line(self, job_id): @@ -47,10 +63,16 @@ def read_command_line(self, job_id): command_line = json.loads(command_line) return command_line - def _stdout_path(self, job_id): + def _tool_stdout_path(self, job_id): + return self._job_file(job_id, TOOL_FILE_STANDARD_OUTPUT) + + def _tool_stderr_path(self, job_id): + return self._job_file(job_id, TOOL_FILE_STANDARD_ERROR) + + def _job_stdout_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_OUTPUT) - def _stderr_path(self, job_id): + def _job_stderr_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_ERROR) def _return_code_path(self, job_id): @@ -100,10 +122,10 @@ def _was_cancelled(self, job_id): log.info("Failed to determine if job with id %s was cancelled, assuming no." % job_id) return False - def _open_standard_output(self, job_id): + def _open_job_standard_output(self, job_id): return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_OUTPUT, 'w') - def _open_standard_error(self, job_id): + def _open_job_standard_error(self, job_id): return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_ERROR, 'w') def _check_execution_with_tool_file(self, job_id, command_line): diff --git a/pulsar/managers/queued_cli.py b/pulsar/managers/queued_cli.py index bc24dd2f..6411600e 100644 --- a/pulsar/managers/queued_cli.py +++ b/pulsar/managers/queued_cli.py @@ -27,8 +27,8 @@ def __init__(self, name, app, **kwds): def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): self._check_execution_with_tool_file(job_id, command_line) shell, job_interface = self.__get_cli_plugins() - stdout_path = self._stdout_path(job_id) - stderr_path = self._stderr_path(job_id) + stdout_path = self._job_stdout_path(job_id) + stderr_path = self._job_stderr_path(job_id) job_name = self._job_name(job_id) command_line = self._expand_command_line( job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory diff --git a/pulsar/managers/queued_condor.py b/pulsar/managers/queued_condor.py index 688aca94..62e3677c 100644 --- a/pulsar/managers/queued_condor.py +++ b/pulsar/managers/queued_condor.py @@ -46,8 +46,8 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio submit_params.update(self.submission_params) build_submit_params = dict( executable=job_file_path, - output=self._stdout_path(job_id), - error=self._stderr_path(job_id), + output=self._job_stdout_path(job_id), + error=self._job_stderr_path(job_id), user_log=log_path, query_params=submit_params, ) diff --git a/pulsar/managers/unqueued.py b/pulsar/managers/unqueued.py index b807e2fa..4c316b99 100644 --- a/pulsar/managers/unqueued.py +++ b/pulsar/managers/unqueued.py @@ -188,8 +188,8 @@ def _run(self, job_id, command_line, montior: MonitorStyle = MonitorStyle.BACKGR def _proc_for_job_id(self, job_id, command_line): job_directory = self.job_directory(job_id) working_directory = job_directory.working_directory() - stdout = self._open_standard_output(job_id) - stderr = self._open_standard_error(job_id) + stdout = self._open_job_standard_output(job_id) + stderr = self._open_job_standard_error(job_id) proc = execute(command_line=command_line, working_directory=working_directory, stdout=stdout, @@ -240,10 +240,6 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env, setup_params=setup_params) job_directory = self.job_directory(job_id) working_directory = job_directory.working_directory() - command_line += " > '{}' 2> '{}'".format( - self._stdout_path(job_id), - self._stderr_path(job_id), - ) command_line = "cd '{}'; sh {}".format(working_directory, command_line) log.info("writing command line [%s] for co-execution" % command_line) self._write_command_line(job_id, command_line) diff --git a/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh b/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh index 09fc2f60..fda7266e 100644 --- a/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh +++ b/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh @@ -1,10 +1,8 @@ -if [ -z "$GALAXY_MEMORY_MB" ]; then - if [ -n "$SLURM_JOB_ID" ]; then - GALAXY_MEMORY_MB=`scontrol -do show job "$SLURM_JOB_ID" | sed 's/.*\( \|^\)Mem=\([0-9][0-9]*\)\( \|$\).*/\2/p;d'` 2>memory_statement.log - fi - if [ -n "$SGE_HGR_h_vmem" ]; then - GALAXY_MEMORY_MB=`echo "$SGE_HGR_h_vmem" | sed 's/G$/ * 1024/' | bc | cut -d"." -f1` 2>memory_statement.log - fi +if [ -n "$SLURM_JOB_ID" ]; then + GALAXY_MEMORY_MB=`scontrol -do show job "$SLURM_JOB_ID" | sed 's/.*\( \|^\)Mem=\([0-9][0-9]*\)\( \|$\).*/\2/p;d'` 2>$metadata_directory/memory_statement.log +fi +if [ -n "$SGE_HGR_h_vmem" ]; then + GALAXY_MEMORY_MB_PER_SLOT=`echo "$SGE_HGR_h_vmem" | sed 's/G$/ * 1024/' | bc | cut -d"." -f1` 2>$metadata_directory/memory_statement.log fi if [ -z "$GALAXY_MEMORY_MB_PER_SLOT" -a -n "$GALAXY_MEMORY_MB" ]; then @@ -12,5 +10,5 @@ if [ -z "$GALAXY_MEMORY_MB_PER_SLOT" -a -n "$GALAXY_MEMORY_MB" ]; then elif [ -z "$GALAXY_MEMORY_MB" -a -n "$GALAXY_MEMORY_MB_PER_SLOT" ]; then GALAXY_MEMORY_MB=$(($GALAXY_MEMORY_MB_PER_SLOT * $GALAXY_SLOTS)) fi -[ "${GALAXY_MEMORY_MB--1}" -gt 0 ] 2>>memory_statement.log && export GALAXY_MEMORY_MB || unset GALAXY_MEMORY_MB -[ "${GALAXY_MEMORY_MB_PER_SLOT--1}" -gt 0 ] 2>>memory_statement.log && export GALAXY_MEMORY_MB_PER_SLOT || unset GALAXY_MEMORY_MB_PER_SLOT +[ "${GALAXY_MEMORY_MB--1}" -gt 0 ] 2>>$metadata_directory/memory_statement.log && export GALAXY_MEMORY_MB || unset GALAXY_MEMORY_MB +[ "${GALAXY_MEMORY_MB_PER_SLOT--1}" -gt 0 ] 2>>$metadata_directory/memory_statement.log && export GALAXY_MEMORY_MB_PER_SLOT || unset GALAXY_MEMORY_MB_PER_SLOT diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py index d3512d9f..5209bcd4 100644 --- a/pulsar/managers/util/job_script/__init__.py +++ b/pulsar/managers/util/job_script/__init__.py @@ -23,7 +23,7 @@ SLOTS_STATEMENT_CLUSTER_DEFAULT = resource_string(__name__, "CLUSTER_SLOTS_STATEMENT.sh") -MEMORY_STATEMENT_DEFAULT = resource_string(__name__, "MEMORY_STATEMENT.sh") +MEMORY_STATEMENT_DEFAULT_TEMPLATE = Template(resource_string(__name__, "MEMORY_STATEMENT.sh")) SLOTS_STATEMENT_SINGLE = """ GALAXY_SLOTS="1" @@ -42,14 +42,13 @@ DEFAULT_INTEGRITY_CHECK = True DEFAULT_INTEGRITY_COUNT = 35 DEFAULT_INTEGRITY_SLEEP = 0.25 -REQUIRED_TEMPLATE_PARAMS = ["working_directory", "command"] +REQUIRED_TEMPLATE_PARAMS = ["metadata_directory", "working_directory", "command"] OPTIONAL_TEMPLATE_PARAMS: Dict[str, Any] = { "galaxy_lib": None, "galaxy_virtual_env": None, "headers": "", "env_setup_commands": [], "slots_statement": SLOTS_STATEMENT_CLUSTER_DEFAULT, - "memory_statement": MEMORY_STATEMENT_DEFAULT, "instrument_pre_commands": "", "instrument_post_commands": "", "integrity_injection": INTEGRITY_INJECTION, @@ -86,12 +85,17 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): """ if any(param not in kwds for param in REQUIRED_TEMPLATE_PARAMS): raise Exception("Failed to create job_script, a required parameter is missing.") + metadata_directory = kwds.get("metadata_directory", kwds["working_directory"]) job_instrumenter = kwds.get("job_instrumenter", None) if job_instrumenter: del kwds["job_instrumenter"] working_directory = kwds.get("metadata_directory", kwds["working_directory"]) kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or "" kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or "" + if "memory_statement" not in kwds: + kwds["memory_statement"] = MEMORY_STATEMENT_DEFAULT_TEMPLATE.safe_substitute( + metadata_directory=metadata_directory + ) # Setup home directory var kwds["home_directory"] = kwds.get("home_directory", os.path.join(kwds["working_directory"], "home")) diff --git a/pulsar/messaging/bind_amqp.py b/pulsar/messaging/bind_amqp.py index 4d7798e9..c735a3cb 100644 --- a/pulsar/messaging/bind_amqp.py +++ b/pulsar/messaging/bind_amqp.py @@ -15,6 +15,7 @@ TYPED_PARAMS = { "amqp_consumer_timeout": lambda val: None if str(val) == "None" else float(val), + "amqp_publish_timeout": lambda val: None if str(val) == "None" else float(val), "amqp_publish_retry": asbool, "amqp_publish_retry_max_retries": int, "amqp_publish_retry_interval_start": int, diff --git a/pulsar/user_auth/__init__.py b/pulsar/user_auth/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pulsar/user_auth/manager.py b/pulsar/user_auth/manager.py new file mode 100644 index 00000000..08b16f77 --- /dev/null +++ b/pulsar/user_auth/manager.py @@ -0,0 +1,66 @@ +from abc import ABC + +import inspect + + +class UserAuthManager(ABC): + """ + Authorization/Authentication manager. + """ + + def __init__(self, config): + self._authorization_methods = [] + self._authentication_methods = [] + + try: + user_auth = config.get("user_auth", None) + if not user_auth: + return + authentications = user_auth.pop("authentication", []) + authorizations = user_auth.pop("authorization", []) + + for authorization in authorizations: + authorization.update(user_auth) + obj = get_object("pulsar.user_auth.methods." + authorization["type"], "auth_type", + authorization["type"]) + self._authorization_methods.append(obj(authorization)) + + for authentication in authentications: + authentication.update(user_auth) + obj = get_object("pulsar.user_auth.methods." + authentication["type"], "auth_type", + authentication["type"]) + self._authentication_methods.append(obj(authentication)) + except Exception as e: + raise Exception("cannot read auth configuration") from e + + def authorize(self, job_id, job_directory): + authentication_info = self.__authenticate(job_id, job_directory) + + if len(self._authorization_methods) == 0: + return True + for method in self._authorization_methods: + res = method.authorize(authentication_info) + if res: + return True + + raise Exception("Could not authorize job execution on remote resource") + + def __authenticate(self, job_id, job_directory): + if len(self._authentication_methods) == 0: + return {} + for method in self._authentication_methods: + res = method.authenticate(job_directory) + if res: + return res + + raise Exception("Could not authenticate job %s" % job_id) + + +def get_object(module_name, attribute_name, attribute_value): + module = __import__(module_name) + for comp in module_name.split(".")[1:]: + module = getattr(module, comp) + for _, obj in inspect.getmembers(module): + if inspect.isclass(obj) and hasattr(obj, attribute_name) and getattr(obj, attribute_name) == attribute_value: + return obj + raise Exception("Cannot find object %s with attribute %s=%s " % (module_name, attribute_name, attribute_value)) diff --git a/pulsar/user_auth/methods/__init__.py b/pulsar/user_auth/methods/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pulsar/user_auth/methods/allow_all.py b/pulsar/user_auth/methods/allow_all.py new file mode 100644 index 00000000..c51c521f --- /dev/null +++ b/pulsar/user_auth/methods/allow_all.py @@ -0,0 +1,18 @@ +from pulsar.user_auth.methods.interface import AuthMethod + + +class AlwaysAllowAuthMethod(AuthMethod): + """ + Always allow + """ + + def __init__(self, _config): + pass + + auth_type = "allow_all" + + def authorize(self, authentication_info): + return True + + def authenticate(self, job_directory): + return {"username": "anonymous"} diff --git a/pulsar/user_auth/methods/interface.py b/pulsar/user_auth/methods/interface.py new file mode 100644 index 00000000..983ca9ca --- /dev/null +++ b/pulsar/user_auth/methods/interface.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class AuthMethod(ABC): + """ + Defines the interface to various authentication/authorization methods. + """ + + @abstractmethod + def authorize(self, authentication_info): + raise NotImplementedError("a concrete class should implement this") + + @abstractmethod + def authenticate(self, job_directory): + raise NotImplementedError("a concrete class should implement this") diff --git a/pulsar/user_auth/methods/oidc.py b/pulsar/user_auth/methods/oidc.py new file mode 100644 index 00000000..5a14957d --- /dev/null +++ b/pulsar/user_auth/methods/oidc.py @@ -0,0 +1,74 @@ +import requests +import base64 +import json +import jwt +import re +from cryptography.hazmat.backends import default_backend +from cryptography.x509 import load_der_x509_certificate + +from pulsar.user_auth.methods.interface import AuthMethod + +import logging + +log = logging.getLogger(__name__) + + +def get_token(job_directory, provider): + log.debug("Getting OIDC token for provider " + provider + " from Galaxy") + endpoint = job_directory.load_metadata("launch_config")["token_endpoint"] + endpoint = endpoint + "&provider=" + provider + r = requests.get(url=endpoint) + return r.text + + +class OIDCAuth(AuthMethod): + """ + Authorization based on OIDC tokens + """ + auth_type = "oidc" + + def __init__(self, config): + try: + self._provider = config["oidc_provider"] + self._jwks_url = config["oidc_jwks_url"] + self._username_in_token = config["oidc_username_in_token"] + self._username_template = config["oidc_username_template"] + + except Exception as e: + raise Exception("cannot read OIDCAuth configuration") from e + + def _verify_token(self, token): + try: + # Obtain appropriate cert from JWK URI + key_set = requests.get(self._jwks_url, timeout=5) + encoded_header, rest = token.split('.', 1) + headerobj = json.loads(base64.b64decode(encoded_header + '==').decode('utf8')) + key_id = headerobj['kid'] + for key in key_set.json()['keys']: + if key['kid'] == key_id: + x5c = key['x5c'][0] + break + else: + raise jwt.DecodeError('Cannot find kid ' + key_id) + cert = load_der_x509_certificate(base64.b64decode(x5c), default_backend()) + # Decode token (exp date is checked automatically) + decoded_token = jwt.decode( + token, + key=cert.public_key(), + algorithms=['RS256'], + options={'exp': True, 'verify_aud': False} + ) + return decoded_token + except Exception as error: + raise Exception("Error verifying jwt token") from error + + def authorize(self, authentication_info): + raise NotImplementedError("authorization not implemented for this class") + + def authenticate(self, job_directory): + token = get_token(job_directory, self._provider) + + decoded_token = self._verify_token(token) + user = decoded_token[self._username_in_token] + user = re.match(self._username_template, user).group(0) + return {"username": user} diff --git a/pulsar/user_auth/methods/userlist.py b/pulsar/user_auth/methods/userlist.py new file mode 100644 index 00000000..30f3c449 --- /dev/null +++ b/pulsar/user_auth/methods/userlist.py @@ -0,0 +1,21 @@ +from pulsar.user_auth.methods.interface import AuthMethod + + +class UserListAuth(AuthMethod): + """ + Defines authorization user by username + """ + + def __init__(self, config): + try: + self._allowed_users = config["userlist_allowed_users"] + except Exception as e: + raise Exception("cannot read UsernameAuth configuration") from e + + auth_type = "userlist" + + def authorize(self, authentication_info): + return authentication_info["username"] in self._allowed_users + + def authenticate(self, job_directory): + raise NotImplementedError("authentication not implemented for this class") diff --git a/pulsar/web/framework.py b/pulsar/web/framework.py index 20fb7164..b2abce1e 100644 --- a/pulsar/web/framework.py +++ b/pulsar/web/framework.py @@ -69,7 +69,7 @@ def add_args(func_args, arg_values): if func_arg not in args and func_arg in arg_values: args[func_arg] = arg_values[func_arg] - func_args = inspect.getargspec(func).args + func_args = inspect.getfullargspec(func).args for arg_dict in arg_dicts: add_args(func_args, arg_dict) @@ -108,7 +108,7 @@ def __handle_access(self, req, environ, start_response): def __build_args(self, func, args, req, environ): args = build_func_args(func, args, req.GET, self._app_args(args, req)) - func_args = inspect.getargspec(func).args + func_args = inspect.getfullargspec(func).args for func_arg in func_args: if func_arg == "ip": diff --git a/requirements.txt b/requirements.txt index de5a11d4..46f29c85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,13 +2,14 @@ webob psutil PasteDeploy pyyaml -galaxy-job-metrics -galaxy-objectstore -galaxy-tool-util -galaxy-util +galaxy-job-metrics>=19.9.0 +galaxy-objectstore>=19.9.0 +galaxy-tool-util>=19.9.0 +galaxy-util>=22.1.2 paramiko typing-extensions pydantic-tes>=0.1.5 +pyjwt ## Uncomment if using DRMAA queue manager. #drmaa diff --git a/setup.py b/setup.py index 6976216c..68faeed3 100644 --- a/setup.py +++ b/setup.py @@ -31,10 +31,6 @@ if PULSAR_GALAXY_LIB: requirements = [r for r in requirements if not r.startswith("galaxy-")] -test_requirements = [ - # TODO: put package test requirements here -] - _version_re = re.compile(r'__version__\s+=\s+(.*)') @@ -79,6 +75,8 @@ 'pulsar.messaging', 'pulsar.scripts', 'pulsar.tools', + 'pulsar.user_auth', + 'pulsar.user_auth.methods', 'pulsar.util', 'pulsar.util.pastescript', 'pulsar.web', @@ -108,6 +106,7 @@ include_package_data=True, install_requires=requirements, extras_require={ + 'amqp': ['kombu'], 'web': ['Paste', 'PasteScript'], 'galaxy_extended_metadata': ['galaxy-job-execution', 'galaxy-util[template]'], }, @@ -128,7 +127,6 @@ 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', ], - test_suite='test', - tests_require=test_requirements ) diff --git a/test/client_staging_test.py b/test/client_staging_test.py index b161de9e..86298ef6 100644 --- a/test/client_staging_test.py +++ b/test/client_staging_test.py @@ -11,6 +11,7 @@ TEST_REQUIREMENT_1 = ToolRequirement("test1", "1.0") TEST_REQUIREMENT_2 = ToolRequirement("test2", "1.0") TEST_ENV_1 = dict(name="x", value="y") +TEST_TOKEN_ENDPOINT = "endpoint" class TestStager(TempDirectoryTestCase): @@ -153,6 +154,7 @@ def __init__(self, temp_directory, tool): self.default_file_action = "transfer" self.action_config_path = None self.files_endpoint = None + self.token_endpoint = TEST_TOKEN_ENDPOINT self.expected_tool = tool self.job_id = "1234" self.expected_command_line = None @@ -176,11 +178,13 @@ def setup(self, tool_id, tool_version, use_metadata=False): assert tool_version == self.expected_tool.version return {} - def launch(self, command_line, dependencies_description, job_config={}, remote_staging={}, env=[], dynamic_file_sources=None): + def launch(self, command_line, dependencies_description, job_config={}, remote_staging={}, env=[], dynamic_file_sources=None, + token_endpoint=None): if self.expected_command_line is not None: message = "Excepected command line {}, got {}".format(self.expected_command_line, command_line) assert self.expected_command_line == command_line, message assert dependencies_description.requirements == [TEST_REQUIREMENT_1, TEST_REQUIREMENT_2] + assert token_endpoint == TEST_TOKEN_ENDPOINT assert env == [TEST_ENV_1] def expect_command_line(self, expected_command_line): diff --git a/test/manager_test.py b/test/manager_test.py index 147456c9..09491b95 100644 --- a/test/manager_test.py +++ b/test/manager_test.py @@ -2,7 +2,7 @@ from os.path import join -from .test_utils import BaseManagerTestCase +from .test_utils import BaseManagerTestCase, get_failing_user_auth_manager class ManagerTest(BaseManagerTestCase): @@ -34,6 +34,12 @@ def test_unauthorized_command_line(self): with self.assertRaises(Exception): self.manager.launch(job_id, 'python') + def test_unauthorized_user(self): + self.manager.user_auth_manager = get_failing_user_auth_manager() + job_id = self.manager.setup_job("123", "tool1", "1.0.0") + with self.assertRaises(Exception): + self.manager.launch(job_id, 'python') + def test_id_assigners(self): self._set_manager(assign_ids="galaxy") job_id = self.manager.setup_job("123", "tool1", "1.0.0") diff --git a/test/persistence_test.py b/test/persistence_test.py index 99367a4f..3c90868f 100644 --- a/test/persistence_test.py +++ b/test/persistence_test.py @@ -7,7 +7,7 @@ from pulsar.tools.authorization import get_authorizer from .test_utils import ( temp_directory, - TestDependencyManager + TestDependencyManager, get_test_user_auth_manager ) from galaxy.job_metrics import NULL_JOB_INSTRUMENTER from galaxy.util.bunch import Bunch @@ -125,6 +125,7 @@ def _app(): staging_directory=staging_directory, persistence_directory=staging_directory, authorizer=get_authorizer(None), + user_auth_manager=get_test_user_auth_manager(), dependency_manager=TestDependencyManager(), job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER), object_store=None, diff --git a/test/test_utils.py b/test/test_utils.py index f8f88fb8..e41f9130 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -32,6 +32,7 @@ from pulsar.managers.util import drmaa from pulsar.tools import ToolBox from pulsar.managers.base import JobDirectory +from pulsar.user_auth.manager import UserAuthManager from unittest import TestCase, skip @@ -171,13 +172,16 @@ def setUp(self): self.app = minimal_app_for_managers() self.staging_directory = self.app.staging_directory self.authorizer = self.app.authorizer + self.user_auth_manager = self.app.user_auth_manager def tearDown(self): rmtree(self.staging_directory) @nottest def _test_simple_execution(self, manager, timeout=None): - command = """python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stdout.flush(); sys.stderr.write(\'moo\'); sys.stderr.flush()" """ + command = """ +python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stdout.flush(); sys.stderr.write(\'moo\'); sys.stderr.flush()" \ +2> ../metadata/tool_stderr > ../metadata/tool_stdout""" job_id = manager.setup_job("123", "tool1", "1.0.0") manager.launch(job_id, command) @@ -185,8 +189,10 @@ def _test_simple_execution(self, manager, timeout=None): while manager.get_status(job_id) not in ['complete', 'cancelled']: if time_end and time.time() > time_end: raise Exception("Timeout.") - self.assertEqual(manager.stderr_contents(job_id), b'moo') - self.assertEqual(manager.stdout_contents(job_id), b'Hello World!') + self.assertEqual(manager.job_stderr_contents(job_id), b"") + self.assertEqual(manager.job_stdout_contents(job_id), b"") + self.assertEqual(manager.stderr_contents(job_id), b"moo") + self.assertEqual(manager.stdout_contents(job_id), b"Hello World!") self.assertEqual(manager.return_code(job_id), 0) manager.clean(job_id) self.assertEqual(len(listdir(self.staging_directory)), 0) @@ -225,13 +231,26 @@ def minimal_app_for_managers(): staging_directory = temp_directory_persist(prefix='minimal_app_') rmtree(staging_directory) authorizer = TestAuthorizer() + user_auth_manager = get_test_user_auth_manager() return Bunch(staging_directory=staging_directory, authorizer=authorizer, job_metrics=NullJobMetrics(), dependency_manager=TestDependencyManager(), + user_auth_manager=user_auth_manager, object_store=object()) +def get_test_user_auth_manager(): + config = {"user_auth": {"authentication": [{"type": "allow_all"}], "authorization": [{"type": "allow_all"}]}} + return UserAuthManager(config) + + +def get_failing_user_auth_manager(): + config = {"user_auth": {"authentication": [{"type": "allow_all"}], + "authorization": [{"type": "userlist", "userlist_allowed_users": []}]}} + return UserAuthManager(config) + + class NullJobMetrics: def __init__(self): @@ -282,11 +301,11 @@ def __init__(self, global_conf={}, app_conf={}, test_conf={}, web=True): @contextmanager def new_app(self): with test_pulsar_app( - self.global_conf, - self.app_conf, - self.test_conf, - staging_directory=self.staging_directory, - web=self.web, + self.global_conf, + self.app_conf, + self.test_conf, + staging_directory=self.staging_directory, + web=self.web, ) as app: yield app @@ -309,11 +328,11 @@ def restartable_pulsar_app_provider(**kwds): @nottest @contextmanager def test_pulsar_app( - global_conf={}, - app_conf={}, - test_conf={}, - staging_directory=None, - web=True, + global_conf={}, + app_conf={}, + test_conf={}, + staging_directory=None, + web=True, ): clean_staging_directory = False if staging_directory is None: @@ -420,7 +439,6 @@ def skip_without_drmaa(f): def _which(program): - def is_exe(fpath): return isfile(fpath) and access(fpath, X_OK) @@ -487,7 +505,6 @@ def dump_other_threads(): # Extracted from: https://github.com/python/cpython/blob/ # 937ee9e745d7ff3c2010b927903c0e2a83623324/Lib/test/support/__init__.py class EnvironmentVarGuard: - """Class to help protect the environment variable properly. Can be used as a context manager.""" diff --git a/test/user_authorization_test.py b/test/user_authorization_test.py new file mode 100644 index 00000000..3696a243 --- /dev/null +++ b/test/user_authorization_test.py @@ -0,0 +1,20 @@ +from unittest import TestCase + +from test.test_utils import get_test_user_auth_manager, get_failing_user_auth_manager + + +class UserAuthorizationTestCase(TestCase): + + def setUp(self): + self.authorizer = get_test_user_auth_manager() + self.failing_authorizer = get_failing_user_auth_manager() + + def test_passes(self): + self.authorizer.authorize("123", None) + + def test_fails(self): + with self.unauthorized_expectation(): + self.failing_authorizer.authorize("123", None) + + def unauthorized_expectation(self): + return self.assertRaises(Exception) diff --git a/test/wsgi_app_test.py b/test/wsgi_app_test.py index b7a8fa3a..21c3db18 100644 --- a/test/wsgi_app_test.py +++ b/test/wsgi_app_test.py @@ -59,8 +59,8 @@ def test_upload(upload_type): check_response = app.get("/jobs/%s/status" % job_id) check_config = json.loads(check_response.body.decode("utf-8")) assert check_config['returncode'] == 0 - assert check_config['stdout'] == "test_out" - assert check_config['stderr'] == "" + assert check_config['job_stdout'] == "test_out" + assert check_config['job_stderr'] == "" kill_response = app.put("/jobs/%s/cancel" % job_id) assert kill_response.body.decode("utf-8") == 'OK' diff --git a/tox.ini b/tox.ini index d0939b74..50f3d1ab 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{36,37,38,39}-lint, py{36,37,38,39}-docs, py{36,37,38,39}-dist, py{36,37,38,39}-test-unit, py{36,37,38,39}-test, py{36,37,38,39}-install-wheel, py{36,37,38,39}-install-wheel-no-conda +envlist = lint, docs, dist, test-unit, test, install_wheel, install_wheel-no_conda toxworkdir={env:TOX_WORK_DIR:.tox} source_dir = pulsar test_dir = test @@ -11,18 +11,18 @@ commands = lint: flake8 --ignore W504 {[tox]source_dir} {[tox]test_dir} dist: make lint-dist docs: make lint-docs - install-wheel: make test-install-wheel - install-wheel-no-conda: make test-install-wheel-no-conda + install_wheel-!no_conda: make test-install-wheel + install_wheel-no_conda: make test-install-wheel-no-conda mypy: mypy {[tox]source_dir} {[tox]test_dir} deps = test,docs,mypy: -rrequirements.txt - test,test-install-wheel,test-install-wheel-no-conda,mypy: -rdev-requirements.txt + test,install_wheel,mypy: -rdev-requirements.txt test: drmaa lint: flake8 docs: sphinx==1.2 dist: twine - install-wheel,install-wheel-no-conda: virtualenv + install_wheel: virtualenv setenv = # tests ready to go after setup_tests.sh @@ -40,10 +40,10 @@ passenv = DRMAA_LIBRARY_PATH skip_install = - lint,dist,install-wheel,install-wheel-no-conda: True + lint,dist,install_wheel: True skipsdist = docs: True allowlist_externals = - docs,dist,install-wheel,install-wheel-no-conda: make + docs,dist,install_wheel: make