Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle input params correctly for rerun_invocation() #395

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions bioblend/_tests/TestGalaxyInvocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,27 @@ def test_rerun_invocation(self):
history = self.gi.histories.show_history(rerun_invocation["history_id"], contents=True)
self.assertEqual(len(history), 3)

@test_util.skip_unless_galaxy("release_21.05")
def test_rerun_and_remap_invocation(self):
    """Invoke a workflow whose tool run fails, then rerun it with
    ``remap=True`` and check that the errored outputs are replaced by
    successful ones in the same history."""
    # Import a workflow that is known to produce a failing job for these inputs.
    workflow_path = test_util.get_abspath(os.path.join("data", "select_first.ga"))
    workflow = self.gi.workflows.import_workflow_from_local_path(workflow_path)

    inputs = {"0": {"src": "hda", "id": self.dataset_id}, "1": "-1"}
    invocation = self.gi.workflows.invoke_workflow(
        workflow["id"], inputs=inputs, history_id=self.history_id
    )
    self.gi.invocations.wait_for_invocation(invocation["id"])

    # The tool run is expected to end up in the "error" state.
    error_candidates = self.gi.datasets.get_datasets(
        history_id=self.history_id, name="Select first on data 1"
    )
    errored = self.gi.datasets.wait_for_dataset(error_candidates[0]["id"], check=False)
    self.assertEqual(errored["state"], "error")

    # Remap-rerun only the failed jobs, then wait until every dataset settles.
    self.gi.invocations.rerun_invocation(invocation["id"], remap=True)
    for hda in self.gi.datasets.get_datasets(history_id=self.history_id):
        self.gi.datasets.wait_for_dataset(hda["id"], check=False)

    ok_datasets = self.gi.datasets.get_datasets(state="ok", history_id=self.history_id)
    self.assertEqual(len(ok_datasets), 3)

def _invoke_workflow(self):
dataset = {"src": "hda", "id": self.dataset_id}

Expand Down
58 changes: 45 additions & 13 deletions bioblend/galaxy/invocations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ def show_invocation(self, invocation_id):
def rerun_invocation(
self,
invocation_id: str,
remap: bool = False,
inputs_update: Optional[dict] = None,
params_update: Optional[dict] = None,
params: Optional[dict] = None,
history_id: Optional[str] = None,
history_name: Optional[str] = None,
import_inputs_to_history: bool = False,
Expand All @@ -153,15 +154,23 @@ def rerun_invocation(
:type invocation_id: str
:param invocation_id: Encoded workflow invocation ID to be rerun

:type remap: bool
:param remap: when ``True``, only failed jobs will be rerun. All other parameters
for this method will be ignored. Job output(s) will be remapped onto the dataset(s)
created by the original jobs; if other jobs were waiting for these jobs to finish
successfully, they will be resumed using the new outputs of the tool runs. When ``False``,
an entire new invocation will be created, using the other parameters specified.

:type inputs_update: dict
:param inputs_update: If different datasets should be used to the original
:param inputs_update: If different datasets or parameters should be used than in
  the original invocation, this should contain a mapping of workflow inputs to
  the new datasets and dataset collections.

:type params_update: dict
:param params_update: If different non-dataset tool parameters should be
:type params: dict
:param params: If different non-dataset tool parameters should be
  used than in the original invocation, this should contain a mapping of the
  new parameter values. Runtime parameters should be specified through
  ``inputs_update``.

:type history_id: str
:param history_id: The encoded history ID where to store the workflow
Expand Down Expand Up @@ -199,24 +208,47 @@ def rerun_invocation(
Default is ``False``, but when setting parameters for a subworkflow,
``True`` is required.

:rtype: dict
:return: A dict describing the new workflow invocation.
:rtype: dict if ``remap=False``, or list if ``remap=True``
:return: A dict describing the new workflow invocation, or a list of remapped jobs.

.. note::
This method works only on Galaxy 21.01 or later.
"""
if remap:
errored_jobs = self.gi.jobs.get_jobs(state="error", invocation_id=invocation_id)
remap_failures = 0
rerun_jobs = []
for job in errored_jobs:
try:
job = self.gi.jobs.rerun_job(job["id"], remap=True)
rerun_jobs.append(job)
except ValueError:
# should not occur, jobs from an invocation should always be remappable
remap_failures += 1
if remap_failures:
raise ValueError(
f"remap was set to True, but {remap_failures} out of {len(errored_jobs)} errored jobs could not be remapped."
)
return rerun_jobs

invocation_details = self.show_invocation(invocation_id)
workflow_id = invocation_details["workflow_id"]
inputs = invocation_details["inputs"]
wf_params = invocation_details["input_step_parameters"]
step_ids_to_indices = {
step["workflow_step_id"]: step_index for step_index, step in enumerate(invocation_details["steps"])
}
inputs.update(
{
step_ids_to_indices[param["workflow_step_id"]]: param["parameter_value"]
for _, param in invocation_details["input_step_parameters"].items()
}
)
if inputs_update:
for inp, input_value in inputs_update.items():
inputs[inp] = input_value
if params_update:
for param, param_value in params_update.items():
wf_params[param] = param_value
payload = {"inputs": inputs, "params": wf_params}

payload = {"inputs": inputs}
if params:
payload["parameters"] = params
if replacement_params:
payload["replacement_params"] = replacement_params
if history_id:
Expand Down