
Commit

Move compute package utilities
parthban-db committed Jan 16, 2025
1 parent 6d78d03 commit a623694
Showing 13 changed files with 1,019 additions and 2 deletions.
8 changes: 6 additions & 2 deletions compute/go.mod
@@ -6,29 +6,32 @@ replace github.com/databricks/databricks-sdk-go/databricks => ../databricks

require (
github.com/databricks/databricks-sdk-go/databricks v0.0.0-00010101000000-000000000000
golang.org/x/oauth2 v0.25.0
github.com/stretchr/testify v1.10.0
golang.org/x/mod v0.22.0
)

require (
cloud.google.com/go/auth v0.13.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
github.com/databricks/databricks-sdk-go v0.55.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.25.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.9.0 // indirect
@@ -37,4 +40,5 @@ require (
google.golang.org/grpc v1.69.2 // indirect
google.golang.org/protobuf v1.36.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions compute/go.sum
@@ -75,6 +75,8 @@ google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
46 changes: 46 additions & 0 deletions compute/v2/ext_commands.go
@@ -0,0 +1,46 @@
// TODO: Add and implement the remaining methods.
// This file has not been fully migrated from the SDK-Beta,
// as the wait-for-state methods are not yet available in the SDK-mod.
package compute

import (
"context"
)

type CommandExecutorV2 struct {
clustersAPI *ClustersAPI
executionAPI *CommandExecutionAPI
language Language
clusterID string
contextID string
}

type commandExecutionAPIUtilities interface {
Start(ctx context.Context, clusterID string, language Language) (*CommandExecutorV2, error)
}

// Destroy terminates the command execution context on the cluster
func (c *CommandExecutorV2) Destroy(ctx context.Context) error {
return c.executionAPI.Destroy(ctx, DestroyContext{
ClusterId: c.clusterID,
ContextId: c.contextID,
})
}

// CommandExecutor creates a Spark execution context, executes a command, and then closes the context
type CommandExecutor interface {
Execute(ctx context.Context, clusterID, language, commandStr string) Results
}

// CommandMock mocks the execution of a command
type CommandMock func(commandStr string) Results

func (m CommandMock) Execute(_ context.Context, _, _, commandStr string) Results {
return m(commandStr)
}

// CommandsHighLevelAPI exposes a friendlier wrapper over command execution
type CommandsHighLevelAPI struct {
clusters *ClustersAPI
execution *CommandExecutionAPI
}
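
Since CommandMock satisfies the CommandExecutor interface, code that depends only on the interface can be unit-tested without a live cluster. A minimal sketch, assuming the package is importable as github.com/databricks/databricks-sdk-go/compute/v2; the import path and the runPython helper are illustrative, not part of this commit:

package compute_test

import (
	"context"
	"testing"

	compute "github.com/databricks/databricks-sdk-go/compute/v2" // assumed import path
)

// runPython is a hypothetical helper that depends only on the CommandExecutor
// interface, so it can be exercised with CommandMock in tests.
func runPython(ctx context.Context, ex compute.CommandExecutor, clusterID, code string) compute.Results {
	return ex.Execute(ctx, clusterID, "python", code)
}

func TestRunPythonWithMock(t *testing.T) {
	mock := compute.CommandMock(func(commandStr string) compute.Results {
		// echo the submitted command back as the "result"
		return compute.Results{Data: commandStr}
	})
	res := runPython(context.Background(), mock, "some-cluster-id", "print(1)")
	if res.Data != "print(1)" {
		t.Fatalf("unexpected result: %v", res.Data)
	}
}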
36 changes: 36 additions & 0 deletions compute/v2/ext_leading_whitespace.go
@@ -0,0 +1,36 @@
package compute

import (
"strings"
)

// TrimLeadingWhitespace removes leading whitespace, so that Python code blocks
// embedded in Go code can still be interpreted properly.
func TrimLeadingWhitespace(commandStr string) (newCommand string) {
lines := strings.Split(strings.ReplaceAll(commandStr, "\t", " "), "\n")
leadingWhitespace := 1<<31 - 1
for _, line := range lines {
for pos, char := range line {
if char == ' ' || char == '\t' {
continue
}
// first non-whitespace character
if pos < leadingWhitespace {
leadingWhitespace = pos
}
// the rest of the line is not needed
break
}
}
for i := 0; i < len(lines); i++ {
if lines[i] == "" || strings.Trim(lines[i], " \t") == "" {
continue
}
if len(lines[i]) < leadingWhitespace {
newCommand += lines[i] + "\n" // line is shorter than the common indent; keep it as-is
} else {
newCommand += lines[i][leadingWhitespace:] + "\n"
}
}
return
}
16 changes: 16 additions & 0 deletions compute/v2/ext_leading_whitespace_test.go
@@ -0,0 +1,16 @@
package compute

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTrimLeadingWhitespace(t *testing.T) {
assert.Equal(t, "foo\nbar\n", TrimLeadingWhitespace(`
foo
bar
`))
}
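
A short usage sketch to illustrate why this helper exists: a Python block embedded in a Go raw string literal carries the indentation of the surrounding Go source, and TrimLeadingWhitespace strips the common indent before the code is sent to a cluster. The import path is assumed as above:

package main

import (
	"fmt"

	compute "github.com/databricks/databricks-sdk-go/compute/v2" // assumed import path
)

func main() {
	// The embedded Python keeps the indentation of the surrounding Go source.
	script := `
		import json
		print(json.dumps({"status": "ok"}))
	`
	// Tabs are normalized to spaces and the common leading indent is removed,
	// so the cluster-side interpreter sees correctly aligned Python.
	fmt.Print(compute.TrimLeadingWhitespace(script))
	// Output:
	// import json
	// print(json.dumps({"status": "ok"}))
}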
243 changes: 243 additions & 0 deletions compute/v2/ext_library_utilities.go
@@ -0,0 +1,243 @@
package compute

import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/databricks/databricks-sdk-go/databricks/apierr"
"github.com/databricks/databricks-sdk-go/databricks/log"
"github.com/databricks/databricks-sdk-go/databricks/retries"
"github.com/databricks/databricks-sdk-go/databricks/useragent"
)

type Wait struct {
ClusterID string
Libraries []Library
IsRunning bool
IsRefresh bool
}

func (library Library) String() string {
if library.Whl != "" {
return fmt.Sprintf("whl:%s", library.Whl)
}
if library.Jar != "" {
return fmt.Sprintf("jar:%s", library.Jar)
}
if library.Pypi != nil && library.Pypi.Package != "" {
return fmt.Sprintf("pypi:%s%s", library.Pypi.Repo, library.Pypi.Package)
}
if library.Maven != nil && library.Maven.Coordinates != "" {
mvn := library.Maven
return fmt.Sprintf("mvn:%s%s%s", mvn.Repo, mvn.Coordinates,
strings.Join(mvn.Exclusions, ""))
}
if library.Egg != "" {
return fmt.Sprintf("egg:%s", library.Egg)
}
if library.Cran != nil && library.Cran.Package != "" {
return fmt.Sprintf("cran:%s%s", library.Cran.Repo, library.Cran.Package)
}
return "unknown"
}

func (cll *InstallLibraries) Sort() {
sort.Slice(cll.Libraries, func(i, j int) bool {
return cll.Libraries[i].String() < cll.Libraries[j].String()
})
}

// ToLibraryList converts the statuses to an entity that is convenient to compare
func (cls ClusterLibraryStatuses) ToLibraryList() InstallLibraries {
cll := InstallLibraries{ClusterId: cls.ClusterId}
for _, lib := range cls.LibraryStatuses {
cll.Libraries = append(cll.Libraries, *lib.Library)
}
cll.Sort()
return cll
}

func (w *Wait) IsNotInScope(lib *Library) bool {
// if we don't know concrete libraries
if len(w.Libraries) == 0 {
return false
}
// if we know concrete libraries
for _, v := range w.Libraries {
if v.String() == lib.String() {
return false
}
}
return true
}

// IsRetryNeeded returns true as the first value when a retry is needed;
// in that case, the error explains why.
// If no retry is needed and the error is not nil, the installation failed.
func (cls ClusterLibraryStatuses) IsRetryNeeded(w Wait) (bool, error) {
pending := 0
ready := 0
errors := []string{}
for _, lib := range cls.LibraryStatuses {
if lib.IsLibraryForAllClusters {
continue
}
if w.IsNotInScope(lib.Library) {
continue
}
switch lib.Status {
// No action has yet been taken to install the library. This state should be very short lived.
case "PENDING":
pending++
// Metadata necessary to install the library is being retrieved from the provided repository.
case "RESOLVING":
pending++
// The library is actively being installed, either by adding resources to Spark
// or executing system commands inside the Spark nodes.
case "INSTALLING":
pending++
// The library has been successfully installed.
case "INSTALLED":
ready++
// Installation on a Databricks Runtime 7.0 or above cluster was skipped due to Scala version incompatibility.
case "SKIPPED":
ready++
// The library has been marked for removal. Libraries can be removed only when clusters are restarted.
case "UNINSTALL_ON_RESTART":
ready++
// Some step in installation failed. More information can be found in the messages field.
case "FAILED":
if w.IsRefresh {
// we're reading library list on a running cluster and some of the libs failed to install
continue
}
errors = append(errors, fmt.Sprintf("%s failed: %s", lib.Library, strings.Join(lib.Messages, ", ")))
continue
}
}
if pending > 0 {
return true, fmt.Errorf("%d libraries are ready, but there are still %d pending", ready, pending)
}
if len(errors) > 0 {
return false, fmt.Errorf("%s", strings.Join(errors, "\n"))
}
return false, nil
}

type Update struct {
ClusterId string
// The libraries to install.
Install []Library
// The libraries to uninstall.
Uninstall []Library
}

type librariesAPIUtilities interface {
UpdateAndWait(ctx context.Context, update Update, options ...retries.Option[ClusterLibraryStatuses]) error
}

func (a *LibrariesAPI) UpdateAndWait(ctx context.Context, update Update,
options ...retries.Option[ClusterLibraryStatuses]) error {
ctx = useragent.InContext(ctx, "sdk-feature", "update-libraries")
if len(update.Uninstall) > 0 {
err := a.Uninstall(ctx, UninstallLibraries{
ClusterId: update.ClusterId,
Libraries: update.Uninstall,
})
if err != nil {
return fmt.Errorf("uninstall: %w", err)
}
}
if len(update.Install) > 0 {
err := a.Install(ctx, InstallLibraries{
ClusterId: update.ClusterId,
Libraries: update.Install,
})
if err != nil {
return fmt.Errorf("install: %w", err)
}
}
// this helps to avoid erroring out when an out-of-list library gets added to
// the cluster manually and therefore fails the wait on error
scope := make([]Library, 0, len(update.Install)+len(update.Uninstall))
scope = append(scope, update.Install...)
scope = append(scope, update.Uninstall...)
_, err := a.Wait(ctx, Wait{
ClusterID: update.ClusterId,
Libraries: scope,
IsRunning: true,
IsRefresh: false,
}, options...)
return err
}

// Wait polls the cluster's library statuses until they settle or the timeout expires (30 minutes by default).
func (a *LibrariesAPI) Wait(ctx context.Context, wait Wait,
options ...retries.Option[ClusterLibraryStatuses]) (*ClusterLibraryStatuses, error) {
ctx = useragent.InContext(ctx, "sdk-feature", "wait-for-libraries")
i := retries.Info[ClusterLibraryStatuses]{Timeout: 30 * time.Minute}
for _, o := range options {
o(&i)
}
result, err := retries.Poll(ctx, i.Timeout, func() (*ClusterLibraryStatuses, *retries.Err) {
status, err := a.ClusterStatusByClusterId(ctx, wait.ClusterID)
if apierr.IsMissing(err) {
// eventual consistency error
return nil, retries.Continue(err)
}
for _, o := range options {
o(&retries.Info[ClusterLibraryStatuses]{
Timeout: i.Timeout,
Info: status,
})
}
if err != nil {
return nil, retries.Halt(err)
}
if !wait.IsRunning {
log.InfoContext(ctx, "Cluster %s is currently not running, so just returning list of %d libraries",
wait.ClusterID, len(status.LibraryStatuses))
return status, nil
}
retry, err := status.IsRetryNeeded(wait)
if retry {
return status, retries.Continue(err)
}
if err != nil {
return status, retries.Halt(err)
}
return status, nil
})
if err != nil {
return nil, err
}
if wait.IsRunning {
installed := []LibraryFullStatus{}
cleanup := UninstallLibraries{
ClusterId: wait.ClusterID,
Libraries: []Library{},
}
// cleanup libraries that failed to install
for _, v := range result.LibraryStatuses {
if v.Status == "FAILED" {
log.WarningContext(ctx, "Removing failed library %s from %s",
v.Library, wait.ClusterID)
cleanup.Libraries = append(cleanup.Libraries, *v.Library)
continue
}
installed = append(installed, v)
}
// and result contains only the libraries that were successfully installed
result.LibraryStatuses = installed
if len(cleanup.Libraries) > 0 {
err = a.Uninstall(ctx, cleanup)
if err != nil {
return nil, fmt.Errorf("cannot cleanup libraries: %w", err)
}
}
}
return result, nil
}
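
For reference, a minimal sketch of driving UpdateAndWait from calling code; obtaining the *LibrariesAPI (typically from a workspace client), the cluster ID, and the wheel path are all assumptions here:

package compute

import "context"

// installWheel is a hypothetical helper: it installs a single wheel on a
// cluster and blocks until the library settles or the default 30-minute
// timeout expires. Libraries that end up FAILED are uninstalled from the
// cluster before Wait returns.
func installWheel(ctx context.Context, libs *LibrariesAPI, clusterID string) error {
	return libs.UpdateAndWait(ctx, Update{
		ClusterId: clusterID,
		Install: []Library{
			{Whl: "dbfs:/FileStore/wheels/mypkg-0.1.0-py3-none-any.whl"}, // placeholder path
		},
	})
}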