Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Update Jobs GetRun API to support paginated responses for jobs and ForEach tasks with >100 runs #1009

Closed
wants to merge 11 commits into from
30 changes: 30 additions & 0 deletions service/jobs/ext_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package jobs

import "context"

func (a *JobsAPI) GetRun(ctx context.Context, request GetRunRequest) (*Run, error) {
renaudhartert-db marked this conversation as resolved.
Show resolved Hide resolved
run, err := a.jobsImpl.GetRun(ctx, request)
if err != nil {
return nil, err
}

isPaginatingIterations := run.Iterations != nil && len(run.Iterations) > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For posterity, it would be good to document this behavior, as it isn't immediately obvious how this works from the spec itself. Something like:

When querying a Job run, a page token is returned when there are more than 100 tasks. No iterations are defined for a Job run. Therefore, the next page in the response only includes the next page of tasks.
When querying a ForEach task run, a page token is returned when there are more than 100 iterations. Only a single task is returned, corresponding to the ForEach task itself. Therefore, the client only reads the iterations from the next page and not the tasks.
For any other task type, iterations is empty and tasks only contains the one task, so no pagination is needed.


for len(run.NextPageToken) != 0 {
renaudhartert-db marked this conversation as resolved.
Show resolved Hide resolved
gkiko10 marked this conversation as resolved.
Show resolved Hide resolved
request.PageToken = run.NextPageToken
nextRun, err := a.jobsImpl.GetRun(ctx, request)
if err != nil {
return nil, err
}

if isPaginatingIterations {
run.Iterations = append(run.Iterations, nextRun.Iterations...)
} else {
run.Tasks = append(run.Tasks, nextRun.Tasks...)
}
run.NextPageToken = nextRun.NextPageToken
}

run.PrevPageToken = ""
return run, nil
}
217 changes: 217 additions & 0 deletions service/jobs/ext_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package jobs

import (
"context"
"github.com/databricks/databricks-sdk-go/qa"
"testing"

"github.com/stretchr/testify/assert"
)

func commonFixtureWithStatusResponse() qa.HTTPFixtures {
gkiko10 marked this conversation as resolved.
Show resolved Hide resolved
return []qa.HTTPFixture{
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.2/jobs/runs/get?run_id=514594995218126",
Response: Run{
Iterations: []RunTask{},
Tasks: []RunTask{
{
RunId: 123,
TaskKey: "task1",
},
{
RunId: 1234,
TaskKey: "task2",
},
},
NextPageToken: "",
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.2/jobs/runs/get?run_id=111222333",
Response: Run{
Iterations: []RunTask{},
Tasks: []RunTask{
{
RunId: 123,
},
{
RunId: 1234,
},
},
JobClusters: []JobCluster{
{
JobClusterKey: "cluster1",
},
{
JobClusterKey: "cluster2",
},
},
NextPageToken: "token1",
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.2/jobs/runs/get?page_token=token1&run_id=111222333",
Response: Run{
Iterations: []RunTask{},
Tasks: []RunTask{
{
RunId: 222,
},
{
RunId: 333,
},
},
JobClusters: []JobCluster{
{
JobClusterKey: "cluster1",
},
{
JobClusterKey: "cluster2",
},
},
PrevPageToken: "token1-reverse",
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.2/jobs/runs/get?run_id=4444",
Response: Run{
Iterations: []RunTask{
{
RunId: 123,
},
{
RunId: 1234,
},
},
Tasks: []RunTask{
{
RunId: 999,
},
},
NextPageToken: "token1",
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.2/jobs/runs/get?page_token=token1&run_id=4444",
Response: Run{
Iterations: []RunTask{
{
RunId: 222,
},
{
RunId: 333,
},
},
Tasks: []RunTask{
{
RunId: 999,
},
},
PrevPageToken: "token1-reverse",
},
},
}
}

func TestGetRun(t *testing.T) {
ctx := context.Background()

t.Run("successful run with no pagination", func(t *testing.T) {
client, server := commonFixtureWithStatusResponse().Client(t)
defer server.Close()

mockJobsImpl := &jobsImpl{
client: client,
}
api := &JobsAPI{jobsImpl: *mockJobsImpl}

request := GetRunRequest{RunId: 514594995218126}
run, err := api.GetRun(ctx, request)

assert.NoError(t, err)
assert.EqualValues(t, 123, run.Tasks[0].RunId)
gkiko10 marked this conversation as resolved.
Show resolved Hide resolved
assert.EqualValues(t, 1234, run.Tasks[1].RunId)
})

t.Run("successful run with two tasks pages", func(t *testing.T) {
client, server := commonFixtureWithStatusResponse().Client(t)
defer server.Close()

mockJobsImpl := &jobsImpl{
client: client,
}
api := &JobsAPI{jobsImpl: *mockJobsImpl}

request := GetRunRequest{RunId: 111222333}
run, err := api.GetRun(ctx, request)

assert.NoError(t, err)
assert.Equal(t, 4, len(run.Tasks))
assert.Empty(t, run.Iterations)
assert.Empty(t, run.NextPageToken)
assert.Empty(t, run.PrevPageToken)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not now, but longer-term, we may want to hide these fields from the SDK in general. cc @hectorcast-db @renaudhartert-db

expected := []RunTask{
{RunId: 123, ForceSendFields: []string{"RunId", "TaskKey"}},
{RunId: 1234, ForceSendFields: []string{"RunId", "TaskKey"}},
{RunId: 222, ForceSendFields: []string{"RunId", "TaskKey"}},
{RunId: 333, ForceSendFields: []string{"RunId", "TaskKey"}},
}
assert.Equal(t, expected, run.Tasks)
})

t.Run("successful run with two iterations pages", func(t *testing.T) {
client, server := commonFixtureWithStatusResponse().Client(t)
defer server.Close()

mockJobsImpl := &jobsImpl{
client: client,
}
api := &JobsAPI{jobsImpl: *mockJobsImpl}

request := GetRunRequest{RunId: 4444}
run, err := api.GetRun(ctx, request)

assert.NoError(t, err)
assert.Equal(t, 4, len(run.Iterations))
assert.Equal(t, 1, len(run.Tasks))
assert.Empty(t, run.NextPageToken)
assert.Empty(t, run.PrevPageToken)
expected := []RunTask{
{RunId: 123, ForceSendFields: []string{"RunId", "TaskKey"}},
{RunId: 1234, ForceSendFields: []string{"RunId", "TaskKey"}},
{RunId: 222, ForceSendFields: []string{"RunId", "TaskKey"}},
{RunId: 333, ForceSendFields: []string{"RunId", "TaskKey"}},
}
assert.Equal(t, expected, run.Iterations)
assert.EqualValues(t, 999, run.Tasks[0].RunId)
})

t.Run("clusters array is not increased when paginated", func(t *testing.T) {
client, server := commonFixtureWithStatusResponse().Client(t)
defer server.Close()

mockJobsImpl := &jobsImpl{
client: client,
}
api := &JobsAPI{jobsImpl: *mockJobsImpl}

request := GetRunRequest{RunId: 111222333}
run, err := api.GetRun(ctx, request)

assert.NoError(t, err)
assert.Equal(t, 2, len(run.JobClusters))
assert.Equal(t, "cluster1", run.JobClusters[0].JobClusterKey)
assert.Equal(t, "cluster2", run.JobClusters[1].JobClusterKey)
})
}
Loading