From ace7cc44f0da97d3c00b0470687663bed07fc9e1 Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 27 Nov 2024 14:36:36 -0800 Subject: [PATCH 1/3] [RayJob][Feature] add light weight job submitter in kuberay image Signed-off-by: Rueian --- ray-operator/Dockerfile | 3 + ray-operator/go.mod | 1 + ray-operator/go.sum | 2 + ray-operator/rayjob-submitter/main.go | 151 ++++++++++++++++++++++++++ 4 files changed, 157 insertions(+) create mode 100644 ray-operator/rayjob-submitter/main.go diff --git a/ray-operator/Dockerfile b/ray-operator/Dockerfile index 932e24dde7..45c1ed0213 100644 --- a/ray-operator/Dockerfile +++ b/ray-operator/Dockerfile @@ -15,14 +15,17 @@ COPY apis/ apis/ COPY controllers/ controllers/ COPY pkg/features pkg/features COPY pkg/utils pkg/utils +COPY rayjob-submitter/ rayjob-submitter/ # Build USER root RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go +RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter rayjob-submitter/main.go FROM gcr.io/distroless/base-debian12:nonroot WORKDIR / COPY --from=builder /workspace/manager . +COPY --from=builder /workspace/submitter . 
USER 65532:65532 ENTRYPOINT ["/manager"] diff --git a/ray-operator/go.mod b/ray-operator/go.mod index b41b5f3cdc..2b500ca32c 100644 --- a/ray-operator/go.mod +++ b/ray-operator/go.mod @@ -42,6 +42,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/coder/websocket v1.8.12 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch v5.9.0+incompatible // indirect diff --git a/ray-operator/go.sum b/ray-operator/go.sum index e5f311d24a..938bcb4699 100644 --- a/ray-operator/go.sum +++ b/ray-operator/go.sum @@ -8,6 +8,8 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/ray-operator/rayjob-submitter/main.go b/ray-operator/rayjob-submitter/main.go new file mode 100644 index 0000000000..f1c2e8b2c5 --- /dev/null +++ b/ray-operator/rayjob-submitter/main.go @@ -0,0 +1,151 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + + "github.com/coder/websocket" + flag "github.com/spf13/pflag" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" +) + +func 
submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) { + rayJobJson, err := json.Marshal(request) + if err != nil { + return "", err + } + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewBuffer(rayJobJson)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + + if resp.StatusCode == http.StatusBadRequest { // ignore the duplicated submission error + if strings.Contains(string(body), "Please use a different submission_id") { + return request.SubmissionId, nil + } + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body)) + } + + return request.SubmissionId, nil +} + +func jobSubmissionURL(address string) string { + if !strings.HasPrefix(address, "http://") { + address = "http://" + address + } + address, err := url.JoinPath(address, "/api/jobs/") // the tailing "/" is required. 
+ if err != nil { + panic(err) + } + return address +} + +func logTailingURL(address, submissionId string) string { + address = strings.Replace(address, "http", "ws", 1) + address, err := url.JoinPath(address, submissionId, "/logs/tail") + if err != nil { + panic(err) + } + return address +} + +func Submit(address string, req utils.RayJobRequest, out io.Writer) { + fmt.Fprintf(out, "INFO -- Job submission server address: %s\n", address) + + address = jobSubmissionURL(address) + submissionId, err := submitJobReq(address, req) + if err != nil { + panic(err) + } + + fmt.Fprintf(out, "SUCC -- Job '%s' submitted successfully\n", submissionId) + fmt.Fprintf(out, "INFO -- Tailing logs until the job exits (disable with --no-wait):\n") + + wsAddr := logTailingURL(address, submissionId) + c, _, err := websocket.Dial(context.Background(), wsAddr, nil) + if err != nil { + panic(err) + } + defer func() { _ = c.CloseNow() }() + for { + _, msg, err := c.Read(context.Background()) + if err != nil { + if websocket.CloseStatus(err) == websocket.StatusNormalClosure { + fmt.Fprintf(out, "SUCC -- Job '%s' succeeded\n", submissionId) + return + } + panic(err) + } + _, _ = out.Write(msg) + } +} + +func main() { + var ( + runtimeEnvJson string + metadataJson string + entrypointResources string + entrypointNumCpus float32 + entrypointNumGpus float32 + ) + + flag.StringVar(&runtimeEnvJson, "runtime-env-json", "", "") + flag.StringVar(&metadataJson, "metadata-json", "", "") + flag.StringVar(&entrypointResources, "entrypoint-resources", "", "") + flag.Float32Var(&entrypointNumCpus, "entrypoint-num-cpus", 0.0, "") + flag.Float32Var(&entrypointNumGpus, "entrypoint-num-gpus", 0.0, "") + flag.Parse() + + address := os.Getenv("RAY_DASHBOARD_ADDRESS") + if address == "" { + panic("Missing RAY_DASHBOARD_ADDRESS") + } + submissionId := os.Getenv("RAY_JOB_SUBMISSION_ID") + if submissionId == "" { + panic("Missing RAY_JOB_SUBMISSION_ID") + } + + req := utils.RayJobRequest{ + Entrypoint: 
strings.Join(flag.Args(), " "), + SubmissionId: submissionId, + NumCpus: entrypointNumCpus, + NumGpus: entrypointNumGpus, + } + if len(runtimeEnvJson) > 0 { + if err := json.Unmarshal([]byte(runtimeEnvJson), &req.RuntimeEnv); err != nil { + panic(err) + } + } + if len(metadataJson) > 0 { + if err := json.Unmarshal([]byte(metadataJson), &req.Metadata); err != nil { + panic(err) + } + } + if len(entrypointResources) > 0 { + if err := json.Unmarshal([]byte(entrypointResources), &req.Resources); err != nil { + panic(err) + } + } + Submit(address, req, os.Stdout) +} From a43cc3a4095870c301afc09d16888e8d251c790e Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 29 Nov 2024 23:14:04 -0800 Subject: [PATCH 2/3] [RayJob][Feature] test light weight job submitter in kuberay image Signed-off-by: Rueian --- .../e2e-tests-ray-job-submitter.yaml | 47 ++++++ ray-operator/Dockerfile | 4 +- ray-operator/Makefile | 4 + ray-operator/rayjob-submitter/main.go | 151 ------------------ ray-operator/rayjobsubmitter/README.md | 21 +++ ray-operator/rayjobsubmitter/cmd/main.go | 61 +++++++ ray-operator/rayjobsubmitter/main.go | 98 ++++++++++++ .../test/e2erayjobsubmitter/e2e_test.go | 121 ++++++++++++++ 8 files changed, 354 insertions(+), 153 deletions(-) create mode 100644 .github/workflows/e2e-tests-ray-job-submitter.yaml delete mode 100644 ray-operator/rayjob-submitter/main.go create mode 100644 ray-operator/rayjobsubmitter/README.md create mode 100644 ray-operator/rayjobsubmitter/cmd/main.go create mode 100644 ray-operator/rayjobsubmitter/main.go create mode 100644 ray-operator/test/e2erayjobsubmitter/e2e_test.go diff --git a/.github/workflows/e2e-tests-ray-job-submitter.yaml b/.github/workflows/e2e-tests-ray-job-submitter.yaml new file mode 100644 index 0000000000..9007dea46f --- /dev/null +++ b/.github/workflows/e2e-tests-ray-job-submitter.yaml @@ -0,0 +1,47 @@ +name: e2e-ray-job-submitter + +on: + pull_request: + branches: + - master + - 'release-*' + push: + branches: + - master + - 
'release-*' + +concurrency: + group: ${{ github.head_ref }}-${{ github.workflow }} + cancel-in-progress: true + +jobs: + ray-job-submitter: + runs-on: ubuntu-20.04 + strategy: + fail-fast: false + matrix: + ray-version: [ '2.39.0' ] + go-version: [ '1.22.0' ] + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + submodules: recursive + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go-version }} + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: '3.x' + + - name: Install Ray + run: pip install ray[default]==${{ matrix.ray-version }} + + - name: Run e2e tests + run: | + cd ray-operator + go test -timeout 30m -v ./test/e2erayjobsubmitter diff --git a/ray-operator/Dockerfile b/ray-operator/Dockerfile index 45c1ed0213..4ef41d830e 100644 --- a/ray-operator/Dockerfile +++ b/ray-operator/Dockerfile @@ -15,12 +15,12 @@ COPY apis/ apis/ COPY controllers/ controllers/ COPY pkg/features pkg/features COPY pkg/utils pkg/utils -COPY rayjob-submitter/ rayjob-submitter/ +COPY rayjobsubmitter/ rayjobsubmitter/ # Build USER root RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go -RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter rayjob-submitter/main.go +RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter ./rayjobsubmitter/cmd/main.go FROM gcr.io/distroless/base-debian12:nonroot WORKDIR / diff --git a/ray-operator/Makefile b/ray-operator/Makefile index 2ca043467d..6144e2b71b 100644 --- a/ray-operator/Makefile +++ b/ray-operator/Makefile @@ -77,6 +77,10 @@ test-sampleyaml: WHAT ?= ./test/sampleyaml test-sampleyaml: manifests fmt vet go test -timeout 30m -v $(WHAT) +test-e2erayjobsubmitter: WHAT ?= ./test/e2erayjobsubmitter +test-e2erayjobsubmitter: fmt vet + go test -timeout 30m -v $(WHAT) + sync: helm api-docs ./hack/update-codegen.sh diff --git a/ray-operator/rayjob-submitter/main.go 
b/ray-operator/rayjob-submitter/main.go deleted file mode 100644 index f1c2e8b2c5..0000000000 --- a/ray-operator/rayjob-submitter/main.go +++ /dev/null @@ -1,151 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "strings" - - "github.com/coder/websocket" - flag "github.com/spf13/pflag" - - "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" -) - -func submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) { - rayJobJson, err := json.Marshal(request) - if err != nil { - return "", err - } - - req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewBuffer(rayJobJson)) - if err != nil { - return "", err - } - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return "", err - } - defer resp.Body.Close() - - body, _ := io.ReadAll(resp.Body) - - if resp.StatusCode == http.StatusBadRequest { // ignore the duplicated submission error - if strings.Contains(string(body), "Please use a different submission_id") { - return request.SubmissionId, nil - } - } - - if resp.StatusCode < 200 || resp.StatusCode > 299 { - return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body)) - } - - return request.SubmissionId, nil -} - -func jobSubmissionURL(address string) string { - if !strings.HasPrefix(address, "http://") { - address = "http://" + address - } - address, err := url.JoinPath(address, "/api/jobs/") // the tailing "/" is required. 
- if err != nil { - panic(err) - } - return address -} - -func logTailingURL(address, submissionId string) string { - address = strings.Replace(address, "http", "ws", 1) - address, err := url.JoinPath(address, submissionId, "/logs/tail") - if err != nil { - panic(err) - } - return address -} - -func Submit(address string, req utils.RayJobRequest, out io.Writer) { - fmt.Fprintf(out, "INFO -- Job submission server address: %s\n", address) - - address = jobSubmissionURL(address) - submissionId, err := submitJobReq(address, req) - if err != nil { - panic(err) - } - - fmt.Fprintf(out, "SUCC -- Job '%s' submitted successfully\n", submissionId) - fmt.Fprintf(out, "INFO -- Tailing logs until the job exits (disable with --no-wait):\n") - - wsAddr := logTailingURL(address, submissionId) - c, _, err := websocket.Dial(context.Background(), wsAddr, nil) - if err != nil { - panic(err) - } - defer func() { _ = c.CloseNow() }() - for { - _, msg, err := c.Read(context.Background()) - if err != nil { - if websocket.CloseStatus(err) == websocket.StatusNormalClosure { - fmt.Fprintf(out, "SUCC -- Job '%s' succeeded\n", submissionId) - return - } - panic(err) - } - _, _ = out.Write(msg) - } -} - -func main() { - var ( - runtimeEnvJson string - metadataJson string - entrypointResources string - entrypointNumCpus float32 - entrypointNumGpus float32 - ) - - flag.StringVar(&runtimeEnvJson, "runtime-env-json", "", "") - flag.StringVar(&metadataJson, "metadata-json", "", "") - flag.StringVar(&entrypointResources, "entrypoint-resources", "", "") - flag.Float32Var(&entrypointNumCpus, "entrypoint-num-cpus", 0.0, "") - flag.Float32Var(&entrypointNumGpus, "entrypoint-num-gpus", 0.0, "") - flag.Parse() - - address := os.Getenv("RAY_DASHBOARD_ADDRESS") - if address == "" { - panic("Missing RAY_DASHBOARD_ADDRESS") - } - submissionId := os.Getenv("RAY_JOB_SUBMISSION_ID") - if submissionId == "" { - panic("Missing RAY_JOB_SUBMISSION_ID") - } - - req := utils.RayJobRequest{ - Entrypoint: 
strings.Join(flag.Args(), " "), - SubmissionId: submissionId, - NumCpus: entrypointNumCpus, - NumGpus: entrypointNumGpus, - } - if len(runtimeEnvJson) > 0 { - if err := json.Unmarshal([]byte(runtimeEnvJson), &req.RuntimeEnv); err != nil { - panic(err) - } - } - if len(metadataJson) > 0 { - if err := json.Unmarshal([]byte(metadataJson), &req.Metadata); err != nil { - panic(err) - } - } - if len(entrypointResources) > 0 { - if err := json.Unmarshal([]byte(entrypointResources), &req.Resources); err != nil { - panic(err) - } - } - Submit(address, req, os.Stdout) -} diff --git a/ray-operator/rayjobsubmitter/README.md b/ray-operator/rayjobsubmitter/README.md new file mode 100644 index 0000000000..12fc4bef95 --- /dev/null +++ b/ray-operator/rayjobsubmitter/README.md @@ -0,0 +1,21 @@ +# Ray Job Submitter + +This is a Go Ray Job Submitter for KubeRay to submit a Ray Job +and tail its logs without installing Ray which is very large. + +Note that this tool is designed specifically for KubeRay and +will not support some `ray job submit` features that people +don't use with KubeRay, for example, uploading local files to +a Ray cluster will not be supported by this tool. + +## Testing + +Tests are located at [../test/e2erayjobsubmitter](../test/e2erayjobsubmitter). + +As the e2e suggests, you need to have `ray` installed for these tests +because they need to start a real Ray Head. 
You can run the tests with: + +```sh +make test-e2erayjobsubmitter +``` +or GitHub Action: [../../.github/workflows/e2e-tests-ray-job-submitter.yaml](../../.github/workflows/e2e-tests-ray-job-submitter.yaml) diff --git a/ray-operator/rayjobsubmitter/cmd/main.go b/ray-operator/rayjobsubmitter/cmd/main.go new file mode 100644 index 0000000000..d0f7d6cad2 --- /dev/null +++ b/ray-operator/rayjobsubmitter/cmd/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "encoding/json" + "os" + "strings" + + flag "github.com/spf13/pflag" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/rayjobsubmitter" +) + +func main() { + var ( + runtimeEnvJson string + metadataJson string + entrypointResources string + entrypointNumCpus float32 + entrypointNumGpus float32 + ) + + flag.StringVar(&runtimeEnvJson, "runtime-env-json", "", "") + flag.StringVar(&metadataJson, "metadata-json", "", "") + flag.StringVar(&entrypointResources, "entrypoint-resources", "", "") + flag.Float32Var(&entrypointNumCpus, "entrypoint-num-cpus", 0.0, "") + flag.Float32Var(&entrypointNumGpus, "entrypoint-num-gpus", 0.0, "") + flag.Parse() + + address := os.Getenv("RAY_DASHBOARD_ADDRESS") + if address == "" { + panic("Missing RAY_DASHBOARD_ADDRESS") + } + submissionId := os.Getenv("RAY_JOB_SUBMISSION_ID") + if submissionId == "" { + panic("Missing RAY_JOB_SUBMISSION_ID") + } + + req := utils.RayJobRequest{ + Entrypoint: strings.Join(flag.Args(), " "), + SubmissionId: submissionId, + NumCpus: entrypointNumCpus, + NumGpus: entrypointNumGpus, + } + if len(runtimeEnvJson) > 0 { + if err := json.Unmarshal([]byte(runtimeEnvJson), &req.RuntimeEnv); err != nil { + panic(err) + } + } + if len(metadataJson) > 0 { + if err := json.Unmarshal([]byte(metadataJson), &req.Metadata); err != nil { + panic(err) + } + } + if len(entrypointResources) > 0 { + if err := json.Unmarshal([]byte(entrypointResources), &req.Resources); err != nil { + panic(err) + } + } + 
rayjobsubmitter.Submit(address, req, os.Stdout) +} diff --git a/ray-operator/rayjobsubmitter/main.go b/ray-operator/rayjobsubmitter/main.go new file mode 100644 index 0000000000..09eabe0853 --- /dev/null +++ b/ray-operator/rayjobsubmitter/main.go @@ -0,0 +1,98 @@ +package rayjobsubmitter + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/coder/websocket" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" +) + +func submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) { + rayJobJson, err := json.Marshal(request) + if err != nil { + return "", err + } + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewBuffer(rayJobJson)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer func() { _ = resp.Body.Close() }() + + body, _ := io.ReadAll(resp.Body) + + if strings.Contains(string(body), "Please use a different submission_id") { + return request.SubmissionId, nil + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body)) + } + + return request.SubmissionId, nil +} + +func jobSubmissionURL(address string) string { + if !strings.HasPrefix(address, "http://") && !strings.HasPrefix(address, "https://") { + address = "http://" + address + } + address, err := url.JoinPath(address, "/api/jobs/") // the trailing "/" is required. 
+ if err != nil { + panic(err) + } + return address +} + +func logTailingURL(address, submissionId string) string { + address = strings.Replace(address, "http", "ws", 1) + address, err := url.JoinPath(address, submissionId, "/logs/tail") + if err != nil { + panic(err) + } + return address +} + +func Submit(address string, req utils.RayJobRequest, out io.Writer) { + _, _ = fmt.Fprintf(out, "INFO -- Job submission server address: %s\n", address) + + address = jobSubmissionURL(address) + submissionId, err := submitJobReq(address, req) + if err != nil { + panic(err) + } + + _, _ = fmt.Fprintf(out, "SUCC -- Job '%s' submitted successfully\n", submissionId) + _, _ = fmt.Fprintf(out, "INFO -- Tailing logs until the job exits (disable with --no-wait):\n") + + wsAddr := logTailingURL(address, submissionId) + c, _, err := websocket.Dial(context.Background(), wsAddr, nil) + if err != nil { + panic(err) + } + defer func() { _ = c.CloseNow() }() + for { + _, msg, err := c.Read(context.Background()) + if err != nil { + if websocket.CloseStatus(err) == websocket.StatusNormalClosure { + _, _ = fmt.Fprintf(out, "SUCC -- Job '%s' succeeded\n", submissionId) + return + } + panic(err) + } + _, _ = out.Write(msg) + } +} diff --git a/ray-operator/test/e2erayjobsubmitter/e2e_test.go b/ray-operator/test/e2erayjobsubmitter/e2e_test.go new file mode 100644 index 0000000000..29fba4ea74 --- /dev/null +++ b/ray-operator/test/e2erayjobsubmitter/e2e_test.go @@ -0,0 +1,121 @@ +package e2erayjobsubmitter + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "regexp" + "strings" + "testing" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/rayjobsubmitter" +) + +var script = `import ray +import os + +ray.init() + +@ray.remote +class Counter: + def __init__(self): + # Used to verify runtimeEnv + self.name = os.getenv("counter_name") + assert self.name == "test_counter" + self.counter = 0 + + def inc(self): + self.counter += 1 + + def 
get_counter(self): + return "{} got {}".format(self.name, self.counter) + +counter = Counter.remote() + +for _ in range(5): + ray.get(counter.inc.remote()) + print(ray.get(counter.get_counter.remote())) +` + +func TestRayJobSubmitter(t *testing.T) { + // Create a temp job script + scriptpy, err := os.CreateTemp("", "counter.py") + if err != nil { + t.Fatalf("Failed to create job script: %v", err) + } + defer func() { _ = os.Remove(scriptpy.Name()) }() + if _, err = scriptpy.WriteString(script); err != nil { + t.Fatalf("Failed to write to job script: %v", err) + } + if err = scriptpy.Close(); err != nil { + t.Fatalf("Failed to close job script: %v", err) + } + + // start ray + cmd := exec.Command("ray", "start", "--head", "--disable-usage-stats", "--include-dashboard=true") + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to start ray head: %v", err) + } + t.Log(string(out)) + if cmd.ProcessState.ExitCode() != 0 { + t.Fatalf("Failed to start ray head with exit code: %v", cmd.ProcessState.ExitCode()) + } + defer func() { + cmd := exec.Command("ray", "stop") + if _, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to stop ray head: %v", err) + } + }() + + var address string + re := regexp.MustCompile(`RAY_ADDRESS='([^']+)'`) + matches := re.FindStringSubmatch(string(out)) + if len(matches) > 1 { + address = matches[1] + } else { + t.Fatalf("Failed to find RAY_ADDRESS from the ray start output") + } + + testcases := []struct { + name string + out string + req utils.RayJobRequest + }{ + { + name: "my-job-1", + req: utils.RayJobRequest{ + Entrypoint: "python " + scriptpy.Name(), + RuntimeEnv: map[string]interface{}{"env_vars": map[string]string{"counter_name": "test_counter"}}, + SubmissionId: "my-job-1", + }, + out: "test_counter got 5", + }, + { + name: "my-job-1-duplicated", + req: utils.RayJobRequest{ + Entrypoint: "python " + scriptpy.Name(), + RuntimeEnv: map[string]interface{}{"env_vars": map[string]string{"counter_name": 
"test_counter"}}, + SubmissionId: "my-job-1", + }, + out: "test_counter got 5", + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + out := bytes.NewBuffer(nil) + + rayjobsubmitter.Submit(address, tc.req, out) + for _, expected := range []string{ + tc.out, + fmt.Sprintf("Job '%s' succeeded", tc.req.SubmissionId), + } { + if !strings.Contains(out.String(), expected) { + t.Errorf("Output did not contain expected string. output=%s\nexpected=%s\n", out.String(), expected) + } + } + }) + } +} From e3fb56453e69d03484fbeedc1204a0294f3f1ef4 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 10 Dec 2024 14:42:38 -0800 Subject: [PATCH 3/3] [RayJob][Feature] move light weight job submitter to a dedicated image Signed-off-by: Rueian --- .github/workflows/image-release.yaml | 87 ++++++++++++++++++++++++ ray-operator/Dockerfile | 12 +++- ray-operator/Dockerfile.submitter.buildx | 7 ++ ray-operator/Makefile | 3 + 4 files changed, 106 insertions(+), 3 deletions(-) create mode 100644 ray-operator/Dockerfile.submitter.buildx diff --git a/.github/workflows/image-release.yaml b/.github/workflows/image-release.yaml index c9cbb65291..f522ac6cad 100644 --- a/.github/workflows/image-release.yaml +++ b/.github/workflows/image-release.yaml @@ -172,3 +172,90 @@ jobs: ref: 'refs/tags/ray-operator/${{ github.event.inputs.tag }}', sha: '${{ github.event.inputs.commit }}' }) + + release_submitter_image: + env: + working-directory: ./ray-operator + name: Release Submitter Docker Images + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: v1.22 + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + with: + ref: ${{ github.event.inputs.commit }} + + - name: Get revision SHA + id: vars + run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + + - name: Get dependencies + run: go mod download + working-directory: ${{env.working-directory}} + + - name: Setup Python + uses: 
actions/setup-python@v4 + with: + python-version: '3.x' + + - name: Install Ray + run: pip install ray[default]==2.39.0 + + - name: Test + run: make test-e2erayjobsubmitter + working-directory: ${{env.working-directory}} + + - name: Set up Docker + uses: docker-practice/actions-setup-docker@master + + - name: Build Docker Image - Submitter + run: | + IMG=kuberay/submitter:${{ steps.vars.outputs.sha_short }} make docker-submitter-image + working-directory: ${{env.working-directory}} + + - name: Log in to Quay.io + uses: docker/login-action@v2 + with: + registry: quay.io + username: ${{ secrets.QUAY_USERNAME }} + password: ${{ secrets.QUAY_ROBOT_TOKEN }} + + # Build submitter inside the gh runner vm directly and then copy the go binaries to docker images using the Dockerfile.submitter.buildx + - name: Build linux/amd64 submitter go binary + env: + CGO_ENABLED: 0 + GOOS: linux + GOARCH: amd64 + run: | + CGO_ENABLED=$CGO_ENABLED GOOS=$GOOS GOARCH=$GOARCH go build -tags strictfipsruntime -a -o submitter-$GOARCH ./rayjobsubmitter/cmd/main.go + working-directory: ${{env.working-directory}} + + - name: Build linux/arm64 submitter binary + env: + CGO_ENABLED: 0 + GOOS: linux + GOARCH: arm64 + run: | + CGO_ENABLED=$CGO_ENABLED GOOS=$GOOS GOARCH=$GOARCH go build -tags strictfipsruntime -a -o submitter-$GOARCH ./rayjobsubmitter/cmd/main.go + working-directory: ${{env.working-directory}} + + - name: Build MultiArch Image + uses: docker/build-push-action@v5 + env: + PUSH: true + REPO_ORG: kuberay + REPO_NAME: submitter + with: + platforms: linux/amd64,linux/arm64 + context: ${{env.working-directory}} + file: ${{env.working-directory}}/Dockerfile.submitter.buildx + push: ${{env.PUSH}} + provenance: false + tags: | + quay.io/${{env.REPO_ORG}}/${{env.REPO_NAME}}:${{ steps.vars.outputs.sha_short }} + quay.io/${{env.REPO_ORG}}/${{env.REPO_NAME}}:${{ github.event.inputs.tag }} diff --git a/ray-operator/Dockerfile b/ray-operator/Dockerfile index 4ef41d830e..7765fb67ef 100644 --- 
a/ray-operator/Dockerfile +++ b/ray-operator/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.22.4-bullseye as builder +FROM golang:1.22.4-bullseye AS builder WORKDIR /workspace # Copy the Go Modules manifests @@ -20,12 +20,18 @@ COPY rayjobsubmitter/ rayjobsubmitter/ # Build USER root RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go -RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter ./rayjobsubmitter/cmd/main.go +RUN CGO_ENABLED=0 GOOS=linux go build -tags strictfipsruntime -a -o submitter ./rayjobsubmitter/cmd/main.go + +FROM scratch AS submitter +WORKDIR / +COPY --from=builder /workspace/submitter . +USER 65532:65532 + +ENTRYPOINT ["/submitter"] FROM gcr.io/distroless/base-debian12:nonroot WORKDIR / COPY --from=builder /workspace/manager . -COPY --from=builder /workspace/submitter . USER 65532:65532 ENTRYPOINT ["/manager"] diff --git a/ray-operator/Dockerfile.submitter.buildx b/ray-operator/Dockerfile.submitter.buildx new file mode 100644 index 0000000000..ec2a5fc7c7 --- /dev/null +++ b/ray-operator/Dockerfile.submitter.buildx @@ -0,0 +1,7 @@ +FROM scratch +ARG TARGETARCH +WORKDIR / +COPY ./submitter-${TARGETARCH} ./submitter +USER 65532:65532 + +ENTRYPOINT ["/submitter"] diff --git a/ray-operator/Makefile b/ray-operator/Makefile index 6144e2b71b..7c106fb0ee 100644 --- a/ray-operator/Makefile +++ b/ray-operator/Makefile @@ -104,6 +104,9 @@ build: fmt vet ## Build manager binary. docker-image: ## Build image only ${ENGINE} build -t ${IMG} . +docker-submitter-image: ## Build image only + ${ENGINE} build -t ${IMG} --target submitter . + docker-build: build docker-image ## Build image with the manager. docker-push: ## Push image with the manager.