diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 87e99248cd64..3fb68b0488d6 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -2594,6 +2594,23 @@ export interface paths { patch?: never; trace?: never; }; + "/api/invocations/{invocation_id}/metrics": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** Get Invocation Metrics */ + get: operations["get_invocation_metrics_api_invocations__invocation_id__metrics_get"]; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/api/invocations/{invocation_id}/prepare_store_download": { parameters: { query?: never; @@ -26928,6 +26945,50 @@ export interface operations { }; }; }; + get_invocation_metrics_api_invocations__invocation_id__metrics_get: { + parameters: { + query?: never; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description The encoded database identifier of the Invocation. */ + invocation_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Successful Response */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["JobMetric"][]; + }; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; prepare_store_download_api_invocations__invocation_id__prepare_store_download_post: { parameters: { query?: never; diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 1d4a61dac6f0..4fb86b7f4e85 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -1,5 +1,6 @@ import json import logging +from collections.abc import Sequence from datetime import ( date, datetime, @@ -26,6 +27,7 @@ null, or_, true, + union, ) from sqlalchemy.orm import aliased from sqlalchemy.sql import select @@ -54,6 +56,7 @@ ImplicitCollectionJobs, ImplicitCollectionJobsJobAssociation, Job, + JobMetricNumeric, JobParameter, User, Workflow, @@ -729,6 +732,36 @@ def invocation_job_source_iter(sa_session, invocation_id): yield ("ImplicitCollectionJobs", row[1], row[2]) +def get_job_metrics_for_invocation(sa_session: galaxy_scoped_session, invocation_id: int): + single_job_stmnt = ( + select(JobMetricNumeric) + .join(Job, JobMetricNumeric.job_id == Job.id) + .join( + WorkflowInvocationStep, + and_( + WorkflowInvocationStep.workflow_invocation_id == invocation_id, WorkflowInvocationStep.job_id == Job.id + ), + ) + ) + collection_job_stmnt = ( + select(JobMetricNumeric) + .join(Job, JobMetricNumeric.job_id == Job.id) + .join(ImplicitCollectionJobsJobAssociation, Job.id == ImplicitCollectionJobsJobAssociation.job_id) + .join( + ImplicitCollectionJobs, + ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id, + ) + .join( + WorkflowInvocationStep, + and_( + WorkflowInvocationStep.workflow_invocation_id == invocation_id, + WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id, + ), + ) + ) + return sa_session.execute(union(single_job_stmnt, collection_job_stmnt)).all() + + def fetch_job_states(sa_session, job_source_ids, job_source_types): assert len(job_source_ids) == len(job_source_types) job_ids = set() @@ -911,6 +944,10 @@ def summarize_job_metrics(trans, job): Precondition: the caller has verified the job is accessible to the user represented by the trans parameter. """ + return summarize_metrics(trans, job.metrics) + + +def summarize_metrics(trans: ProvidesUserContext, job_metrics): safety_level = Safety.SAFE if trans.user_is_admin: safety_level = Safety.UNSAFE @@ -922,7 +959,7 @@ def summarize_job_metrics(trans, job): m.metric_value, m.plugin, ) - for m in job.metrics + for m in job_metrics ] dictifiable_metrics = trans.app.job_metrics.dictifiable_metrics(raw_metrics, safety_level) return [d.dict() for d in dictifiable_metrics] diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index 6f94550e2134..e966b5f2b7a8 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -73,6 +73,7 @@ CreateWorkflowLandingRequestPayload, InvocationSortByEnum, InvocationsStateCounts, + JobMetric, SetSlugPayload, ShareWithPayload, ShareWithStatus, @@ -1760,3 +1761,11 @@ def workflow_invocation_jobs_summary( ) -> InvocationJobsResponse: """An alias for `GET /api/invocations/{invocation_id}/jobs_summary`. `workflow_id` is ignored.""" return self.invocation_jobs_summary(trans=trans, invocation_id=invocation_id) + + @router.get("/api/invocations/{invocation_id}/metrics") + def get_invocation_metrics( + self, + invocation_id: InvocationIDPathParam, + trans: ProvidesUserContext = DependsOnTrans, + ) -> List[JobMetric]: + return self.invocations_service.show_invocation_metrics(trans=trans, invocation_id=invocation_id) diff --git a/lib/galaxy/webapps/galaxy/services/invocations.py b/lib/galaxy/webapps/galaxy/services/invocations.py index 2195c7202dd7..81b109b224ef 100644 --- a/lib/galaxy/webapps/galaxy/services/invocations.py +++ b/lib/galaxy/webapps/galaxy/services/invocations.py @@ -20,7 +20,9 @@ from galaxy.managers.histories import HistoryManager from galaxy.managers.jobs import ( fetch_job_states, + get_job_metrics_for_invocation, invocation_job_source_iter, + summarize_metrics, ) from galaxy.managers.workflows import WorkflowsManager from galaxy.model import ( @@ -147,6 +149,10 @@ def show_invocation_step(self, trans, step_id) -> InvocationStep: ) return self.serialize_workflow_invocation_step(wfi_step) + def show_invocation_metrics(self, trans: ProvidesHistoryContext, invocation_id: int): + job_metrics = get_job_metrics_for_invocation(trans.sa_session, invocation_id) + return summarize_metrics(trans, job_metrics) + def update_invocation_step(self, trans, step_id, action): wfi_step = self._workflows_manager.update_invocation_step(trans, step_id, action) return self.serialize_workflow_invocation_step(wfi_step) diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 8408837df347..54aadc9ab949 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -3474,6 +3474,39 @@ def test_workflow_new_autocreated_history(self): invocation_id = run_workflow_dict["id"] self.workflow_populator.wait_for_invocation_and_jobs(new_history_id, workflow_id, invocation_id) + def test_invocation_job_metrics_simple(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow(WORKFLOW_SIMPLE, test_data={"input1": "hello world"}, history_id=history_id) + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, workflow_id=summary.workflow_id, invocation_id=summary.invocation_id + ) + job_metrics = self._get(f"invocations/{summary.invocation_id}/metrics").json() + galaxy_slots = [m for m in job_metrics if m["name"] == "galaxy_slots"] + assert len(galaxy_slots) == 1 + + def test_invocation_job_metrics_map_over(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + WORKFLOW_SIMPLE, + test_data={ + "input1": { + "collection_type": "list", + "name": "the_dataset_list", + "elements": [ + {"identifier": "el1", "value": "1.fastq", "type": "File"}, + {"identifier": "el2", "value": "1.fastq", "type": "File"}, + ], + } + }, + history_id=history_id, + ) + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, workflow_id=summary.workflow_id, invocation_id=summary.invocation_id + ) + job_metrics = self._get(f"invocations/{summary.invocation_id}/metrics").json() + galaxy_slots = [m for m in job_metrics if m["name"] == "galaxy_slots"] + assert len(galaxy_slots) == 2 + def test_workflow_output_dataset(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(WORKFLOW_SIMPLE, test_data={"input1": "hello world"}, history_id=history_id)