# --- databricks/sdk/mixins/jobs.py ---
from typing import Optional

from databricks.sdk.service import jobs


class JobsExt(jobs.JobsAPI):
    """Mixin extending the generated Jobs API with client-side pagination of run details."""

    def get_run(self,
                run_id: int,
                *,
                include_history: Optional[bool] = None,
                include_resolved_values: Optional[bool] = None,
                page_token: Optional[str] = None) -> jobs.Run:
        """
        This method fetches the details of a run identified by `run_id`. If the run has multiple pages of tasks or iterations,
        it will paginate through all pages and aggregate the results.

        :param run_id: int
          The canonical identifier of the run for which to retrieve the metadata. This field is required.
        :param include_history: bool (optional)
          Whether to include the repair history in the response.
        :param include_resolved_values: bool (optional)
          Whether to include resolved parameter values in the response.
        :param page_token: str (optional)
          To list the next page or the previous page of job tasks, set this field to the value of the
          `next_page_token` or `prev_page_token` returned in the GetJob response.
        :returns: :class:`Run`
        """
        run = super().get_run(run_id,
                              include_history=include_history,
                              include_resolved_values=include_resolved_values,
                              page_token=page_token)

        # When querying a Job run, a page token is returned when there are more than 100 tasks; no
        # iterations are defined for a Job run, so follow-up pages only contribute tasks.
        # When querying a ForEach task run, a page token is returned when there are more than 100
        # iterations; each page repeats the single ForEach task itself, so only the iterations from
        # follow-up pages are aggregated, never the (duplicate) tasks.
        # The decision is made once from the first page and applies to all subsequent pages.
        is_paginating_iterations = run.iterations is not None and len(run.iterations) > 0

        while run.next_page_token is not None:
            next_run = super().get_run(run_id,
                                       include_history=include_history,
                                       include_resolved_values=include_resolved_values,
                                       page_token=run.next_page_token)
            if is_paginating_iterations:
                run.iterations.extend(next_run.iterations)
            else:
                run.tasks.extend(next_run.tasks)
            run.next_page_token = next_run.next_page_token

        # The aggregated run now holds the complete result set, so a pointer to a
        # previous page would be meaningless.
        run.prev_page_token = None
        return run


# --- tests/test_jobs_mixin.py ---
import json
import re
from typing import Pattern

from databricks.sdk import WorkspaceClient


def make_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
    """Build a regex matching the mocked runs/get URL for any 2.x API version.

    Fix: the version-separator dot is now escaped; previously the bare `.`
    matched any character (e.g. "2x1" would also have matched).
    """
    return re.compile(
        rf'{re.escape("http://localhost/api/")}2\.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
    )


def test_get_run_with_no_pagination(config, requests_mock):
    """A single-page response (no next_page_token) is returned unchanged."""
    run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
    requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
    w = WorkspaceClient(config=config)

    run = w.jobs.get_run(1337, page_token="initialToken")

    assert run.as_dict() == {"tasks": [{'run_id': 0}, {'run_id': 1}], }


def test_get_run_pagination_with_tasks(config, requests_mock):
    """A Job run (no iterations) aggregates tasks across all pages and drops page tokens."""
    run1 = {
        "tasks": [{"run_id": 0}, {"run_id": 1}],
        "next_page_token": "tokenToSecondPage",
        "prev_page_token": "tokenToPreviousPage"
    }
    run2 = {
        "tasks": [{"run_id": 2}, {"run_id": 3}],
        "next_page_token": "tokenToThirdPage",
        "prev_page_token": "initialToken"
    }
    run3 = {"tasks": [{"run_id": 4}], "next_page_token": None, "prev_page_token": "tokenToSecondPage"}
    requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
    requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
    requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
    w = WorkspaceClient(config=config)

    run = w.jobs.get_run(1337, page_token="initialToken")

    # All five tasks from the three pages, with next/prev page tokens stripped.
    assert run.as_dict() == {
        "tasks": [{'run_id': i} for i in range(5)],
    }
def test_get_run_pagination_with_iterations(config, requests_mock):
    """A ForEach task run keeps its single task and accumulates iterations from every page."""
    # Map each page token to the payload the mocked endpoint should return for it.
    pages = {
        "initialToken": {
            "tasks": [{"run_id": 1337}],
            "iterations": [{"run_id": 0}, {"run_id": 1}],
            "next_page_token": "tokenToSecondPage",
            "prev_page_token": "tokenToPreviousPage",
        },
        "tokenToSecondPage": {
            "tasks": [{"run_id": 1337}],
            "iterations": [{"run_id": 2}, {"run_id": 3}],
            "next_page_token": "tokenToThirdPage",
            "prev_page_token": "initialToken",
        },
        "tokenToThirdPage": {
            "tasks": [{"run_id": 1337}],
            "iterations": [{"run_id": 4}],
            "next_page_token": None,
            "prev_page_token": "tokenToSecondPage",
        },
    }
    for token, payload in pages.items():
        requests_mock.get(make_path_pattern(1337, token), text=json.dumps(payload))
    w = WorkspaceClient(config=config)

    run = w.jobs.get_run(1337, page_token="initialToken")

    # The ForEach task appears exactly once; iterations from all pages are merged
    # in order and the page tokens are stripped from the aggregated run.
    assert run.as_dict() == {
        "tasks": [{'run_id': 1337}],
        "iterations": [{'run_id': i} for i in range(5)],
    }
Kikolashvili Date: Fri, 8 Nov 2024 15:32:21 +0100 Subject: [PATCH 2/2] Add missing init 'wiring' --- databricks/sdk/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/databricks/sdk/__init__.py b/databricks/sdk/__init__.py index 746f8d7e..4f4689af 100755 --- a/databricks/sdk/__init__.py +++ b/databricks/sdk/__init__.py @@ -6,6 +6,7 @@ from databricks.sdk.credentials_provider import CredentialsStrategy from databricks.sdk.mixins.compute import ClustersExt from databricks.sdk.mixins.files import DbfsExt +from databricks.sdk.mixins.jobs import JobsExt from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt from databricks.sdk.mixins.workspace import WorkspaceExt from databricks.sdk.service.apps import AppsAPI @@ -204,7 +205,7 @@ def __init__(self, self._instance_pools = InstancePoolsAPI(self._api_client) self._instance_profiles = InstanceProfilesAPI(self._api_client) self._ip_access_lists = IpAccessListsAPI(self._api_client) - self._jobs = JobsAPI(self._api_client) + self._jobs = JobsExt(self._api_client) self._lakeview = LakeviewAPI(self._api_client) self._libraries = LibrariesAPI(self._api_client) self._metastores = MetastoresAPI(self._api_client) @@ -450,7 +451,7 @@ def ip_access_lists(self) -> IpAccessListsAPI: return self._ip_access_lists @property - def jobs(self) -> JobsAPI: + def jobs(self) -> JobsExt: """The Jobs API allows you to create, edit, and delete jobs.""" return self._jobs