From 4a7b4fc0172f3207daa1b1945c123546d2dc2cde Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 6 Apr 2023 15:17:22 +0200 Subject: [PATCH 01/35] Use latest stable planemo version for tests --- install_test/common_functions.bash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/install_test/common_functions.bash b/install_test/common_functions.bash index e590ca8a..97760eb8 100644 --- a/install_test/common_functions.bash +++ b/install_test/common_functions.bash @@ -5,7 +5,7 @@ shopt -s nullglob : ${PULSAR_TARGET_PORT:=8913} : ${PULSAR_INSTALL_TARGET:=pulsar-app} : ${PULSAR_TEST_DEBUG:=false} -: ${PLANEMO_INSTALL_TARGET:=planemo==0.75.3} +: ${PLANEMO_INSTALL_TARGET:=planemo} init_temp_dir() { case $(uname -s) in From 94c8d8fe0e1dabfc928286f3732b7c7f1868062d Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 6 Apr 2023 15:35:26 +0200 Subject: [PATCH 02/35] Pin mypy to 1.0.1, waiting for https://github.com/pydantic/pydantic/issues/5192 --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 1215fd0a..01dd1d24 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -18,7 +18,7 @@ sphinx==1.2 pyflakes flake8 -mypy +mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192 types-paramiko types-pkg-resources types-PyYAML From 00fa8c67754547af881f03ff5f59b0c12b75cf10 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 6 Apr 2023 12:30:08 +0200 Subject: [PATCH 03/35] Implement job and tool stdio separation and always set up job i/o redirection on the pulsar side. Since pulsar stdio is broken prior to this on 23.0 this is maybe a good time to make the switch. Seems like this detail should be controlled on pulsar, not Galaxy. 
--- pulsar/client/test/check.py | 5 +++-- pulsar/manager_endpoint_util.py | 4 ++++ pulsar/managers/__init__.py | 24 ++++++++++++++++---- pulsar/managers/base/base_drmaa.py | 4 ++-- pulsar/managers/base/directory.py | 36 ++++++++++++++++++++++++------ pulsar/managers/queued_cli.py | 4 ++-- pulsar/managers/queued_condor.py | 4 ++-- pulsar/managers/unqueued.py | 8 ++----- test/test_utils.py | 10 ++++++--- test/wsgi_app_test.py | 4 ++-- 10 files changed, 73 insertions(+), 30 deletions(-) diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index 27425837..5ddc00f4 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -226,7 +226,7 @@ def run(options): test_unicode = getattr(options, "test_unicode", False) # TODO Switch this in integration tests legacy_galaxy_json = getattr(options, "legacy_galaxy_json", False) cmd_text = EXAMPLE_UNICODE_TEXT if test_unicode else "Hello World" - command_line_params = ( + command_line_arguments = ( temp_tool_path, temp_config_path, temp_input_path, @@ -246,7 +246,8 @@ def run(options): "1" if legacy_galaxy_json else "0", ) assert os.path.exists(temp_index_path) - command_line = 'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + quoted_args = (f'"{a}"' for a in command_line_arguments) + command_line = f"python {' '.join(quoted_args)} 2> ../metadata/tool_stderr > ../metadata/tool_stdout" config_files = [temp_config_path] client_inputs = [] client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index 257ffed8..cea80cb2 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -38,6 +38,8 @@ def __job_complete_dict(complete_status, manager, job_id): return_code = None stdout_contents = manager.stdout_contents(job_id).decode("utf-8") stderr_contents = manager.stderr_contents(job_id).decode("utf-8") + 
job_stdout_contents = manager.job_stdout_contents(job_id).decode("utf-8") + job_stderr_contents = manager.job_stderr_contents(job_id).decode("utf-8") job_directory = manager.job_directory(job_id) as_dict = dict( job_id=job_id, @@ -46,6 +48,8 @@ def __job_complete_dict(complete_status, manager, job_id): returncode=return_code, stdout=stdout_contents, stderr=stderr_contents, + job_stdout=job_stdout_contents, + job_stderr=job_stderr_contents, working_directory=job_directory.working_directory(), metadata_directory=job_directory.metadata_directory(), job_directory=job_directory.job_directory, diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py index e7af36b2..317e38b9 100644 --- a/pulsar/managers/__init__.py +++ b/pulsar/managers/__init__.py @@ -53,15 +53,25 @@ def return_code(self, job_id): @abstractmethod def stdout_contents(self, job_id): """ - After completion, return contents of stdout associated with specified - job. + After completion, return contents of stdout of the tool script. """ @abstractmethod def stderr_contents(self, job_id): """ - After completion, return contents of stderr associated with specified - job. + After completion, return contents of stderr of the tool script. + """ + + @abstractmethod + def job_stdout_contents(self, job_id): + """ + After completion, return contents of stdout of the job as produced by the job runner. + """ + + @abstractmethod + def job_stderr_contents(self, job_id): + """ + After completion, return contents of stderr of the job as produced by the job runner. 
""" @abstractmethod @@ -107,6 +117,12 @@ def stdout_contents(self, *args, **kwargs): def stderr_contents(self, *args, **kwargs): return self._proxied_manager.stderr_contents(*args, **kwargs) + def job_stdout_contents(self, *args, **kwargs): + return self._proxied_manager.job_stdout_contents(*args, **kwargs) + + def job_stderr_contents(self, *args, **kwargs): + return self._proxied_manager.job_stderr_contents(*args, **kwargs) + def kill(self, *args, **kwargs): return self._proxied_manager.kill(*args, **kwargs) diff --git a/pulsar/managers/base/base_drmaa.py b/pulsar/managers/base/base_drmaa.py index e966a1de..d569c432 100644 --- a/pulsar/managers/base/base_drmaa.py +++ b/pulsar/managers/base/base_drmaa.py @@ -50,8 +50,8 @@ def _get_status_external(self, external_id): }[drmaa_state] def _build_template_attributes(self, job_id, command_line, dependencies_description=None, env=[], submit_params={}, setup_params=None): - stdout_path = self._stdout_path(job_id) - stderr_path = self._stderr_path(job_id) + stdout_path = self._job_stdout_path(job_id) + stderr_path = self._job_stderr_path(job_id) working_directory = self.job_directory(job_id).working_directory() attributes = { diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py index 96a0055e..470e5285 100644 --- a/pulsar/managers/base/directory.py +++ b/pulsar/managers/base/directory.py @@ -15,8 +15,10 @@ # should be able to replace metadata backing with non-file stuff now that # the abstractions are fairly well utilized. 
JOB_FILE_RETURN_CODE = "return_code" -JOB_FILE_STANDARD_OUTPUT = "stdout" -JOB_FILE_STANDARD_ERROR = "stderr" +TOOL_FILE_STANDARD_OUTPUT = os.path.join("metadata", "tool_stdout") +TOOL_FILE_STANDARD_ERROR = os.path.join("metadata", "tool_stderr") +JOB_FILE_STANDARD_OUTPUT = os.path.join("metadata", "job_stdout") +JOB_FILE_STANDARD_ERROR = os.path.join("metadata", "job_stderr") JOB_FILE_TOOL_ID = "tool_id" JOB_FILE_TOOL_VERSION = "tool_version" JOB_FILE_CANCELLED = "cancelled" @@ -34,9 +36,23 @@ def return_code(self, job_id): return int(return_code_str) if return_code_str and return_code_str != PULSAR_UNKNOWN_RETURN_CODE else return_code_str def stdout_contents(self, job_id): - return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size, default=b"") + try: + return self._read_job_file(job_id, TOOL_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size) + except FileNotFoundError: + # Could be old job finishing up, drop in 2024? + return self._read_job_file(job_id, "tool_stdout", size=self.maximum_stream_size, default=b"") def stderr_contents(self, job_id): + try: + return self._read_job_file(job_id, TOOL_FILE_STANDARD_ERROR, size=self.maximum_stream_size) + except FileNotFoundError: + # Could be old job finishing up, drop in 2024? 
+ return self._read_job_file(job_id, "tool_stderr", size=self.maximum_stream_size, default=b"") + + def job_stdout_contents(self, job_id): + return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size, default=b"") + + def job_stderr_contents(self, job_id): return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, size=self.maximum_stream_size, default=b"") def read_command_line(self, job_id): @@ -47,10 +63,16 @@ def read_command_line(self, job_id): command_line = json.loads(command_line) return command_line - def _stdout_path(self, job_id): + def _tool_stdout_path(self, job_id): + return self._job_file(job_id, TOOL_FILE_STANDARD_OUTPUT) + + def _tool_stderr_path(self, job_id): + return self._job_file(job_id, TOOL_FILE_STANDARD_ERROR) + + def _job_stdout_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_OUTPUT) - def _stderr_path(self, job_id): + def _job_stderr_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_ERROR) def _return_code_path(self, job_id): @@ -100,10 +122,10 @@ def _was_cancelled(self, job_id): log.info("Failed to determine if job with id %s was cancelled, assuming no." 
% job_id) return False - def _open_standard_output(self, job_id): + def _open_job_standard_output(self, job_id): return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_OUTPUT, 'w') - def _open_standard_error(self, job_id): + def _open_job_standard_error(self, job_id): return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_ERROR, 'w') def _check_execution_with_tool_file(self, job_id, command_line): diff --git a/pulsar/managers/queued_cli.py b/pulsar/managers/queued_cli.py index bc24dd2f..6411600e 100644 --- a/pulsar/managers/queued_cli.py +++ b/pulsar/managers/queued_cli.py @@ -27,8 +27,8 @@ def __init__(self, name, app, **kwds): def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): self._check_execution_with_tool_file(job_id, command_line) shell, job_interface = self.__get_cli_plugins() - stdout_path = self._stdout_path(job_id) - stderr_path = self._stderr_path(job_id) + stdout_path = self._job_stdout_path(job_id) + stderr_path = self._job_stderr_path(job_id) job_name = self._job_name(job_id) command_line = self._expand_command_line( job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory diff --git a/pulsar/managers/queued_condor.py b/pulsar/managers/queued_condor.py index 688aca94..62e3677c 100644 --- a/pulsar/managers/queued_condor.py +++ b/pulsar/managers/queued_condor.py @@ -46,8 +46,8 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio submit_params.update(self.submission_params) build_submit_params = dict( executable=job_file_path, - output=self._stdout_path(job_id), - error=self._stderr_path(job_id), + output=self._job_stdout_path(job_id), + error=self._job_stderr_path(job_id), user_log=log_path, query_params=submit_params, ) diff --git a/pulsar/managers/unqueued.py b/pulsar/managers/unqueued.py index b807e2fa..4c316b99 100644 --- a/pulsar/managers/unqueued.py +++ b/pulsar/managers/unqueued.py @@ -188,8 
+188,8 @@ def _run(self, job_id, command_line, montior: MonitorStyle = MonitorStyle.BACKGR def _proc_for_job_id(self, job_id, command_line): job_directory = self.job_directory(job_id) working_directory = job_directory.working_directory() - stdout = self._open_standard_output(job_id) - stderr = self._open_standard_error(job_id) + stdout = self._open_job_standard_output(job_id) + stderr = self._open_job_standard_error(job_id) proc = execute(command_line=command_line, working_directory=working_directory, stdout=stdout, @@ -240,10 +240,6 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env, setup_params=setup_params) job_directory = self.job_directory(job_id) working_directory = job_directory.working_directory() - command_line += " > '{}' 2> '{}'".format( - self._stdout_path(job_id), - self._stderr_path(job_id), - ) command_line = "cd '{}'; sh {}".format(working_directory, command_line) log.info("writing command line [%s] for co-execution" % command_line) self._write_command_line(job_id, command_line) diff --git a/test/test_utils.py b/test/test_utils.py index f8f88fb8..00c021a2 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -177,7 +177,9 @@ def tearDown(self): @nottest def _test_simple_execution(self, manager, timeout=None): - command = """python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stdout.flush(); sys.stderr.write(\'moo\'); sys.stderr.flush()" """ + command = """ +python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stdout.flush(); sys.stderr.write(\'moo\'); sys.stderr.flush()" \ +2> ../metadata/tool_stderr > ../metadata/tool_stdout""" job_id = manager.setup_job("123", "tool1", "1.0.0") manager.launch(job_id, command) @@ -185,8 +187,10 @@ def _test_simple_execution(self, manager, timeout=None): while manager.get_status(job_id) not in ['complete', 'cancelled']: if time_end and 
time.time() > time_end: raise Exception("Timeout.") - self.assertEqual(manager.stderr_contents(job_id), b'moo') - self.assertEqual(manager.stdout_contents(job_id), b'Hello World!') + self.assertEqual(manager.job_stderr_contents(job_id), b"") + self.assertEqual(manager.job_stdout_contents(job_id), b"") + self.assertEqual(manager.stderr_contents(job_id), b"moo") + self.assertEqual(manager.stdout_contents(job_id), b"Hello World!") self.assertEqual(manager.return_code(job_id), 0) manager.clean(job_id) self.assertEqual(len(listdir(self.staging_directory)), 0) diff --git a/test/wsgi_app_test.py b/test/wsgi_app_test.py index b7a8fa3a..21c3db18 100644 --- a/test/wsgi_app_test.py +++ b/test/wsgi_app_test.py @@ -59,8 +59,8 @@ def test_upload(upload_type): check_response = app.get("/jobs/%s/status" % job_id) check_config = json.loads(check_response.body.decode("utf-8")) assert check_config['returncode'] == 0 - assert check_config['stdout'] == "test_out" - assert check_config['stderr'] == "" + assert check_config['job_stdout'] == "test_out" + assert check_config['job_stderr'] == "" kill_response = app.put("/jobs/%s/cancel" % job_id) assert kill_response.body.decode("utf-8") == 'OK' From d0baaa7a7bb4f79aa4e1777ae62b8dc149a6f008 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 7 Apr 2023 08:13:03 +0200 Subject: [PATCH 04/35] Test against my fork --- .github/workflows/galaxy_framework.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/galaxy_framework.yaml b/.github/workflows/galaxy_framework.yaml index 775d48b2..06888b83 100644 --- a/.github/workflows/galaxy_framework.yaml +++ b/.github/workflows/galaxy_framework.yaml @@ -9,11 +9,11 @@ jobs: strategy: matrix: python-version: ['3.7'] - galaxy-branch: ['dev', 'master'] + galaxy-branch: ['pulsar_standing_branch', 'master'] metadata-strategy: ['directory'] include: - python-version: '3.7' - galaxy-branch: 'dev' + galaxy-branch: 'pulsar_standing_branch' metadata-strategy: 'extended' 
services: postgres: @@ -31,7 +31,7 @@ jobs: - name: Checkout tools repo uses: actions/checkout@v2 with: - repository: galaxyproject/galaxy + repository: mvdbeek/galaxy ref: ${{ matrix.galaxy-branch }} path: galaxy - uses: actions/setup-python@v2 From 84bc15c89215895e640f1b1e49cc951ff9012e88 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 7 Apr 2023 10:54:39 +0200 Subject: [PATCH 05/35] Import MEMORY_STATEMENT.sh from Galaxy This will write to metadata/memory_statement.log, instead of the working directory. Fixes a bunch of tests that use metadata collection. --- .../managers/util/job_script/MEMORY_STATEMENT.sh | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh b/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh index 09fc2f60..fda7266e 100644 --- a/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh +++ b/pulsar/managers/util/job_script/MEMORY_STATEMENT.sh @@ -1,10 +1,8 @@ -if [ -z "$GALAXY_MEMORY_MB" ]; then - if [ -n "$SLURM_JOB_ID" ]; then - GALAXY_MEMORY_MB=`scontrol -do show job "$SLURM_JOB_ID" | sed 's/.*\( \|^\)Mem=\([0-9][0-9]*\)\( \|$\).*/\2/p;d'` 2>memory_statement.log - fi - if [ -n "$SGE_HGR_h_vmem" ]; then - GALAXY_MEMORY_MB=`echo "$SGE_HGR_h_vmem" | sed 's/G$/ * 1024/' | bc | cut -d"." -f1` 2>memory_statement.log - fi +if [ -n "$SLURM_JOB_ID" ]; then + GALAXY_MEMORY_MB=`scontrol -do show job "$SLURM_JOB_ID" | sed 's/.*\( \|^\)Mem=\([0-9][0-9]*\)\( \|$\).*/\2/p;d'` 2>$metadata_directory/memory_statement.log +fi +if [ -n "$SGE_HGR_h_vmem" ]; then + GALAXY_MEMORY_MB_PER_SLOT=`echo "$SGE_HGR_h_vmem" | sed 's/G$/ * 1024/' | bc | cut -d"." 
-f1` 2>$metadata_directory/memory_statement.log fi if [ -z "$GALAXY_MEMORY_MB_PER_SLOT" -a -n "$GALAXY_MEMORY_MB" ]; then @@ -12,5 +10,5 @@ if [ -z "$GALAXY_MEMORY_MB_PER_SLOT" -a -n "$GALAXY_MEMORY_MB" ]; then elif [ -z "$GALAXY_MEMORY_MB" -a -n "$GALAXY_MEMORY_MB_PER_SLOT" ]; then GALAXY_MEMORY_MB=$(($GALAXY_MEMORY_MB_PER_SLOT * $GALAXY_SLOTS)) fi -[ "${GALAXY_MEMORY_MB--1}" -gt 0 ] 2>>memory_statement.log && export GALAXY_MEMORY_MB || unset GALAXY_MEMORY_MB -[ "${GALAXY_MEMORY_MB_PER_SLOT--1}" -gt 0 ] 2>>memory_statement.log && export GALAXY_MEMORY_MB_PER_SLOT || unset GALAXY_MEMORY_MB_PER_SLOT +[ "${GALAXY_MEMORY_MB--1}" -gt 0 ] 2>>$metadata_directory/memory_statement.log && export GALAXY_MEMORY_MB || unset GALAXY_MEMORY_MB +[ "${GALAXY_MEMORY_MB_PER_SLOT--1}" -gt 0 ] 2>>$metadata_directory/memory_statement.log && export GALAXY_MEMORY_MB_PER_SLOT || unset GALAXY_MEMORY_MB_PER_SLOT From 463f8bf76139400ead5262e8142f57932a3a6eab Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 7 Apr 2023 11:36:47 +0200 Subject: [PATCH 06/35] More memory statement fixes from upstream --- pulsar/managers/util/job_script/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py index d3512d9f..5209bcd4 100644 --- a/pulsar/managers/util/job_script/__init__.py +++ b/pulsar/managers/util/job_script/__init__.py @@ -23,7 +23,7 @@ SLOTS_STATEMENT_CLUSTER_DEFAULT = resource_string(__name__, "CLUSTER_SLOTS_STATEMENT.sh") -MEMORY_STATEMENT_DEFAULT = resource_string(__name__, "MEMORY_STATEMENT.sh") +MEMORY_STATEMENT_DEFAULT_TEMPLATE = Template(resource_string(__name__, "MEMORY_STATEMENT.sh")) SLOTS_STATEMENT_SINGLE = """ GALAXY_SLOTS="1" @@ -42,14 +42,13 @@ DEFAULT_INTEGRITY_CHECK = True DEFAULT_INTEGRITY_COUNT = 35 DEFAULT_INTEGRITY_SLEEP = 0.25 -REQUIRED_TEMPLATE_PARAMS = ["working_directory", "command"] +REQUIRED_TEMPLATE_PARAMS = ["metadata_directory", 
"working_directory", "command"] OPTIONAL_TEMPLATE_PARAMS: Dict[str, Any] = { "galaxy_lib": None, "galaxy_virtual_env": None, "headers": "", "env_setup_commands": [], "slots_statement": SLOTS_STATEMENT_CLUSTER_DEFAULT, - "memory_statement": MEMORY_STATEMENT_DEFAULT, "instrument_pre_commands": "", "instrument_post_commands": "", "integrity_injection": INTEGRITY_INJECTION, @@ -86,12 +85,17 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): """ if any(param not in kwds for param in REQUIRED_TEMPLATE_PARAMS): raise Exception("Failed to create job_script, a required parameter is missing.") + metadata_directory = kwds.get("metadata_directory", kwds["working_directory"]) job_instrumenter = kwds.get("job_instrumenter", None) if job_instrumenter: del kwds["job_instrumenter"] working_directory = kwds.get("metadata_directory", kwds["working_directory"]) kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or "" kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or "" + if "memory_statement" not in kwds: + kwds["memory_statement"] = MEMORY_STATEMENT_DEFAULT_TEMPLATE.safe_substitute( + metadata_directory=metadata_directory + ) # Setup home directory var kwds["home_directory"] = kwds.get("home_directory", os.path.join(kwds["working_directory"], "home")) From 108946a3a5c5aa798f81e69ff099c46a9d07684e Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 7 Apr 2023 23:34:04 +0200 Subject: [PATCH 07/35] Collect extra_files from galaxy.json --- pulsar/client/staging/down.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index c14a9bec..81be9596 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -136,6 +136,7 @@ def __collect_job_directory_files(self): def __realized_dynamic_file_source_references(self): references = [] + extra_files = [] def record_references(from_dict): if 
isinstance(from_dict, list): @@ -145,6 +146,8 @@ def record_references(from_dict): for k, v in from_dict.items(): if k == "filename": references.append(v) + elif k == "extra_files": + extra_files.append(v) if isinstance(v, (list, dict)): record_references(v) @@ -167,13 +170,13 @@ def parse_and_record_references(json_content): for line in contents.splitlines(): parse_and_record_references(line) - return references + return references, extra_files def __collect_directory_files(self, directory, contents, output_type): if directory is None: # e.g. output_metadata_directory return - dynamic_file_source_references = self.__realized_dynamic_file_source_references() + dynamic_file_source_references, extra_files_references = self.__realized_dynamic_file_source_references() # Fetch remaining working directory outputs of interest. for name in contents: @@ -182,7 +185,7 @@ def __collect_directory_files(self, directory, contents, output_type): continue if self.client_outputs.dynamic_match(name): collect = True - elif name in dynamic_file_source_references: + elif name in dynamic_file_source_references or any(name.startswith(r) for r in extra_files_references): collect = True if collect: From 8c0936f5e36da3432ca03d35d9a39d4c3486075b Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sat, 8 Apr 2023 09:48:38 +0200 Subject: [PATCH 08/35] Drop continue-on-error for tests that should pass now --- .github/workflows/galaxy_framework.yaml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/galaxy_framework.yaml b/.github/workflows/galaxy_framework.yaml index 06888b83..68508ea0 100644 --- a/.github/workflows/galaxy_framework.yaml +++ b/.github/workflows/galaxy_framework.yaml @@ -45,13 +45,21 @@ jobs: with: path: ~/.cache/pip key: pip-cache-${{ matrix.python-version }}-${{ hashFiles('galaxy/requirements.txt') }} - - name: Run tests + - name: Run tests (and allow failure) run: ./run_tests.sh --framework + if: matrix.galaxy-branch == 'master' || 
matrix.metadata-strategy == 'extended' working-directory: 'galaxy' env: GALAXY_TEST_JOB_CONFIG_FILE: ../pulsar/test_data/test_job_conf.yaml GALAXY_CONFIG_OVERRIDE_METADATA_STRATEGY: ${{ matrix.metadata-strategy }} continue-on-error: true + - name: Run tests (must pass) + if: matrix.metadata-strategy == 'directory' && matrix.galaxy-branch == 'pulsar_standing_branch' + run: ./run_tests.sh --framework + working-directory: 'galaxy' + env: + GALAXY_TEST_JOB_CONFIG_FILE: ../pulsar/test_data/test_job_conf.yaml + GALAXY_CONFIG_OVERRIDE_METADATA_STRATEGY: ${{ matrix.metadata-strategy }} - uses: actions/upload-artifact@v2 with: name: Framework test results (${{ matrix.python-version }}) From a0ee45297169d2ac2ef795bce74ecfd4d9ee41b9 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sat, 8 Apr 2023 09:55:32 +0200 Subject: [PATCH 09/35] Decrease complexity --- pulsar/client/staging/down.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index 81be9596..ea8f8aa1 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -135,8 +135,7 @@ def __collect_job_directory_files(self): ) def __realized_dynamic_file_source_references(self): - references = [] - extra_files = [] + references = {"filename": [], "extra_files": []} def record_references(from_dict): if isinstance(from_dict, list): @@ -144,10 +143,8 @@ def record_references(from_dict): record_references(v) elif isinstance(from_dict, dict): for k, v in from_dict.items(): - if k == "filename": - references.append(v) - elif k == "extra_files": - extra_files.append(v) + if k in references: + references[k].append(v) if isinstance(v, (list, dict)): record_references(v) @@ -170,13 +167,13 @@ def parse_and_record_references(json_content): for line in contents.splitlines(): parse_and_record_references(line) - return references, extra_files + return references def __collect_directory_files(self, directory, contents, 
output_type): if directory is None: # e.g. output_metadata_directory return - dynamic_file_source_references, extra_files_references = self.__realized_dynamic_file_source_references() + dynamic_file_source_references = self.__realized_dynamic_file_source_references() # Fetch remaining working directory outputs of interest. for name in contents: @@ -185,7 +182,7 @@ def __collect_directory_files(self, directory, contents, output_type): continue if self.client_outputs.dynamic_match(name): collect = True - elif name in dynamic_file_source_references or any(name.startswith(r) for r in extra_files_references): + elif name in dynamic_file_source_references["filename"] or any(name.startswith(r) for r in dynamic_file_source_references["extra_files"]): collect = True if collect: From 194e52547f3272fd204c93a3933b20ce945ef304 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 10 Apr 2023 11:51:42 +0200 Subject: [PATCH 10/35] Require 23.0 tests to actually pass --- .github/workflows/galaxy_framework.yaml | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/.github/workflows/galaxy_framework.yaml b/.github/workflows/galaxy_framework.yaml index 68508ea0..50c9e0b8 100644 --- a/.github/workflows/galaxy_framework.yaml +++ b/.github/workflows/galaxy_framework.yaml @@ -45,21 +45,13 @@ jobs: with: path: ~/.cache/pip key: pip-cache-${{ matrix.python-version }}-${{ hashFiles('galaxy/requirements.txt') }} - - name: Run tests (and allow failure) - run: ./run_tests.sh --framework - if: matrix.galaxy-branch == 'master' || matrix.metadata-strategy == 'extended' - working-directory: 'galaxy' - env: - GALAXY_TEST_JOB_CONFIG_FILE: ../pulsar/test_data/test_job_conf.yaml - GALAXY_CONFIG_OVERRIDE_METADATA_STRATEGY: ${{ matrix.metadata-strategy }} - continue-on-error: true - - name: Run tests (must pass) - if: matrix.metadata-strategy == 'directory' && matrix.galaxy-branch == 'pulsar_standing_branch' + - name: Run tests run: ./run_tests.sh --framework working-directory: 
'galaxy' env: GALAXY_TEST_JOB_CONFIG_FILE: ../pulsar/test_data/test_job_conf.yaml GALAXY_CONFIG_OVERRIDE_METADATA_STRATEGY: ${{ matrix.metadata-strategy }} + continue-on-error: ${{ matrix.galaxy-branch == 'master' }} - uses: actions/upload-artifact@v2 with: name: Framework test results (${{ matrix.python-version }}) From a596a85fb246b093eb149162da16e7acd7439968 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 10 Apr 2023 14:22:47 +0200 Subject: [PATCH 11/35] Bump up minimum galaxy-util version Should fix https://github.com/galaxyproject/pulsar/issues/311. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index de5a11d4..7edae48a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ pyyaml galaxy-job-metrics galaxy-objectstore galaxy-tool-util -galaxy-util +galaxy-util>=22.1.2 paramiko typing-extensions pydantic-tes>=0.1.5 From 1bab11254ad7aed3b5334200910774f9c558daf1 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 10 Apr 2023 16:50:17 +0200 Subject: [PATCH 12/35] Move galaxy framework tests back to upstream dev branch --- .github/workflows/galaxy_framework.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/galaxy_framework.yaml b/.github/workflows/galaxy_framework.yaml index 50c9e0b8..998a7a0c 100644 --- a/.github/workflows/galaxy_framework.yaml +++ b/.github/workflows/galaxy_framework.yaml @@ -9,11 +9,11 @@ jobs: strategy: matrix: python-version: ['3.7'] - galaxy-branch: ['pulsar_standing_branch', 'master'] + galaxy-branch: ['dev', 'master'] metadata-strategy: ['directory'] include: - python-version: '3.7' - galaxy-branch: 'pulsar_standing_branch' + galaxy-branch: 'dev' metadata-strategy: 'extended' services: postgres: @@ -31,7 +31,7 @@ jobs: - name: Checkout tools repo uses: actions/checkout@v2 with: - repository: mvdbeek/galaxy + repository: galaxyproject/galaxy ref: ${{ matrix.galaxy-branch }} path: galaxy - uses: 
actions/setup-python@v2 From 1b088efa53c15ef8331d4ce8c157be6abc61244f Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 6 Apr 2023 16:54:42 +0200 Subject: [PATCH 13/35] Add support for logging to sentry --- app.yml.sample | 11 +++++++++++ dev-requirements.txt | 1 + pulsar/core.py | 28 +++++++++++++++++++++++++++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/app.yml.sample b/app.yml.sample index 85d68ad6..69113ff8 100644 --- a/app.yml.sample +++ b/app.yml.sample @@ -126,3 +126,14 @@ ## mechanism for expiring cache so it will grow unbounded without ## external clean up. #file_cache_dir: cache + +## Log to Sentry Sentry is an open source logging and error aggregation +## platform. Setting sentry_dsn will enable the Sentry middleware and +## errors will be sent to the indicated sentry instance. This +## connection string is available in your sentry instance under +## -> Settings -> API Keys. +##sentry_dsn: null + +## Determines the minimum log level that will be sent as an event to +## Sentry. Possible values are DEBUG, INFO, WARNING, ERROR or CRITICAL. 
+##sentry_event_level: WARNING diff --git a/dev-requirements.txt b/dev-requirements.txt index 01dd1d24..f163c94d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -25,6 +25,7 @@ types-PyYAML types-pycurl types-requests types-psutil +sentry-sdk # For release wheel diff --git a/pulsar/core.py b/pulsar/core.py index 4f124067..cccadea3 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -1,5 +1,6 @@ """ """ +import logging import os from logging import getLogger from tempfile import tempdir @@ -9,7 +10,10 @@ from galaxy.tool_util.deps import build_dependency_manager from galaxy.util.bunch import Bunch -from pulsar import messaging +from pulsar import ( + __version__ as pulsar_version, + messaging, +) from pulsar.cache import Cache from pulsar.manager_factory import build_managers from pulsar.tools import ToolBox @@ -34,6 +38,7 @@ def __init__(self, **conf): if conf is None: conf = {} self.config_dir = conf.get('config_dir', os.getcwd()) + self.__setup_sentry_integration(conf) self.__setup_staging_directory(conf.get("staging_directory", DEFAULT_STAGING_DIRECTORY)) self.__setup_private_token(conf.get("private_token", DEFAULT_PRIVATE_TOKEN)) self.__setup_persistence_directory(conf.get("persistence_directory", None)) @@ -84,6 +89,27 @@ def __setup_tool_config(self, conf): self.toolbox = toolbox self.authorizer = get_authorizer(toolbox) + def __setup_sentry_integration(self, conf): + sentry_dsn = conf.get("sentry_dsn") + if sentry_dsn: + try: + import sentry_sdk + from sentry_sdk.integrations.logging import LoggingIntegration + except ImportError: + log.error("sentry_dsn configured, but sentry-sdk not installed") + sentry_sdk = None + LoggingIntegration = None + if sentry_sdk: + sentry_logging = LoggingIntegration( + level=logging.INFO, # Capture info and above as breadcrumbs + event_level=getattr(logging, conf.get("sentry_event_level", "WARNING")), # Send warnings as events + ) + sentry_sdk.init( + sentry_dsn, + release=pulsar_version, + 
integrations=[sentry_logging], + ) + def __setup_staging_directory(self, staging_directory): self.staging_directory = os.path.abspath(staging_directory) From fea6969c53810bdf3465257ed3c354212e547241 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 12 Apr 2023 13:59:47 +0200 Subject: [PATCH 14/35] Create .dev3 version --- HISTORY.rst | 6 +++++- pulsar/__init__.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 07ac1575..b559ee58 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,7 +6,7 @@ History .. to_doc --------------------- -0.15.0.dev1 +0.15.0.dev3 --------------------- * Updated Galaxy+Pulsar container. @@ -19,6 +19,10 @@ History * Add option ``amqp_key_prefix`` to direct task queue naming while retaining simple default manager names and such in container scheduling deployments. * Various typing and CI fixes. +* Fixes for extra_file handling +* Separate tool_stdio and job_stdio handling +* Re-import MEMORY_STATEMENT.sh from Galaxy +* Add support for logging to sentry --------------------- 0.14.16 (2022-10-04) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index f22f5241..77c55244 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.0.dev2' +__version__ = '0.15.0.dev3' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" From 64c00749b9d03604e82c4a3acb5f573c14f899a8 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Fri, 9 Dec 2022 17:18:19 +0000 Subject: [PATCH 15/35] Fix tox environment names - Don't use hyphens in the middle of an environment name, they are used to split them in factors. 
- Simplify --- .github/workflows/pulsar.yaml | 6 +++--- tox.ini | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/pulsar.yaml b/.github/workflows/pulsar.yaml index 49eff7f8..3dc95e43 100644 --- a/.github/workflows/pulsar.yaml +++ b/.github/workflows/pulsar.yaml @@ -61,9 +61,9 @@ jobs: python: 3.9 - tox-env: py39-test-unit python: 3.9 - - tox-env: py39-install-wheel-no-conda + - tox-env: py39-install_wheel-no_conda python: 3.9 - - tox-env: py37-install-wheel + - tox-env: py37-install_wheel python: 3.7 services: job-files: @@ -94,7 +94,7 @@ jobs: ## connect to the host running the Pulsar server for file transfers in stage out/in. # tes-test: # name: Run Tests - # runs-on: ubuntu-20.04 + # runs-on: ubuntu-20.04 # strategy: # matrix: # include: diff --git a/tox.ini b/tox.ini index d0939b74..2d5e6c02 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{36,37,38,39}-lint, py{36,37,38,39}-docs, py{36,37,38,39}-dist, py{36,37,38,39}-test-unit, py{36,37,38,39}-test, py{36,37,38,39}-install-wheel, py{36,37,38,39}-install-wheel-no-conda +envlist = py{36,37,38,39}-lint, py{36,37,38,39}-docs, py{36,37,38,39}-dist, py{36,37,38,39}-test-unit, py{36,37,38,39}-test, py{36,37,38,39}-install_wheel, py{36,37,38,39}-install_wheel-no_conda toxworkdir={env:TOX_WORK_DIR:.tox} source_dir = pulsar test_dir = test @@ -11,18 +11,18 @@ commands = lint: flake8 --ignore W504 {[tox]source_dir} {[tox]test_dir} dist: make lint-dist docs: make lint-docs - install-wheel: make test-install-wheel - install-wheel-no-conda: make test-install-wheel-no-conda + install_wheel-!no_conda: make test-install-wheel + install_wheel-no_conda: make test-install-wheel-no-conda mypy: mypy {[tox]source_dir} {[tox]test_dir} deps = test,docs,mypy: -rrequirements.txt - test,test-install-wheel,test-install-wheel-no-conda,mypy: -rdev-requirements.txt + test,install_wheel,mypy: -rdev-requirements.txt test: drmaa lint: flake8 docs: sphinx==1.2 dist: twine - 
install-wheel,install-wheel-no-conda: virtualenv + install_wheel: virtualenv setenv = # tests ready to go after setup_tests.sh @@ -40,10 +40,10 @@ passenv = DRMAA_LIBRARY_PATH skip_install = - lint,dist,install-wheel,install-wheel-no-conda: True + lint,dist,install_wheel: True skipsdist = docs: True allowlist_externals = - docs,dist,install-wheel,install-wheel-no-conda: make + docs,dist,install_wheel: make From 743ea925831e2ab60303caad651ac5d28d45e3a8 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Mon, 12 Dec 2022 23:22:23 +0000 Subject: [PATCH 16/35] Mypy fixes --- mypy.ini | 4 ++++ pulsar/client/manager.py | 7 ++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/mypy.ini b/mypy.ini index b10a4e64..e8f9bc4a 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,3 +1,7 @@ +[mypy] +show_error_codes = True +pretty = True + [mypy-galaxy.*] ignore_missing_imports = True diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index 8f2ab74f..9c429734 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -52,6 +52,7 @@ def get_client(self, destination_params: Dict[str, Any], job_id: str, **kwargs: def shutdown(self, ensure_cleanup=False) -> None: """Mark client manager's work as complete and clean up resources it managed.""" + return class ClientManager(ClientManagerInterface): @@ -104,10 +105,6 @@ def get_client(self, destination_params, job_id, **kwargs): job_manager_interface = job_manager_interface_class(**job_manager_interface_args) return self.client_class(destination_params, job_id, job_manager_interface, **self.extra_client_kwds) - def shutdown(self, ensure_cleanup=False): - """Mark client manager's work as complete and clean up resources it managed.""" - pass - try: from galaxy.jobs.runners.util.cli import factory as cli_factory @@ -336,7 +333,7 @@ def __perform_transfer(self, transfer_info): def __init_transfer_threads(self, num_transfer_threads): self.num_transfer_threads = num_transfer_threads self.transfer_queue = Queue() 
- for i in range(num_transfer_threads): + for _ in range(num_transfer_threads): t = threading.Thread(target=self._transfer_worker) t.daemon = True t.start() From bd62da1cca676025a832ab30b75fb67e193cff64 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Mon, 12 Dec 2022 23:26:35 +0000 Subject: [PATCH 17/35] Add support for Python 3.11 Test on Python 3.6 and 3.11 without all intermediate versions. Also: - Remove install of `python-pycurl` in GitHub workflow, `pycurl` is installed later via tox without having to keep track of Ubuntu package name changes. --- .github/workflows/pulsar.yaml | 24 +++++++----------------- pulsar/web/framework.py | 4 ++-- setup.py | 7 +------ tox.ini | 2 +- 4 files changed, 11 insertions(+), 26 deletions(-) diff --git a/.github/workflows/pulsar.yaml b/.github/workflows/pulsar.yaml index 3dc95e43..92e85d32 100644 --- a/.github/workflows/pulsar.yaml +++ b/.github/workflows/pulsar.yaml @@ -24,10 +24,8 @@ jobs: include: - tox-env: py36-mypy python: 3.6 - - tox-env: py37-mypy - python: 3.7 - - tox-env: py38-mypy - python: 3.8 + - tox-env: py311-mypy + python: 3.11 steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v3 @@ -36,7 +34,7 @@ jobs: - name: Install tox run: pip install tox - name: Setup pycurl - run: sudo apt update; sudo apt install -y libxml2-dev libxslt1-dev libcurl4-openssl-dev python-pycurl openssh-server + run: sudo apt update; sudo apt install -y libxml2-dev libxslt1-dev libcurl4-openssl-dev openssh-server - name: Run tox run: tox -e ${{ matrix.tox-env }} test: @@ -49,18 +47,10 @@ jobs: python: 3.6 - tox-env: py36-test-unit python: 3.6 - - tox-env: py37-test-ci - python: 3.7 - - tox-env: py37-test-unit - python: 3.7 - - tox-env: py38-test-ci - python: 3.8 - - tox-env: py38-test-unit - python: 3.8 - - tox-env: py39-test-ci - python: 3.9 - - tox-env: py39-test-unit - python: 3.9 + - tox-env: py311-test-ci + python: 3.11 + - tox-env: py311-test-unit + python: 3.11 - tox-env: py39-install_wheel-no_conda python: 3.9 - 
tox-env: py37-install_wheel diff --git a/pulsar/web/framework.py b/pulsar/web/framework.py index 20fb7164..b2abce1e 100644 --- a/pulsar/web/framework.py +++ b/pulsar/web/framework.py @@ -69,7 +69,7 @@ def add_args(func_args, arg_values): if func_arg not in args and func_arg in arg_values: args[func_arg] = arg_values[func_arg] - func_args = inspect.getargspec(func).args + func_args = inspect.getfullargspec(func).args for arg_dict in arg_dicts: add_args(func_args, arg_dict) @@ -108,7 +108,7 @@ def __handle_access(self, req, environ, start_response): def __build_args(self, func, args, req, environ): args = build_func_args(func, args, req.GET, self._app_args(args, req)) - func_args = inspect.getargspec(func).args + func_args = inspect.getfullargspec(func).args for func_arg in func_args: if func_arg == "ip": diff --git a/setup.py b/setup.py index 6976216c..5dfbc38c 100644 --- a/setup.py +++ b/setup.py @@ -31,10 +31,6 @@ if PULSAR_GALAXY_LIB: requirements = [r for r in requirements if not r.startswith("galaxy-")] -test_requirements = [ - # TODO: put package test requirements here -] - _version_re = re.compile(r'__version__\s+=\s+(.*)') @@ -128,7 +124,6 @@ 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', ], - test_suite='test', - tests_require=test_requirements ) diff --git a/tox.ini b/tox.ini index 2d5e6c02..50f3d1ab 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{36,37,38,39}-lint, py{36,37,38,39}-docs, py{36,37,38,39}-dist, py{36,37,38,39}-test-unit, py{36,37,38,39}-test, py{36,37,38,39}-install_wheel, py{36,37,38,39}-install_wheel-no_conda +envlist = lint, docs, dist, test-unit, test, install_wheel, install_wheel-no_conda toxworkdir={env:TOX_WORK_DIR:.tox} source_dir = pulsar test_dir = test From 63103a39cf0ed82771100f121d7be1089c1dd415 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Tue, 13 Dec 2022 00:41:04 +0000 Subject: 
[PATCH 18/35] Restore minimum version requirements Lost in https://github.com/galaxyproject/pulsar/pull/315 . --- requirements.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index 7edae48a..44b0c6d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,9 +2,9 @@ webob psutil PasteDeploy pyyaml -galaxy-job-metrics -galaxy-objectstore -galaxy-tool-util +galaxy-job-metrics>=19.9.0 +galaxy-objectstore>=19.9.0 +galaxy-tool-util>=19.9.0 galaxy-util>=22.1.2 paramiko typing-extensions From eea97d195d4c432521b2a7e60ef62afac5320894 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 13 Apr 2023 17:12:06 +0200 Subject: [PATCH 19/35] Update docs --- HISTORY.rst | 29 +++++++++++++++--------- docs/pulsar.client.rst | 16 +++++++------- docs/pulsar.managers.util.cli.job.rst | 16 ++++++++++++++ docs/pulsar.managers.util.rst | 32 +++++++++++++++++++++++++++ docs/pulsar.scripts.rst | 8 +++++++ pulsar/__init__.py | 2 +- 6 files changed, 84 insertions(+), 19 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index b559ee58..ee5b0cf2 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,23 +6,23 @@ History .. to_doc --------------------- -0.15.0.dev3 +0.15.0 (2023-04-13) --------------------- -* Updated Galaxy+Pulsar container. +* Updated Galaxy+Pulsar container. `Pull Request 306`_ * Rework container execution - generalize Kubernetes execution to allow it to work without a - message queue and to allow TES execution based on pydantic-tes (https://github.com/jmchilton/pydantic-tes). -* Add documentation and diagrams for container execution scenarios. + message queue and to allow TES execution based on pydantic-tes (https://github.com/jmchilton/pydantic-tes). `Pull Request 302`_ +* Add documentation and diagrams for container execution scenarios. `Pull Request 302`_ * Rework integration tests to use pytest more aggressively. 
* Fixes to CI to run more tests that weren't being executed because Tox was not sending environment variables through to pytest. * Add option ``amqp_key_prefix`` to direct task queue naming while retaining simple - default manager names and such in container scheduling deployments. -* Various typing and CI fixes. -* Fixes for extra_file handling -* Separate tool_stdio and job_stdio handling -* Re-import MEMORY_STATEMENT.sh from Galaxy -* Add support for logging to sentry + default manager names and such in container scheduling deployments. `Pull Request 315`_ +* Various typing and CI fixes. `Pull Request 312`_, `Pull Request 319`_ +* Fixes for extra_file handling. `Pull Request 318`_ +* Separate tool_stdio and job_stdio handling. `Pull Request 318`_ +* Re-import MEMORY_STATEMENT.sh from Galaxy. `Pull Request 297`_ +* Add support for logging to sentry. `Pull Request 322`_ --------------------- 0.14.16 (2022-10-04) @@ -455,6 +455,15 @@ History .. github_links + +.. _Pull Request 322: https://github.com/galaxyproject/pulsar/pull/322 +.. _Pull Request 318: https://github.com/galaxyproject/pulsar/pull/318 +.. _Pull Request 319: https://github.com/galaxyproject/pulsar/pull/319 +.. _Pull Request 312: https://github.com/galaxyproject/pulsar/pull/312 +.. _Pull Request 315: https://github.com/galaxyproject/pulsar/pull/315 +.. _Pull Request 306: https://github.com/galaxyproject/pulsar/pull/306 +.. _Pull Request 297: https://github.com/galaxyproject/pulsar/pull/297 +.. _Pull Request 302: https://github.com/galaxyproject/pulsar/pull/302 .. _Pull Request 303: https://github.com/galaxyproject/pulsar/pull/303 .. _Pull Request 301: https://github.com/galaxyproject/pulsar/pull/301 .. 
_Pull Request 299: https://github.com/galaxyproject/pulsar/pull/299 diff --git a/docs/pulsar.client.rst b/docs/pulsar.client.rst index dd006a59..e99a7171 100644 --- a/docs/pulsar.client.rst +++ b/docs/pulsar.client.rst @@ -77,14 +77,6 @@ pulsar.client.exceptions module :undoc-members: :show-inheritance: -pulsar.client.interface module ------------------------------- - -.. automodule:: pulsar.client.interface - :members: - :undoc-members: - :show-inheritance: - pulsar.client.job\_directory module ----------------------------------- @@ -117,6 +109,14 @@ pulsar.client.path\_mapper module :undoc-members: :show-inheritance: +pulsar.client.server\_interface module +-------------------------------------- + +.. automodule:: pulsar.client.server_interface + :members: + :undoc-members: + :show-inheritance: + pulsar.client.setup\_handler module ----------------------------------- diff --git a/docs/pulsar.managers.util.cli.job.rst b/docs/pulsar.managers.util.cli.job.rst index 45383ff5..6db7c001 100644 --- a/docs/pulsar.managers.util.cli.job.rst +++ b/docs/pulsar.managers.util.cli.job.rst @@ -4,6 +4,22 @@ pulsar.managers.util.cli.job package Submodules ---------- +pulsar.managers.util.cli.job.lsf module +--------------------------------------- + +.. automodule:: pulsar.managers.util.cli.job.lsf + :members: + :undoc-members: + :show-inheritance: + +pulsar.managers.util.cli.job.pbs module +--------------------------------------- + +.. automodule:: pulsar.managers.util.cli.job.pbs + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.cli.job.slurm module ----------------------------------------- diff --git a/docs/pulsar.managers.util.rst b/docs/pulsar.managers.util.rst index d970ed3d..310cde4d 100644 --- a/docs/pulsar.managers.util.rst +++ b/docs/pulsar.managers.util.rst @@ -14,6 +14,14 @@ Subpackages Submodules ---------- +pulsar.managers.util.aws\_batch module +-------------------------------------- + +.. 
automodule:: pulsar.managers.util.aws_batch + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.env module ------------------------------- @@ -38,6 +46,22 @@ pulsar.managers.util.kill module :undoc-members: :show-inheritance: +pulsar.managers.util.process\_groups module +------------------------------------------- + +.. automodule:: pulsar.managers.util.process_groups + :members: + :undoc-members: + :show-inheritance: + +pulsar.managers.util.pykube\_util module +---------------------------------------- + +.. automodule:: pulsar.managers.util.pykube_util + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.retry module --------------------------------- @@ -54,6 +78,14 @@ pulsar.managers.util.sudo module :undoc-members: :show-inheritance: +pulsar.managers.util.tes module +------------------------------- + +.. automodule:: pulsar.managers.util.tes + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/docs/pulsar.scripts.rst b/docs/pulsar.scripts.rst index 06d50230..08740e57 100644 --- a/docs/pulsar.scripts.rst +++ b/docs/pulsar.scripts.rst @@ -44,6 +44,14 @@ pulsar.scripts.drmaa\_launch module :undoc-members: :show-inheritance: +pulsar.scripts.finish module +---------------------------- + +.. 
automodule:: pulsar.scripts.finish + :members: + :undoc-members: + :show-inheritance: + pulsar.scripts.mesos\_executor module ------------------------------------- diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 77c55244..fcbcc060 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.0.dev3' +__version__ = '0.15.0' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" From 56fd06ca08abaa334baafa49781944bf00d688a4 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 13 Apr 2023 17:21:41 +0200 Subject: [PATCH 20/35] Bump version again --- HISTORY.rst | 5 +++++ pulsar/__init__.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index ee5b0cf2..2969bde8 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,6 +5,11 @@ History .. to_doc +--------------------- +0.15.1 (2023-04-13) +--------------------- +* No changes, working around pypi issue. + --------------------- 0.15.0 (2023-04-13) --------------------- diff --git a/pulsar/__init__.py b/pulsar/__init__.py index fcbcc060..2cc28b2c 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.0' +__version__ = '0.15.1' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" From 73caeee3355d044753b66ea58d1229cb4225deb6 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 20 Apr 2023 11:59:40 +0200 Subject: [PATCH 21/35] Allow recovery from restart --- pulsar/client/amqp_exchange.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 0de56ced..4b9399f7 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -14,10 +14,17 @@ except ImportError: kombu = None +try: + import amqp + import amqp.exceptions +except ImportError: + amqp = None + log = logging.getLogger(__name__) KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but
kombu dependency unavailable" +AMQP_UNAVAILABLE = "Attempting to bind to AMQP message queue, but pyamqp dependency unavailable" DEFAULT_EXCHANGE_NAME = "pulsar" DEFAULT_EXCHANGE_TYPE = "direct" @@ -47,7 +54,7 @@ class PulsarExchange: Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar should target each AMQP endpoint or care should be taken that unique - manager names are used across Pulsar servers targetting same AMQP endpoint - + manager names are used across Pulsar servers targeting the same AMQP endpoint - and in particular only one such Pulsar should define an default manager with name _default_. """ @@ -68,6 +75,8 @@ def __init__( """ if not kombu: raise Exception(KOMBU_UNAVAILABLE) + if not amqp: + raise Exception(AMQP_UNAVAILABLE) self.__url = url self.__manager_name = manager_name self.__amqp_key_prefix = amqp_key_prefix @@ -84,7 +93,7 @@ def __init__( self.consume_uuid_store = consume_uuid_store self.publish_ack_lock = threading.Lock() # Ack manager should sleep before checking for - # repbulishes, but if that changes, need to drain the + # republishes, but if that changes, need to drain the # queue once before the ack manager starts doing its # thing self.ack_manager_thread = self.__start_ack_manager() @@ -119,7 +128,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): connection.drain_events(timeout=self.__timeout) except socket.timeout: pass - except OSError as exc: + except (OSError, amqp.exceptions.ConnectionForced, amqp.exceptions.RecoverableChannelError, amqp.exceptions.RecoverableConnectionError) as exc: self.__handle_io_error(exc, heartbeat_thread) except BaseException: log.exception("Problem consuming queue, consumer quitting in problematic fashion!") From 7864de58623d55a37f0d10c9fbad47039379a027 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 20 Apr 2023 12:39:40 +0200 Subject: [PATCH 22/35] Add kombu.exceptions.OperationalError to list of retryable exceptions Should fix ``` Apr 20 10:35:34
gat-4.eu.galaxy.training galaxyctl[1034676]: pulsar.client.manager ERROR 2023-04-20 10:35:34,753 [pN:handler_0,p:1034676,tN:pulsar_client__default__kill_ack] Exception while handling kill acknowledgement messages, this shouldn't really happen. Handler should be restarted. Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: Traceback (most recent call last): Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 446, in _reraise_as_library_errors Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: yield Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 433, in _ensure_connection Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: return retry_over_time( Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/utils/functional.py", line 312, in retry_over_time Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: return fun(*args, **kwargs) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 877, in _connection_factory Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self._connection = self._establish_connection() Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 812, in _establish_connection Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: conn = self.transport.establish_connection() Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/transport/pyamqp.py", line 201, in establish_connection Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: conn.connect() Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File 
"/srv/galaxy/venv/lib/python3.10/site-packages/amqp/connection.py", line 323, in connect Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self.transport.connect() Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/amqp/transport.py", line 129, in connect Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self._connect(self.host, self.port, self.connect_timeout) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/amqp/transport.py", line 184, in _connect Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self.sock.connect(sa) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: ConnectionRefusedError: [Errno 111] Connection refused Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: The above exception was the direct cause of the following exception: Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: Traceback (most recent call last): Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/pulsar/client/manager.py", line 191, in ack_consumer Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self.exchange.consume(queue_name + '_ack', None, check=self) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/pulsar/client/amqp_exchange.py", line 124, in consume Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']): Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/messaging.py", line 387, in __init__ Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self.revive(self.channel) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File 
"/srv/galaxy/venv/lib/python3.10/site-packages/kombu/messaging.py", line 400, in revive Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: channel = self.channel = maybe_channel(channel) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 1052, in maybe_channel Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: return channel.default_channel Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 895, in default_channel Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self._ensure_connection(**conn_opts) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 432, in _ensure_connection Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: with ctx(): Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__ Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: self.gen.throw(typ, value, traceback) Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: File "/srv/galaxy/venv/lib/python3.10/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: raise ConnectionError(str(exc)) from exc Apr 20 10:35:34 gat-4.eu.galaxy.training galaxyctl[1034676]: kombu.exceptions.OperationalError: [Errno 111] Connection refused ``` on the client side (=Galaxy) when restarting rabbitmq. 
--- pulsar/client/amqp_exchange.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 4b9399f7..f24ebb94 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -10,6 +10,7 @@ try: import kombu + import kombu.exceptions from kombu import pools except ImportError: kombu = None @@ -128,7 +129,11 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): connection.drain_events(timeout=self.__timeout) except socket.timeout: pass - except (OSError, amqp.exceptions.ConnectionForced, amqp.exceptions.RecoverableChannelError, amqp.exceptions.RecoverableConnectionError) as exc: + except ( + OSError, + amqp.exceptions.ConnectionForced, + kombu.exceptions.OperationalError, + ) as exc: self.__handle_io_error(exc, heartbeat_thread) except BaseException: log.exception("Problem consuming queue, consumer quitting in problematic fashion!") From ca385a2c216a90e9a8339ce3675d992cdf12b5ac Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 20 Apr 2023 14:11:47 +0200 Subject: [PATCH 23/35] Ignore missing amqp imports in mypy --- mypy.ini | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mypy.ini b/mypy.ini index e8f9bc4a..e8722c14 100644 --- a/mypy.ini +++ b/mypy.ini @@ -35,6 +35,9 @@ ignore_missing_imports = True [mypy-kombu.*] ignore_missing_imports = True +[mypy-amqp.*] +ignore_missing_imports = True + [mypy-requests_toolbelt.*] ignore_missing_imports = True From c2b5e68c17472904f3f325f4bbb91e290c2ba6e0 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 21 Apr 2023 09:52:14 +0200 Subject: [PATCH 24/35] Add connection closed and publish timeout to exceptions that should lead to retries --- pulsar/client/amqp_exchange.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index f24ebb94..b82f8156 100644 --- a/pulsar/client/amqp_exchange.py 
+++ b/pulsar/client/amqp_exchange.py @@ -7,6 +7,7 @@ sleep, time, ) +from typing import Optional try: import kombu @@ -78,6 +79,14 @@ def __init__( raise Exception(KOMBU_UNAVAILABLE) if not amqp: raise Exception(AMQP_UNAVAILABLE) + # conditional imports and type checking prevent us from doing this at the module level. + self.recoverable_exceptions = ( + socket.timeout, + amqp.exceptions.ConnectionForced, # e.g. connection closed on rabbitmq sigterm + amqp.exceptions.RecoverableConnectionError, # connection closed + amqp.exceptions.RecoverableChannelError, # publish time out + kombu.exceptions.OperationalError, # ConnectionRefusedError, e.g. when getting a new connection while rabbitmq is down + ) self.__url = url self.__manager_name = manager_name self.__amqp_key_prefix = amqp_key_prefix @@ -125,15 +134,8 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']): heartbeat_thread = self.__start_heartbeat(queue_name, connection) while check and connection.connected: - try: - connection.drain_events(timeout=self.__timeout) - except socket.timeout: - pass - except ( - OSError, - amqp.exceptions.ConnectionForced, - kombu.exceptions.OperationalError, - ) as exc: + connection.drain_events(timeout=self.__timeout) + except self.recoverable_exceptions as exc: self.__handle_io_error(exc, heartbeat_thread) except BaseException: log.exception("Problem consuming queue, consumer quitting in problematic fashion!") @@ -175,7 +177,7 @@ def __ack_callback(self, body, message): log.warning('Cannot remove UUID %s from store, already removed', ack_uuid) message.ack() - def __handle_io_error(self, exc, heartbeat_thread): + def __handle_io_error(self, exc: BaseException, heartbeat_thread: Optional[threading.Thread] = None): # In testing, errno is None log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc) try: @@ -247,8 +249,12 @@ def ack_manager(self): log.debug('UUID %s 
has not been acknowledged, ' 'republishing original message on queue %s', unack_uuid, resubmit_queue) - self.publish(resubmit_queue, payload) - self.publish_uuid_store.set_time(unack_uuid) + try: + self.publish(resubmit_queue, payload) + self.publish_uuid_store.set_time(unack_uuid) + except self.recoverable_exceptions as e: + self.__handle_io_error(e) + continue except Exception: log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!") raise From 8b9e2a8150e57a958e0b980261d3297bf1fa20c3 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 12 Apr 2023 13:57:21 +0200 Subject: [PATCH 25/35] Add publish timeout option to kombu --- app.yml.sample | 4 ++++ dev-requirements.txt | 2 +- pulsar/client/amqp_exchange_factory.py | 2 +- pulsar/messaging/bind_amqp.py | 1 + setup.py | 1 + 5 files changed, 8 insertions(+), 2 deletions(-) diff --git a/app.yml.sample b/app.yml.sample index 69113ff8..6e52e2cf 100644 --- a/app.yml.sample +++ b/app.yml.sample @@ -89,6 +89,10 @@ ## kombu.Connection's drain_events() method. #amqp_consumer_timeout: 0.2 +## publishing messages to the queue may hang if the connection becomes invalid. +## this value is used as the timeout argument to the producer.publish function. 
+#amqp_publish_timeout: 2.0 + # AMQP does not guarantee that a published message is received by the AMQP # server, so Pulsar can request that the consumer acknowledge messages and will # resend them if acknowledgement is not received after a configurable timeout diff --git a/dev-requirements.txt b/dev-requirements.txt index f163c94d..406872f8 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,6 @@ # Optional requirements used by test cases pycurl -kombu +kombu>=5.2.3 pykube boto3 diff --git a/pulsar/client/amqp_exchange_factory.py b/pulsar/client/amqp_exchange_factory.py index 6ca8fd11..94f41f57 100644 --- a/pulsar/client/amqp_exchange_factory.py +++ b/pulsar/client/amqp_exchange_factory.py @@ -11,7 +11,7 @@ def get_exchange(url, manager_name, params): manager_name=manager_name, amqp_key_prefix=params.get("amqp_key_prefix"), connect_ssl=connect_ssl, - publish_kwds=parse_amqp_publish_kwds(params) + publish_kwds=parse_amqp_publish_kwds(params), ) if params.get('amqp_acknowledge', False): exchange_kwds.update(parse_ack_kwds(params, manager_name)) diff --git a/pulsar/messaging/bind_amqp.py b/pulsar/messaging/bind_amqp.py index 4d7798e9..c735a3cb 100644 --- a/pulsar/messaging/bind_amqp.py +++ b/pulsar/messaging/bind_amqp.py @@ -15,6 +15,7 @@ TYPED_PARAMS = { "amqp_consumer_timeout": lambda val: None if str(val) == "None" else float(val), + "amqp_publish_timeout": lambda val: None if str(val) == "None" else float(val), "amqp_publish_retry": asbool, "amqp_publish_retry_max_retries": int, "amqp_publish_retry_interval_start": int, diff --git a/setup.py b/setup.py index 5dfbc38c..de1898c7 100644 --- a/setup.py +++ b/setup.py @@ -104,6 +104,7 @@ include_package_data=True, install_requires=requirements, extras_require={ + 'amqp': ['kombu>=5.2.3'], 'web': ['Paste', 'PasteScript'], 'galaxy_extended_metadata': ['galaxy-job-execution', 'galaxy-util[template]'], }, From cfede2b987d3b192d0cf855f5893958b8b26b5b9 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 
20 Apr 2023 16:46:55 +0200 Subject: [PATCH 26/35] Conditionally enable publish timeout argument --- dev-requirements.txt | 2 +- pulsar/client/amqp_exchange.py | 6 ++++++ setup.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 406872f8..f163c94d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,6 @@ # Optional requirements used by test cases pycurl -kombu>=5.2.3 +kombu pykube boto3 diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index b82f8156..0d500ee2 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -9,6 +9,7 @@ ) from typing import Optional +from packaging.version import parse as parse_version try: import kombu import kombu.exceptions @@ -46,6 +47,7 @@ ACK_FORCE_NOACK_KEY = 'force_noack' DEFAULT_ACK_MANAGER_SLEEP = 15 DEFAULT_REPUBLISH_TIME = 30 +MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT = parse_version("5.2.0") class PulsarExchange: @@ -87,6 +89,7 @@ def __init__( amqp.exceptions.RecoverableChannelError, # publish time out kombu.exceptions.OperationalError, # ConnectionRefusedError, e.g. when getting a new connection while rabbitmq is down ) + self.__kombu_version = parse_version(kombu.__version__) self.__url = url self.__manager_name = manager_name self.__amqp_key_prefix = amqp_key_prefix @@ -291,6 +294,9 @@ def errback(exc, interval): publish_kwds["retry_policy"]["errback"] = errback else: publish_kwds = self.__publish_kwds + if self.__kombu_version < MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT: + log.warning(f"kombu version {kombu.__version__} does not support timeout argument to publish. 
Consider updating to 5.2.0 or newer") + publish_kwds.pop("timeout", None) return publish_kwds def __publish_log_prefex(self, transaction_uuid=None): diff --git a/setup.py b/setup.py index de1898c7..689ba972 100644 --- a/setup.py +++ b/setup.py @@ -104,7 +104,7 @@ include_package_data=True, install_requires=requirements, extras_require={ - 'amqp': ['kombu>=5.2.3'], + 'amqp': ['kombu'], 'web': ['Paste', 'PasteScript'], 'galaxy_extended_metadata': ['galaxy-job-execution', 'galaxy-util[template]'], }, From 0a5410a63ce52c47339be0204693b3dbe0d34785 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 23 Apr 2023 09:50:02 +0200 Subject: [PATCH 27/35] 0.15.2.dev0 --- HISTORY.rst | 6 ++++++ pulsar/__init__.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index 2969bde8..a53a4f9d 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,6 +5,11 @@ History .. to_doc +--------------------- +0.15.2.dev0 (2023-04-23) +--------------------- +* Fix Pulsar and Pulsar client reconnection to AMQP server. `Pull Request 324`_ + --------------------- 0.15.1 (2023-04-13) --------------------- @@ -461,6 +466,7 @@ History .. github_links +.. _Pull Request 324: https://github.com/galaxyproject/pulsar/pull/324 .. _Pull Request 322: https://github.com/galaxyproject/pulsar/pull/322 .. _Pull Request 318: https://github.com/galaxyproject/pulsar/pull/318 .. 
_Pull Request 319: https://github.com/galaxyproject/pulsar/pull/319 diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 2cc28b2c..1f506758 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.1' +__version__ = '0.15.2.dev0' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" From bb1e5f5ba429f2469638d2a690790ccb05fdff53 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 23 Apr 2023 09:53:26 +0200 Subject: [PATCH 28/35] Fix up HISTORY.rst --- HISTORY.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index a53a4f9d..6b22f300 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,9 +5,9 @@ History .. to_doc ---------------------- +-------------------------- 0.15.2.dev0 (2023-04-23) ---------------------- +-------------------------- * Fix Pulsar and Pulsar client reconnection to AMQP server. `Pull Request 324`_ --------------------- From 0692ad52db3f583ead54a437d08d7be63198755b Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 28 Apr 2023 10:46:38 +0200 Subject: [PATCH 29/35] Don't close heartbeat thread on timeout --- pulsar/client/amqp_exchange.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 0d500ee2..bde924be 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -137,7 +137,10 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']): heartbeat_thread = self.__start_heartbeat(queue_name, connection) while check and connection.connected: - connection.drain_events(timeout=self.__timeout) + try: + connection.drain_events(timeout=self.__timeout) + except socket.timeout: + pass except self.recoverable_exceptions as exc: self.__handle_io_error(exc, heartbeat_thread) except BaseException: From 6bbe49af5d1554f5ab2c11227a707ddb75a24e59 
Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 28 Apr 2023 16:11:53 +0200 Subject: [PATCH 30/35] Push 0.15.2.dev1 version --- HISTORY.rst | 6 ++++++ pulsar/__init__.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index 6b22f300..ce33f8b5 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,6 +5,11 @@ History .. to_doc +-------------------------- +0.15.2.dev1 (2023-04-28) +-------------------------- +* Reduce verbosity of timeout exception catching. `Pull Request 325`_ + -------------------------- 0.15.2.dev0 (2023-04-23) -------------------------- @@ -466,6 +471,7 @@ History .. github_links +.. _Pull Request 325: https://github.com/galaxyproject/pulsar/pull/325 .. _Pull Request 324: https://github.com/galaxyproject/pulsar/pull/324 .. _Pull Request 322: https://github.com/galaxyproject/pulsar/pull/322 .. _Pull Request 318: https://github.com/galaxyproject/pulsar/pull/318 diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 1f506758..516edd73 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.2.dev0' +__version__ = '0.15.2.dev1' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" From fbf5214593eabc7ed2ec813ee1332a12633b14e8 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 2 May 2023 12:15:32 +0200 Subject: [PATCH 31/35] Publish pulsar 0.15.2 --- HISTORY.rst | 12 ++++-------- pulsar/__init__.py | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index ce33f8b5..226c2c2a 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -5,15 +5,11 @@ History .. to_doc --------------------------- -0.15.2.dev1 (2023-04-28) --------------------------- -* Reduce verbosity of timeout exception catching. `Pull Request 325`_ - --------------------------- -0.15.2.dev0 (2023-04-23) --------------------------- +--------------------- +0.15.2 (2023-05-02) +--------------------- * Fix Pulsar and Pulsar client reconnection to AMQP server. 
`Pull Request 324`_ +* Reduce verbosity of timeout exception catching. `Pull Request 325`_ --------------------- 0.15.1 (2023-04-13) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 516edd73..459b4941 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.15.2.dev1' +__version__ = '0.15.2' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" From 39697add82ff1066e33468b2862e612a6403e6ba Mon Sep 17 00:00:00 2001 From: "Yakubov, Sergey" Date: Mon, 10 Apr 2023 09:41:53 -0400 Subject: [PATCH 32/35] add token endpoint as job launch parameter so that it is sent to Pulsar from Galaxy. See corresponding PR (https://github.com/galaxyproject/galaxy/pull/15300) and issue (https://github.com/galaxyproject/galaxy/issues/15526) for Galaxy --- pulsar/client/client.py | 21 +++++++++++++++++---- pulsar/client/staging/up.py | 2 +- pulsar/manager_endpoint_util.py | 3 +++ test/client_staging_test.py | 6 +++++- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pulsar/client/client.py b/pulsar/client/client.py index d7edff63..e28d84ea 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -100,6 +100,7 @@ def __init__(self, destination_params, job_id): setattr(self, attr, destination_params.get(attr, None)) self.env = destination_params.get("env", []) self.files_endpoint = destination_params.get("files_endpoint", None) + self.token_endpoint = destination_params.get("token_endpoint", None) default_file_action = self.destination_params.get("default_file_action", "transfer") if default_file_action not in actions: @@ -166,7 +167,8 @@ def __init__(self, destination_params, job_id, job_manager_interface): super().__init__(destination_params, job_id) self.job_manager_interface = job_manager_interface - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None): + def launch(self, command_line, dependencies_description=None, 
env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): """ Queue up the execution of the supplied `command_line` on the remote server. Called launch for historical reasons, should be renamed to @@ -190,6 +192,8 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s if job_config and 'touch_outputs' in job_config: # message clients pass the entire job config launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']}) + if token_endpoint is not None: + launch_params["token_endpoint"] = json_dumps({'token_endpoint': token_endpoint}) if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from @@ -344,7 +348,8 @@ def __init__(self, destination_params, job_id, client_manager): self.client_manager = client_manager self.amqp_key_prefix = self.destination_params.get("amqp_key_prefix") - def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config, dynamic_file_sources): + def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config, + dynamic_file_sources, token_endpoint): """ """ launch_params = dict(command_line=command_line, job_id=self.job_id) @@ -359,6 +364,8 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params['remote_staging'] = remote_staging launch_params['remote_staging']['ssh_key'] = self.ssh_key launch_params['dynamic_file_sources'] = dynamic_file_sources + launch_params['token_endpoint'] = token_endpoint + if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from # destination arguments. 
Hence, must have Pulsar setup job @@ -397,7 +404,8 @@ def _build_status_request_message(self): class MessageJobClient(BaseMessageJobClient): - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None): + def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): """ """ launch_params = self._build_setup_message( @@ -407,6 +415,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s remote_staging=remote_staging, job_config=job_config, dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, ) self.client_manager.exchange.publish("setup", launch_params) log.info("Job published to setup message queue: %s", self.job_id) @@ -429,7 +438,8 @@ def __init__(self, destination_params, job_id, client_manager, shell): self.remote_pulsar_path = destination_params["remote_pulsar_path"] self.shell = shell - def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None): + def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): """ """ launch_params = self._build_setup_message( @@ -439,6 +449,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s remote_staging=remote_staging, job_config=job_config, dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, ) base64_message = to_base64_json(launch_params) submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash") @@ -479,6 +490,7 @@ def launch( job_config=None, dynamic_file_sources=None, container_info=None, + token_endpoint=None, pulsar_app_config=None ) -> Optional[ExternalId]: """ @@ -490,6 +502,7 @@ def launch( remote_staging=remote_staging, 
job_config=job_config, dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, ) container = None guest_ports = None diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index bfdddc29..9f08bef8 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -70,7 +70,7 @@ def submit_job(client, client_job_description, job_config=None): # potentially duplicated but we don't want to count on remote staging to include this # it needs to be in the response to Pulsar even Pulsar is inititing staging actions launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources - + launch_kwds["token_endpoint"] = client.token_endpoint # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external # id from the submission process (e.g. to TES). launch_response = client.launch(**launch_kwds) diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index cea80cb2..bf96dd10 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -81,6 +81,8 @@ def submit_job(manager, job_config): submit_params = job_config.get('submit_params', {}) touch_outputs = job_config.get('touch_outputs', []) dynamic_file_sources = job_config.get("dynamic_file_sources", None) + token_endpoint = job_config.get("token_endpoint", None) + job_config = None if setup_params or force_setup: input_job_id = setup_params.get("job_id", job_id) @@ -108,6 +110,7 @@ def submit_job(manager, job_config): "env": env, "setup_params": setup_params, "dynamic_file_sources": dynamic_file_sources, + "token_endpoint": token_endpoint, } manager.preprocess_and_launch(job_id, launch_config) except Exception: diff --git a/test/client_staging_test.py b/test/client_staging_test.py index b161de9e..86298ef6 100644 --- a/test/client_staging_test.py +++ b/test/client_staging_test.py @@ -11,6 +11,7 @@ TEST_REQUIREMENT_1 = ToolRequirement("test1", "1.0") TEST_REQUIREMENT_2 
= ToolRequirement("test2", "1.0") TEST_ENV_1 = dict(name="x", value="y") +TEST_TOKEN_ENDPOINT = "endpoint" class TestStager(TempDirectoryTestCase): @@ -153,6 +154,7 @@ def __init__(self, temp_directory, tool): self.default_file_action = "transfer" self.action_config_path = None self.files_endpoint = None + self.token_endpoint = TEST_TOKEN_ENDPOINT self.expected_tool = tool self.job_id = "1234" self.expected_command_line = None @@ -176,11 +178,13 @@ def setup(self, tool_id, tool_version, use_metadata=False): assert tool_version == self.expected_tool.version return {} - def launch(self, command_line, dependencies_description, job_config={}, remote_staging={}, env=[], dynamic_file_sources=None): + def launch(self, command_line, dependencies_description, job_config={}, remote_staging={}, env=[], dynamic_file_sources=None, + token_endpoint=None): if self.expected_command_line is not None: message = "Excepected command line {}, got {}".format(self.expected_command_line, command_line) assert self.expected_command_line == command_line, message assert dependencies_description.requirements == [TEST_REQUIREMENT_1, TEST_REQUIREMENT_2] + assert token_endpoint == TEST_TOKEN_ENDPOINT assert env == [TEST_ENV_1] def expect_command_line(self, expected_command_line): From 4609eca6604f881993411aa4ac7483650ce4f7cb Mon Sep 17 00:00:00 2001 From: "Yakubov, Sergey" Date: Mon, 10 Apr 2023 09:48:42 -0400 Subject: [PATCH 33/35] allows to configure plugins for user authentication/authorization. 
Includes methods to authenticate based on OIDC token (see issue https://github.com/galaxyproject/galaxy/issues/15526) --- app.yml.sample | 16 ++++++ pulsar/core.py | 6 +++ pulsar/managers/base/__init__.py | 3 ++ pulsar/user_auth/__init__.py | 0 pulsar/user_auth/manager.py | 66 ++++++++++++++++++++++++ pulsar/user_auth/methods/__init__.py | 0 pulsar/user_auth/methods/allow_all.py | 18 +++++++ pulsar/user_auth/methods/interface.py | 15 ++++++ pulsar/user_auth/methods/oidc.py | 74 +++++++++++++++++++++++++++ pulsar/user_auth/methods/userlist.py | 21 ++++++++ requirements.txt | 1 + test/manager_test.py | 8 ++- test/persistence_test.py | 3 +- test/test_utils.py | 37 +++++++++----- test/user_authorization_test.py | 20 ++++++++ 15 files changed, 274 insertions(+), 14 deletions(-) create mode 100644 pulsar/user_auth/__init__.py create mode 100644 pulsar/user_auth/manager.py create mode 100644 pulsar/user_auth/methods/__init__.py create mode 100644 pulsar/user_auth/methods/allow_all.py create mode 100644 pulsar/user_auth/methods/interface.py create mode 100644 pulsar/user_auth/methods/oidc.py create mode 100644 pulsar/user_auth/methods/userlist.py create mode 100644 test/user_authorization_test.py diff --git a/app.yml.sample b/app.yml.sample index 6e52e2cf..a4bcb3b3 100644 --- a/app.yml.sample +++ b/app.yml.sample @@ -124,6 +124,22 @@ ## Maximum number of seconds to sleep between each retry. #amqp_publish_retry_interval_max: 60 + +## configure user authentication/authorization plugins +## parameters depend on auth type. Authentication plugin should return a username +## and authorization plugin should authorize this user +#user_auth: +# authentication: +# - type: oidc +# oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys +# oidc_provider: azure +# oidc_username_in_token: preferred_username +# oidc_username_template: *. +# authorization: +# - type: userlist +# userlist_allowed_users: +# - xxx + ## *Experimental*. 
Enable file caching by specifing a directory here. ## Directory used to store incoming file cache. It works fine for HTTP ## transfer, have not tested with staging by coping. Also there is no diff --git a/pulsar/core.py b/pulsar/core.py index cccadea3..c53dd3f2 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -19,6 +19,8 @@ from pulsar.tools import ToolBox from pulsar.tools.authorization import get_authorizer +from pulsar.user_auth.manager import UserAuthManager + log = getLogger(__name__) DEFAULT_PRIVATE_TOKEN = None @@ -46,6 +48,7 @@ def __init__(self, **conf): self.__setup_object_store(conf) self.__setup_dependency_manager(conf) self.__setup_job_metrics(conf) + self.__setup_user_auth_manager(conf) self.__setup_managers(conf) self.__setup_file_cache(conf) self.__setup_bind_to_message_queue(conf) @@ -71,6 +74,9 @@ def __setup_bind_to_message_queue(self, conf): queue_state = messaging.bind_app(self, message_queue_url, conf) self.__queue_state = queue_state + def __setup_user_auth_manager(self, conf): + self.user_auth_manager = UserAuthManager(conf) + def __setup_tool_config(self, conf): """ Setups toolbox object and authorization mechanism based diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 2bbe0e8e..5c89eb66 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -73,6 +73,7 @@ def __init__(self, name, app, **kwds): self.tmp_dir = kwds.get("tmp_dir", None) self.debug = str(kwds.get("debug", False)).lower() == "true" self.authorizer = app.authorizer + self.user_auth_manager = app.user_auth_manager self.__init_system_properties() self.__init_env_vars(**kwds) self.dependency_manager = app.dependency_manager @@ -179,6 +180,8 @@ def _check_execution(self, job_id, tool_id, command_line): log.debug("job_id: {} - Checking authorization of command_line [{}]".format(job_id, command_line)) authorization = self._get_authorization(job_id, tool_id) job_directory = self._job_directory(job_id) + 
self.user_auth_manager.authorize(job_id, job_directory) + tool_files_dir = job_directory.tool_files_directory() for file in self._list_dir(tool_files_dir): if os.path.isdir(join(tool_files_dir, file)): diff --git a/pulsar/user_auth/__init__.py b/pulsar/user_auth/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pulsar/user_auth/manager.py b/pulsar/user_auth/manager.py new file mode 100644 index 00000000..08b16f77 --- /dev/null +++ b/pulsar/user_auth/manager.py @@ -0,0 +1,66 @@ +from abc import ABC + +import inspect + + +class UserAuthManager(ABC): + """ + Authorization/Authentication manager. + """ + + def __init__(self, config): + self._authorization_methods = [] + self._authentication_methods = [] + + try: + user_auth = config.get("user_auth", None) + if not user_auth: + return + authentications = user_auth.pop("authentication", []) + authorizations = user_auth.pop("authorization", []) + + for authorization in authorizations: + authorization.update(user_auth) + obj = get_object("pulsar.user_auth.methods." + authorization["type"], "auth_type", + authorization["type"]) + self._authorization_methods.append(obj(authorization)) + + for authentication in authentications: + authentication.update(user_auth) + obj = get_object("pulsar.user_auth.methods." 
+ authentication["type"], "auth_type", + authentication["type"]) + self._authentication_methods.append(obj(authentication)) + except Exception as e: + raise Exception("cannot read auth configuration") from e + + def authorize(self, job_id, job_directory): + authentication_info = self.__authenticate(job_id, job_directory) + + if len(self._authorization_methods) == 0: + return True + for method in self._authorization_methods: + res = method.authorize(authentication_info) + if res: + return True + + raise Exception("Could not authorize job execution on remote resource") + + def __authenticate(self, job_id, job_directory): + if len(self._authentication_methods) == 0: + return {} + for method in self._authentication_methods: + res = method.authenticate(job_directory) + if res: + return res + + raise Exception("Could not authenticate job %s" % job_id) + + +def get_object(module_name, attribute_name, attribute_value): + module = __import__(module_name) + for comp in module_name.split(".")[1:]: + module = getattr(module, comp) + for _, obj in inspect.getmembers(module): + if inspect.isclass(obj) and hasattr(obj, attribute_name) and getattr(obj, attribute_name) == attribute_value: + return obj + raise Exception("Cannot find object %s with attribute %s=%s " % (module_name, attribute_name, attribute_value)) diff --git a/pulsar/user_auth/methods/__init__.py b/pulsar/user_auth/methods/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pulsar/user_auth/methods/allow_all.py b/pulsar/user_auth/methods/allow_all.py new file mode 100644 index 00000000..c51c521f --- /dev/null +++ b/pulsar/user_auth/methods/allow_all.py @@ -0,0 +1,18 @@ +from pulsar.user_auth.methods.interface import AuthMethod + + +class AlwaysAllowAuthMethod(AuthMethod): + """ + Always allow + """ + + def __init__(self, _config): + pass + + auth_type = "allow_all" + + def authorize(self, authentication_info): + return True + + def authenticate(self, job_directory): + return {"username": 
"anonymous"} diff --git a/pulsar/user_auth/methods/interface.py b/pulsar/user_auth/methods/interface.py new file mode 100644 index 00000000..983ca9ca --- /dev/null +++ b/pulsar/user_auth/methods/interface.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class AuthMethod(ABC): + """ + Defines the interface to various authentication/authorization methods. + """ + + @abstractmethod + def authorize(self, authentication_info): + raise NotImplementedError("a concrete class should implement this") + + @abstractmethod + def authenticate(self, job_directory): + raise NotImplementedError("a concrete class should implement this") diff --git a/pulsar/user_auth/methods/oidc.py b/pulsar/user_auth/methods/oidc.py new file mode 100644 index 00000000..5a14957d --- /dev/null +++ b/pulsar/user_auth/methods/oidc.py @@ -0,0 +1,74 @@ +import requests +import base64 +import json +import jwt +import re +from cryptography.hazmat.backends import default_backend +from cryptography.x509 import load_der_x509_certificate + +from pulsar.user_auth.methods.interface import AuthMethod + +import logging + +log = logging.getLogger(__name__) + + +def get_token(job_directory, provider): + log.debug("Getting OIDC token for provider " + provider + " from Galaxy") + endpoint = job_directory.load_metadata("launch_config")["token_endpoint"] + endpoint = endpoint + "&provider=" + provider + r = requests.get(url=endpoint) + return r.text + + +class OIDCAuth(AuthMethod): + """ + Authorization based on OIDC tokens + """ + auth_type = "oidc" + + def __init__(self, config): + try: + self._provider = config["oidc_provider"] + self._jwks_url = config["oidc_jwks_url"] + self._username_in_token = config["oidc_username_in_token"] + self._username_template = config["oidc_username_template"] + + except Exception as e: + raise Exception("cannot read OIDCAuth configuration") from e + + def _verify_token(self, token): + try: + # Obtain appropriate cert from JWK URI + key_set = requests.get(self._jwks_url, 
timeout=5) + encoded_header, rest = token.split('.', 1) + headerobj = json.loads(base64.b64decode(encoded_header + '==').decode('utf8')) + key_id = headerobj['kid'] + for key in key_set.json()['keys']: + if key['kid'] == key_id: + x5c = key['x5c'][0] + break + else: + raise jwt.DecodeError('Cannot find kid ' + key_id) + cert = load_der_x509_certificate(base64.b64decode(x5c), default_backend()) + # Decode token (exp date is checked automatically) + decoded_token = jwt.decode( + token, + key=cert.public_key(), + algorithms=['RS256'], + options={'exp': True, 'verify_aud': False} + ) + return decoded_token + except Exception as error: + raise Exception("Error verifying jwt token") from error + + def authorize(self, authentication_info): + raise NotImplementedError("authorization not implemented for this class") + + def authenticate(self, job_directory): + token = get_token(job_directory, self._provider) + + decoded_token = self._verify_token(token) + user = decoded_token[self._username_in_token] + user = re.match(self._username_template, user).group(0) + return {"username": user} diff --git a/pulsar/user_auth/methods/userlist.py b/pulsar/user_auth/methods/userlist.py new file mode 100644 index 00000000..30f3c449 --- /dev/null +++ b/pulsar/user_auth/methods/userlist.py @@ -0,0 +1,21 @@ +from pulsar.user_auth.methods.interface import AuthMethod + + +class UserListAuth(AuthMethod): + """ + Defines authorization user by username + """ + + def __init__(self, config): + try: + self._allowed_users = config["userlist_allowed_users"] + except Exception as e: + raise Exception("cannot read UsernameAuth configuration") from e + + auth_type = "userlist" + + def authorize(self, authentication_info): + return authentication_info["username"] in self._allowed_users + + def authenticate(self, job_directory): + raise NotImplementedError("authentication not implemented for this class") diff --git a/requirements.txt b/requirements.txt index 44b0c6d4..46f29c85 100644 --- a/requirements.txt 
+++ b/requirements.txt @@ -9,6 +9,7 @@ galaxy-util>=22.1.2 paramiko typing-extensions pydantic-tes>=0.1.5 +pyjwt ## Uncomment if using DRMAA queue manager. #drmaa diff --git a/test/manager_test.py b/test/manager_test.py index 147456c9..09491b95 100644 --- a/test/manager_test.py +++ b/test/manager_test.py @@ -2,7 +2,7 @@ from os.path import join -from .test_utils import BaseManagerTestCase +from .test_utils import BaseManagerTestCase, get_failing_user_auth_manager class ManagerTest(BaseManagerTestCase): @@ -34,6 +34,12 @@ def test_unauthorized_command_line(self): with self.assertRaises(Exception): self.manager.launch(job_id, 'python') + def test_unauthorized_user(self): + self.manager.user_auth_manager = get_failing_user_auth_manager() + job_id = self.manager.setup_job("123", "tool1", "1.0.0") + with self.assertRaises(Exception): + self.manager.launch(job_id, 'python') + def test_id_assigners(self): self._set_manager(assign_ids="galaxy") job_id = self.manager.setup_job("123", "tool1", "1.0.0") diff --git a/test/persistence_test.py b/test/persistence_test.py index 99367a4f..3c90868f 100644 --- a/test/persistence_test.py +++ b/test/persistence_test.py @@ -7,7 +7,7 @@ from pulsar.tools.authorization import get_authorizer from .test_utils import ( temp_directory, - TestDependencyManager + TestDependencyManager, get_test_user_auth_manager ) from galaxy.job_metrics import NULL_JOB_INSTRUMENTER from galaxy.util.bunch import Bunch @@ -125,6 +125,7 @@ def _app(): staging_directory=staging_directory, persistence_directory=staging_directory, authorizer=get_authorizer(None), + user_auth_manager=get_test_user_auth_manager(), dependency_manager=TestDependencyManager(), job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER), object_store=None, diff --git a/test/test_utils.py b/test/test_utils.py index 00c021a2..e41f9130 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -32,6 +32,7 @@ from pulsar.managers.util import drmaa from pulsar.tools import ToolBox from 
pulsar.managers.base import JobDirectory +from pulsar.user_auth.manager import UserAuthManager from unittest import TestCase, skip @@ -171,6 +172,7 @@ def setUp(self): self.app = minimal_app_for_managers() self.staging_directory = self.app.staging_directory self.authorizer = self.app.authorizer + self.user_auth_manager = self.app.user_auth_manager def tearDown(self): rmtree(self.staging_directory) @@ -229,13 +231,26 @@ def minimal_app_for_managers(): staging_directory = temp_directory_persist(prefix='minimal_app_') rmtree(staging_directory) authorizer = TestAuthorizer() + user_auth_manager = get_test_user_auth_manager() return Bunch(staging_directory=staging_directory, authorizer=authorizer, job_metrics=NullJobMetrics(), dependency_manager=TestDependencyManager(), + user_auth_manager=user_auth_manager, object_store=object()) +def get_test_user_auth_manager(): + config = {"user_auth": {"authentication": [{"type": "allow_all"}], "authorization": [{"type": "allow_all"}]}} + return UserAuthManager(config) + + +def get_failing_user_auth_manager(): + config = {"user_auth": {"authentication": [{"type": "allow_all"}], + "authorization": [{"type": "userlist", "userlist_allowed_users": []}]}} + return UserAuthManager(config) + + class NullJobMetrics: def __init__(self): @@ -286,11 +301,11 @@ def __init__(self, global_conf={}, app_conf={}, test_conf={}, web=True): @contextmanager def new_app(self): with test_pulsar_app( - self.global_conf, - self.app_conf, - self.test_conf, - staging_directory=self.staging_directory, - web=self.web, + self.global_conf, + self.app_conf, + self.test_conf, + staging_directory=self.staging_directory, + web=self.web, ) as app: yield app @@ -313,11 +328,11 @@ def restartable_pulsar_app_provider(**kwds): @nottest @contextmanager def test_pulsar_app( - global_conf={}, - app_conf={}, - test_conf={}, - staging_directory=None, - web=True, + global_conf={}, + app_conf={}, + test_conf={}, + staging_directory=None, + web=True, ): clean_staging_directory = 
False if staging_directory is None: @@ -424,7 +439,6 @@ def skip_without_drmaa(f): def _which(program): - def is_exe(fpath): return isfile(fpath) and access(fpath, X_OK) @@ -491,7 +505,6 @@ def dump_other_threads(): # Extracted from: https://github.com/python/cpython/blob/ # 937ee9e745d7ff3c2010b927903c0e2a83623324/Lib/test/support/__init__.py class EnvironmentVarGuard: - """Class to help protect the environment variable properly. Can be used as a context manager.""" diff --git a/test/user_authorization_test.py b/test/user_authorization_test.py new file mode 100644 index 00000000..3696a243 --- /dev/null +++ b/test/user_authorization_test.py @@ -0,0 +1,20 @@ +from unittest import TestCase + +from test.test_utils import get_test_user_auth_manager, get_failing_user_auth_manager + + +class UserAuthorizationTestCase(TestCase): + + def setUp(self): + self.authorizer = get_test_user_auth_manager() + self.failing_authorizer = get_failing_user_auth_manager() + + def test_passes(self): + self.authorizer.authorize("123", None) + + def test_fails(self): + with self.unauthorized_expectation(): + self.failing_authorizer.authorize("123", None) + + def unauthorized_expectation(self): + return self.assertRaises(Exception) From 56faf45b49781b069ec06a019712c712d4c0cf0a Mon Sep 17 00:00:00 2001 From: "Yakubov, Sergey" Date: Wed, 3 May 2023 15:46:29 -0400 Subject: [PATCH 34/35] update documentation for user auth plugins --- .gitignore | 1 + docs/configure.rst | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/.gitignore b/.gitignore index a4956890..3ee584c8 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ dependencies dependency_resolvers_conf.xml job_metrics_conf.xml .DS_Store +.idea diff --git a/docs/configure.rst b/docs/configure.rst index a2a8dbe4..fb1dc748 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -112,6 +112,34 @@ You can consult the `Kombu documentation `__ for even more information. 
+User Authentication/Authorization +````````````` + +You can configure Pulsar to authenticate user during request processing and check +if this user is allowed to run a job. + +Various authentication/authorization plugins can be configured in `app.yml` to +do that and plugin parameters depend on auth type. For example, the following +configuration uses `oidc` plugin for authentication and `userlist` for +authorization:: + + user_auth: + authentication: + - type: oidc + oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys + oidc_provider: azure + oidc_username_in_token: preferred_username + oidc_username_template: *. + authorization: + - type: userlist + userlist_allowed_users: + - xxx + + +see `plugins folder +`_ +for available plugins and their parameters. + Customizing the Pulsar Environment (\*nix only) ----------------------------------------------- From 8fe294204180dd41ae2fcf33ddc708b8263d47e7 Mon Sep 17 00:00:00 2001 From: "Yakubov, Sergey" Date: Thu, 4 May 2023 08:21:01 -0400 Subject: [PATCH 35/35] add user_auth package to setup.py --- setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.py b/setup.py index 689ba972..68faeed3 100644 --- a/setup.py +++ b/setup.py @@ -75,6 +75,8 @@ 'pulsar.messaging', 'pulsar.scripts', 'pulsar.tools', + 'pulsar.user_auth', + 'pulsar.user_auth.methods', 'pulsar.util', 'pulsar.util.pastescript', 'pulsar.web',