diff --git a/kubectl-plugin/pkg/cmd/log/log.go b/kubectl-plugin/pkg/cmd/log/log.go index 5409a93d6a6..fb561be2599 100644 --- a/kubectl-plugin/pkg/cmd/log/log.go +++ b/kubectl-plugin/pkg/cmd/log/log.go @@ -30,12 +30,12 @@ import ( const filePathInPod = "/tmp/ray/session_latest/logs/" type ClusterLogOptions struct { - configFlags *genericclioptions.ConfigFlags - ioStreams *genericclioptions.IOStreams - Executor RemoteExecutor - outputDir string - nodeType string - args []string + configFlags *genericclioptions.ConfigFlags + ioStreams *genericclioptions.IOStreams + Executor RemoteExecutor + outputDir string + nodeType string + ResourceName string } var ( @@ -44,7 +44,7 @@ var ( `) logExample = templates.Examples(` - # Download logs from a RayCluster and save them to a directory with the RayCluster's name + # Download logs from a RayCluster and save them to a directory with the RayCluster's name. Retrieves 'all' logs kubectl ray log my-raycluster # Download logs from a RayCluster and save them to a directory named /path/to/dir @@ -52,6 +52,12 @@ var ( # Download logs from a RayCluster, but only for the head node kubectl ray log my-raycluster --node-type head + + # Download logs from a RayCluster, but only for the worker nodes + kubectl ray log my-raycluster --node-type worker + + # Download all (worker node and head node) the logs from a RayCluster + kubectl ray log my-raycluster --node-type all `) ) @@ -77,7 +83,7 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command { SilenceUsage: true, ValidArgsFunction: completion.RayClusterCompletionFunc(cmdFactory), RunE: func(cmd *cobra.Command, args []string) error { - if err := options.Complete(args); err != nil { + if err := options.Complete(cmd, args); err != nil { return err } if err := options.Validate(); err != nil { @@ -87,16 +93,20 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command { }, } cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.") - cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for.") + cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for, supports 'worker', 'head', or 'all'") options.configFlags.AddFlags(cmd.Flags()) return cmd } -func (options *ClusterLogOptions) Complete(args []string) error { - options.args = args +func (options *ClusterLogOptions) Complete(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return cmdutil.UsageErrorf(cmd, "%s", cmd.Use) + } + + options.ResourceName = args[0] if options.nodeType == "" { - options.nodeType = "head" + options.nodeType = "all" } return nil @@ -112,12 +122,9 @@ func (options *ClusterLogOptions) Validate() error { return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context ") } - // Command must have ray cluster name - if len(options.args) != 1 { - return fmt.Errorf("must have at only one argument") - } else if options.outputDir == "" { + if options.outputDir == "" { fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.") - options.outputDir = options.args[0] + options.outputDir = options.ResourceName err := os.MkdirAll(options.outputDir, 0o755) if err != nil { return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err) @@ -126,11 +133,11 @@ func (options *ClusterLogOptions) Validate() error { switch options.nodeType { case "all": - return fmt.Errorf("node type `all` is currently not supported") + fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve both head and worker node logs.") case "head": - break + fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve only head node logs.") case "worker": - return fmt.Errorf("node type `worker` is currently not supported") + fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve only worker node logs.") default: return fmt.Errorf("unknown node type `%s`", options.nodeType) } @@ -154,26 +161,34 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto } var listopts v1.ListOptions - if options.nodeType == "head" { + if options.nodeType == "all" { listopts = v1.ListOptions{ - LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]), + LabelSelector: fmt.Sprintf("ray.io/cluster=%s", options.ResourceName), + } + } else if options.nodeType == "head" { + listopts = v1.ListOptions{ + LabelSelector: fmt.Sprintf("ray.io/node-type=head, ray.io/cluster=%s", options.ResourceName), + } + } else if options.nodeType == "worker" { + listopts = v1.ListOptions{ + LabelSelector: fmt.Sprintf("ray.io/node-type=worker, ray.io/cluster=%s", options.ResourceName), } } - // Get list of nodes that are considered ray heads - rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts) + // Get list of nodes that are considered the specified node type + rayNodes, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts) if err != nil { - return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err) + return fmt.Errorf("failed to retrieve head node for RayCluster %s: %w", options.ResourceName, err) } - // Get a list of logs of the ray heads. + // Get a list of logs of the ray nodes. var logList []*bytes.Buffer - for _, rayHead := range rayHeads.Items { - request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{}) + for _, rayNode := range rayNodes.Items { + request := kubeClientSet.CoreV1().Pods(rayNode.Namespace).GetLogs(rayNode.Name, &corev1.PodLogOptions{}) podLogs, err := request.Stream(ctx) if err != nil { - return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err) + return fmt.Errorf("Error retrieving log for RayCluster node %s: %w", rayNode.Name, err) } defer podLogs.Close() @@ -181,16 +196,16 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto buf := new(bytes.Buffer) _, err = io.Copy(buf, podLogs) if err != nil { - return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err) + return fmt.Errorf("Failed to get read current logs for RayCluster Node %s: %w", rayNode.Name, err) } logList = append(logList, buf) } - // Pod file name format is name of the ray head + // Pod file name format is name of the ray node for ind, logList := range logList { - curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log") - dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name) + curFilePath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name, "stdout.log") + dirPath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name) err := os.MkdirAll(dirPath, 0o755) if err != nil { return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err) @@ -203,14 +218,14 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto _, err = logList.WriteTo(file) if err != nil { - return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err) + return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayNodes.Items[ind].Name, err) } req := kubeClientSet.CoreV1().RESTClient(). Get(). - Namespace(rayHeads.Items[ind].Namespace). + Namespace(rayNodes.Items[ind].Namespace). Resource("pods"). - Name(rayHeads.Items[ind].Name). + Name(rayNodes.Items[ind].Name). SubResource("exec"). VersionedParams(&corev1.PodExecOptions{ Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."}, @@ -230,7 +245,7 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto return fmt.Errorf("failed to create executor with error: %w", err) } - err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind]) + err = options.downloadRayLogFiles(ctx, exec, rayNodes.Items[ind]) if err != nil { return fmt.Errorf("failed to download ray head log files with error: %w", err) } @@ -251,7 +266,7 @@ func (dre *DefaultRemoteExecutor) CreateExecutor(restConfig *rest.Config, url *u } // downloadRayLogFiles will use to the executor and retrieve the logs file from the inputted ray head -func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayhead corev1.Pod) error { +func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayNode corev1.Pod) error { outreader, outStream := io.Pipe() go func() { defer outStream.Close() @@ -270,16 +285,17 @@ func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec tarReader := tar.NewReader(outreader) header, err := tarReader.Next() if err != nil && !errors.Is(err, io.EOF) { - return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayhead.Name, err) + return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayNode.Name, err) } + + fmt.Fprintf(options.ioStreams.Out, "Downloading log for Ray Node %s\n", rayNode.Name) for !errors.Is(err, io.EOF) { - fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayhead.Name) if err != nil { return fmt.Errorf("Error reading tar archive: %w", err) } // Construct the full local path and a directory for the tmp file logs - localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayhead.Name), path.Clean(header.Name)) + localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayNode.Name), path.Clean(header.Name)) switch header.Typeflag { case tar.TypeDir: diff --git a/kubectl-plugin/pkg/cmd/log/log_test.go b/kubectl-plugin/pkg/cmd/log/log_test.go index c5c38c58f6b..f551991aaaf 100644 --- a/kubectl-plugin/pkg/cmd/log/log_test.go +++ b/kubectl-plugin/pkg/cmd/log/log_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/spf13/cobra" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -116,11 +117,13 @@ func TestRayClusterLogComplete(t *testing.T) { fakeClusterLogOptions := NewClusterLogOptions(testStreams) fakeArgs := []string{"Expectedoutput"} - err := fakeClusterLogOptions.Complete(fakeArgs) + cmd := &cobra.Command{Use: "log"} - assert.Equal(t, fakeClusterLogOptions.nodeType, "head") + err := fakeClusterLogOptions.Complete(cmd, fakeArgs) + + assert.Equal(t, fakeClusterLogOptions.nodeType, "all") assert.Nil(t, err) - assert.Equal(t, fakeClusterLogOptions.args, fakeArgs) + assert.Equal(t, fakeClusterLogOptions.ResourceName, fakeArgs[0]) } func TestRayClusterLogValidate(t *testing.T) { @@ -180,59 +183,23 @@ func TestRayClusterLogValidate(t *testing.T) { { name: "Test validation when no context is set", opts: &ClusterLogOptions{ - configFlags: genericclioptions.NewConfigFlags(false), - outputDir: fakeDir, - args: []string{"fake-cluster"}, - nodeType: "head", - ioStreams: &testStreams, + configFlags: genericclioptions.NewConfigFlags(false), + outputDir: fakeDir, + ResourceName: "fake-cluster", + nodeType: "head", + ioStreams: &testStreams, }, expectError: "no context is currently set, use \"kubectl config use-context \" to select a new one", }, - { - name: "Test validation when more than 1 arg", - opts: &ClusterLogOptions{ - // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: fakeDir, - args: []string{"fake-cluster", "another-fake"}, - nodeType: "head", - ioStreams: &testStreams, - }, - expectError: "must have at only one argument", - }, - { - name: "Test validation when node type is `all`", - opts: &ClusterLogOptions{ - // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: fakeDir, - args: []string{"fake-cluster"}, - nodeType: "all", - ioStreams: &testStreams, - }, - expectError: "node type `all` is currently not supported", - }, - { - name: "Test validation when node type is `worker`", - opts: &ClusterLogOptions{ - // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: fakeDir, - args: []string{"fake-cluster"}, - nodeType: "worker", - ioStreams: &testStreams, - }, - expectError: "node type `worker` is currently not supported", - }, { name: "Test validation when node type is `random-string`", opts: &ClusterLogOptions{ // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: fakeDir, - args: []string{"fake-cluster"}, - nodeType: "random-string", - ioStreams: &testStreams, + configFlags: fakeConfigFlags, + outputDir: fakeDir, + ResourceName: "fake-cluster", + nodeType: "random-string", + ioStreams: &testStreams, }, expectError: "unknown node type `random-string`", }, @@ -240,23 +207,23 @@ func TestRayClusterLogValidate(t *testing.T) { name: "Successful validation call", opts: &ClusterLogOptions{ // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: fakeDir, - args: []string{"random_arg"}, - nodeType: "head", - ioStreams: &testStreams, + configFlags: fakeConfigFlags, + outputDir: fakeDir, + ResourceName: "fake-cluster", + nodeType: "head", + ioStreams: &testStreams, }, expectError: "", }, { - name: "Validate output directory when no out-dir i set.", + name: "Validate output directory when no out-dir is set.", opts: &ClusterLogOptions{ // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: "", - args: []string{"cluster-name"}, - nodeType: "head", - ioStreams: &testStreams, + configFlags: fakeConfigFlags, + outputDir: "", + ResourceName: "fake-cluster", + nodeType: "head", + ioStreams: &testStreams, }, expectError: "", }, @@ -264,11 +231,11 @@ func TestRayClusterLogValidate(t *testing.T) { name: "Failed validation call with output directory not exist", opts: &ClusterLogOptions{ // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: "randomPath-here", - args: []string{"random_arg"}, - nodeType: "head", - ioStreams: &testStreams, + configFlags: fakeConfigFlags, + outputDir: "randomPath-here", + ResourceName: "fake-cluster", + nodeType: "head", + ioStreams: &testStreams, }, expectError: "Directory does not exist. Failed with: stat randomPath-here: no such file or directory", }, @@ -276,11 +243,11 @@ func TestRayClusterLogValidate(t *testing.T) { name: "Failed validation call with output directory is file", opts: &ClusterLogOptions{ // Use fake config to bypass the config flag checks - configFlags: fakeConfigFlags, - outputDir: fakeFile, - args: []string{"random_arg"}, - nodeType: "head", - ioStreams: &testStreams, + configFlags: fakeConfigFlags, + outputDir: fakeFile, + ResourceName: "fake-cluster", + nodeType: "head", + ioStreams: &testStreams, }, expectError: "Path is Not a directory. Please input a directory and try again", }, @@ -293,7 +260,7 @@ func TestRayClusterLogValidate(t *testing.T) { assert.Equal(t, tc.expectError, err.Error()) } else { if tc.opts.outputDir == "" { - assert.Equal(t, tc.opts.args[0], tc.opts.outputDir) + assert.Equal(t, tc.opts.ResourceName, tc.opts.outputDir) } assert.True(t, err == nil) } @@ -314,7 +281,7 @@ func TestRayClusterLogRun(t *testing.T) { fakeClusterLogOptions := NewClusterLogOptions(testStreams) // Uses the mocked executor fakeClusterLogOptions.Executor = &FakeRemoteExecutor{} - fakeClusterLogOptions.args = []string{"test-cluster"} + fakeClusterLogOptions.ResourceName = "test-cluster" fakeClusterLogOptions.outputDir = fakeDir // Create list of fake ray heads @@ -434,7 +401,7 @@ func TestDownloadRayLogFiles(t *testing.T) { testStreams, _, _, _ := genericiooptions.NewTestIOStreams() fakeClusterLogOptions := NewClusterLogOptions(testStreams) - fakeClusterLogOptions.args = []string{"test-cluster"} + fakeClusterLogOptions.ResourceName = "test-cluster" fakeClusterLogOptions.outputDir = fakeDir // create fake tar files to test