From f3c43f450c4b2d7348deca764e67ca96a79afd7e Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Fri, 8 Nov 2024 16:32:35 +0100 Subject: [PATCH] getRun paginates tasks and iterations --- .../com/databricks/sdk/WorkspaceClient.java | 12 +-- .../com/databricks/sdk/mixin/JobsExt.java | 60 +++++++++++++ .../com/databricks/sdk/mixin/JobsExtTest.java | 85 +++++++++++++++++++ 3 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java create mode 100644 databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/WorkspaceClient.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/WorkspaceClient.java index 655080d97..1ffb47345 100755 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/WorkspaceClient.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/WorkspaceClient.java @@ -7,6 +7,7 @@ import com.databricks.sdk.core.DatabricksConfig; import com.databricks.sdk.mixin.ClustersExt; import com.databricks.sdk.mixin.DbfsExt; +import com.databricks.sdk.mixin.JobsExt; import com.databricks.sdk.mixin.SecretsExt; import com.databricks.sdk.service.apps.AppsAPI; import com.databricks.sdk.service.apps.AppsService; @@ -90,7 +91,6 @@ import com.databricks.sdk.service.iam.ServicePrincipalsService; import com.databricks.sdk.service.iam.UsersAPI; import com.databricks.sdk.service.iam.UsersService; -import com.databricks.sdk.service.jobs.JobsAPI; import com.databricks.sdk.service.jobs.JobsService; import com.databricks.sdk.service.jobs.PolicyComplianceForJobsAPI; import com.databricks.sdk.service.jobs.PolicyComplianceForJobsService; @@ -228,7 +228,7 @@ public class WorkspaceClient { private InstancePoolsAPI instancePoolsAPI; private InstanceProfilesAPI instanceProfilesAPI; private IpAccessListsAPI ipAccessListsAPI; - private JobsAPI jobsAPI; + private JobsExt jobsAPI; private LakeviewAPI lakeviewAPI; private LibrariesAPI librariesAPI; private MetastoresAPI metastoresAPI; @@ -327,7 +327,7 @@ public WorkspaceClient(DatabricksConfig config) { instancePoolsAPI = new InstancePoolsAPI(apiClient); instanceProfilesAPI = new InstanceProfilesAPI(apiClient); ipAccessListsAPI = new IpAccessListsAPI(apiClient); - jobsAPI = new JobsAPI(apiClient); + jobsAPI = new JobsExt(apiClient); lakeviewAPI = new LakeviewAPI(apiClient); librariesAPI = new LibrariesAPI(apiClient); metastoresAPI = new MetastoresAPI(apiClient); @@ -865,7 +865,7 @@ public IpAccessListsAPI ipAccessLists() { * https://docs.databricks.com/dev-tools/cli/secrets-cli.html [Secrets utility]: * https://docs.databricks.com/dev-tools/databricks-utils.html#dbutils-secrets */ - public JobsAPI jobs() { + public JobsExt jobs() { return jobsAPI; } @@ -2049,11 +2049,11 @@ public WorkspaceClient withIpAccessListsAPI(IpAccessListsAPI ipAccessLists) { /** Replace the default JobsService with a custom implementation. */ public WorkspaceClient withJobsImpl(JobsService jobs) { - return this.withJobsAPI(new JobsAPI(jobs)); + return this.withJobsAPI(new JobsExt(jobs)); } /** Replace the default JobsAPI with a custom implementation. */ - public WorkspaceClient withJobsAPI(JobsAPI jobs) { + public WorkspaceClient withJobsAPI(JobsExt jobs) { this.jobsAPI = jobs; return this; } diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java new file mode 100644 index 000000000..f6f15f905 --- /dev/null +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -0,0 +1,60 @@ +package com.databricks.sdk.mixin; + +import com.databricks.sdk.core.ApiClient; +import com.databricks.sdk.service.jobs.*; +import java.util.Collection; + +public class JobsExt extends JobsAPI { + + public JobsExt(ApiClient apiClient) { + super(apiClient); + } + + public JobsExt(JobsService mock) { + super(mock); + } + + /** + * Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the + * response contract. + * + *

Depending on the Jobs API version used under the hood, tasks or iteration runs retrieved by + * the initial request may be truncated due to high cardinalities. Truncation can happen for job + * runs over 100 task runs, as well as ForEach task runs with over 100 iteration runs. To avoid + * returning an incomplete {@code Run} object to the user, this method performs all the requests + * required to collect all task/iteration runs into a single {@code Run} object. + */ + @Override + public Run getRun(GetRunRequest request) { + Run run = super.getRun(request); + + /* + * fetch all additional pages (if any) and accumulate the result in a single response + */ + + Collection iterations = run.getIterations(); + boolean paginatingIterations = iterations != null && !iterations.isEmpty(); + + Run currRun = run; + while (currRun.getNextPageToken() != null) { + request.setPageToken(currRun.getNextPageToken()); + currRun = super.getRun(request); + if (paginatingIterations) { + Collection newIterations = currRun.getIterations(); + if (newIterations != null) { + run.getIterations().addAll(newIterations); + } + } else { + Collection newTasks = currRun.getTasks(); + if (newTasks != null) { + run.getTasks().addAll(newTasks); + } + } + } + + // now that we've added all pages to the Run, the tokens are useless + run.setNextPageToken(null); + + return run; + } +} diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java new file mode 100644 index 000000000..73fb22338 --- /dev/null +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java @@ -0,0 +1,85 @@ +package com.databricks.sdk.mixin; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.databricks.sdk.service.jobs.GetRunRequest; +import com.databricks.sdk.service.jobs.JobsService; +import com.databricks.sdk.service.jobs.Run; +import com.databricks.sdk.service.jobs.RunTask; +import java.util.ArrayList; +import java.util.Collection; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class JobsExtTest { + + @Test + public void testGetRunPaginationWithTasks() { + JobsService service = Mockito.mock(JobsService.class); + + Run firstPage = new Run().setNextPageToken("tokenToSecondPage"); + addTasks(firstPage, 0L, 1L); + Run secondPage = new Run().setNextPageToken("tokenToThirdPage"); + addTasks(secondPage, 2L, 3L); + Run thirdPage = new Run(); + addTasks(thirdPage, 4L); + + when(service.getRun(any())).thenReturn(firstPage).thenReturn(secondPage).thenReturn(thirdPage); + + JobsExt jobsExt = new JobsExt(service); + + GetRunRequest request = new GetRunRequest(); + + Run run = jobsExt.getRun(request); + + Run expectedRun = new Run(); + addTasks(expectedRun, 0L, 1L, 2L, 3L, 4L); + + assertEquals(expectedRun, run); + verify(service, times(3)).getRun(any()); + } + + @Test + public void testGetRunPaginationWithIterations() { + JobsService service = Mockito.mock(JobsService.class); + + Run firstPage = new Run().setNextPageToken("tokenToSecondPage"); + addIterations(firstPage, 0L, 1L); + Run secondPage = new Run().setNextPageToken("tokenToThirdPage"); + addIterations(secondPage, 2L, 3L); + Run thirdPage = new Run(); + addIterations(thirdPage, 4L); + + when(service.getRun(any())).thenReturn(firstPage).thenReturn(secondPage).thenReturn(thirdPage); + + JobsExt jobsExt = new JobsExt(service); + + GetRunRequest request = new GetRunRequest(); + + Run run = jobsExt.getRun(request); + + Run expectedRun = new Run(); + addIterations(expectedRun, 0L, 1L, 2L, 3L, 4L); + + assertEquals(expectedRun, run); + verify(service, times(3)).getRun(any()); + } + + private void addTasks(Run run, long... taskRunIds) { + Collection tasks = new ArrayList<>(); + for (long runId : taskRunIds) { + tasks.add(new RunTask().setRunId(runId)); + } + run.setTasks(tasks); + } + + private void addIterations(Run run, long... iterationRunIds) { + Collection iterations = new ArrayList<>(); + for (long runId : iterationRunIds) { + iterations.add(new RunTask().setRunId(runId)); + } + run.setIterations(iterations); + } +}