Skip to content

Commit

Permalink
getRun paginates tasks and iterations
Browse files Browse the repository at this point in the history
  • Loading branch information
gkiko10 committed Nov 8, 2024
1 parent 9b7ca5d commit f3c43f4
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 6 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.databricks.sdk.mixin;

import com.databricks.sdk.core.ApiClient;
import com.databricks.sdk.service.jobs.*;
import java.util.Collection;

public class JobsExt extends JobsAPI {

public JobsExt(ApiClient apiClient) {
super(apiClient);
}

public JobsExt(JobsService mock) {
super(mock);
}

/**
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
* response contract.
*
* <p>Depending on the Jobs API version used under the hood, tasks or iteration runs retrieved by
* the initial request may be truncated due to high cardinalities. Truncation can happen for job
* runs over 100 task runs, as well as ForEach task runs with over 100 iteration runs. To avoid
* returning an incomplete {@code Run} object to the user, this method performs all the requests
* required to collect all task/iteration runs into a single {@code Run} object.
*/
@Override
public Run getRun(GetRunRequest request) {
Run run = super.getRun(request);

/*
* fetch all additional pages (if any) and accumulate the result in a single response
*/

Collection<RunTask> iterations = run.getIterations();
boolean paginatingIterations = iterations != null && !iterations.isEmpty();

Run currRun = run;
while (currRun.getNextPageToken() != null) {
request.setPageToken(currRun.getNextPageToken());
currRun = super.getRun(request);
if (paginatingIterations) {
Collection<RunTask> newIterations = currRun.getIterations();
if (newIterations != null) {
run.getIterations().addAll(newIterations);
}
} else {
Collection<RunTask> newTasks = currRun.getTasks();
if (newTasks != null) {
run.getTasks().addAll(newTasks);
}
}
}

// now that we've added all pages to the Run, the tokens are useless
run.setNextPageToken(null);

return run;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.databricks.sdk.mixin;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.databricks.sdk.service.jobs.GetRunRequest;
import com.databricks.sdk.service.jobs.JobsService;
import com.databricks.sdk.service.jobs.Run;
import com.databricks.sdk.service.jobs.RunTask;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class JobsExtTest {

@Test
public void testGetRunPaginationWithTasks() {
JobsService service = Mockito.mock(JobsService.class);

Run firstPage = new Run().setNextPageToken("tokenToSecondPage");
addTasks(firstPage, 0L, 1L);
Run secondPage = new Run().setNextPageToken("tokenToThirdPage");
addTasks(secondPage, 2L, 3L);
Run thirdPage = new Run();
addTasks(thirdPage, 4L);

when(service.getRun(any())).thenReturn(firstPage).thenReturn(secondPage).thenReturn(thirdPage);

JobsExt jobsExt = new JobsExt(service);

GetRunRequest request = new GetRunRequest();

Run run = jobsExt.getRun(request);

Run expectedRun = new Run();
addTasks(expectedRun, 0L, 1L, 2L, 3L, 4L);

assertEquals(expectedRun, run);
verify(service, times(3)).getRun(any());
}

@Test
public void testGetRunPaginationWithIterations() {
JobsService service = Mockito.mock(JobsService.class);

Run firstPage = new Run().setNextPageToken("tokenToSecondPage");
addIterations(firstPage, 0L, 1L);
Run secondPage = new Run().setNextPageToken("tokenToThirdPage");
addIterations(secondPage, 2L, 3L);
Run thirdPage = new Run();
addIterations(thirdPage, 4L);

when(service.getRun(any())).thenReturn(firstPage).thenReturn(secondPage).thenReturn(thirdPage);

JobsExt jobsExt = new JobsExt(service);

GetRunRequest request = new GetRunRequest();

Run run = jobsExt.getRun(request);

Run expectedRun = new Run();
addIterations(expectedRun, 0L, 1L, 2L, 3L, 4L);

assertEquals(expectedRun, run);
verify(service, times(3)).getRun(any());
}

private void addTasks(Run run, long... taskRunIds) {
Collection<RunTask> tasks = new ArrayList<>();
for (long runId : taskRunIds) {
tasks.add(new RunTask().setRunId(runId));
}
run.setTasks(tasks);
}

private void addIterations(Run run, long... iterationRunIds) {
Collection<RunTask> iterations = new ArrayList<>();
for (long runId : iterationRunIds) {
iterations.add(new RunTask().setRunId(runId));
}
run.setIterations(iterations);
}
}

0 comments on commit f3c43f4

Please sign in to comment.