Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][kubectl-plugin] Add all and worker node type to kubectl ray log #2442

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions kubectl-plugin/pkg/cmd/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
const filePathInPod = "/tmp/ray/session_latest/logs/"

type ClusterLogOptions struct {
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
Executor RemoteExecutor
outputDir string
nodeType string
args []string
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
Executor RemoteExecutor
outputDir string
nodeType string
ResourceName string
}

var (
Expand All @@ -44,14 +44,20 @@ var (
`)

logExample = templates.Examples(`
# Download logs from a RayCluster and save them to a directory with the RayCluster's name
# Download logs from a RayCluster and save them to a directory with the RayCluster's name. Retrieves 'all' logs
kubectl ray log my-raycluster

# Download logs from a RayCluster and save them to a directory named /path/to/dir
kubectl ray log my-raycluster --out-dir /path/to/dir

# Download logs from a RayCluster, but only for the head node
kubectl ray log my-raycluster --node-type head

# Download logs from a RayCluster, but only for the worker nodes
kubectl ray log my-raycluster --node-type worker

# Download all (worker node and head node) the logs from a RayCluster
kubectl ray log my-raycluster --node-type all
chiayi marked this conversation as resolved.
Show resolved Hide resolved
`)
)

Expand All @@ -77,7 +83,7 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
SilenceUsage: true,
ValidArgsFunction: completion.RayClusterCompletionFunc(cmdFactory),
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(args); err != nil {
if err := options.Complete(cmd, args); err != nil {
return err
}
if err := options.Validate(); err != nil {
Expand All @@ -87,16 +93,20 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
},
}
cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.")
cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for.")
cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for, supports 'worker', 'head', or 'all'")
options.configFlags.AddFlags(cmd.Flags())
return cmd
}

func (options *ClusterLogOptions) Complete(args []string) error {
options.args = args
func (options *ClusterLogOptions) Complete(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
}

options.ResourceName = args[0]

if options.nodeType == "" {
options.nodeType = "head"
options.nodeType = "all"
}

return nil
Expand All @@ -112,12 +122,9 @@ func (options *ClusterLogOptions) Validate() error {
return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
}

// Command must have ray cluster name
if len(options.args) != 1 {
return fmt.Errorf("must have at only one argument")
} else if options.outputDir == "" {
if options.outputDir == "" {
fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.")
options.outputDir = options.args[0]
options.outputDir = options.ResourceName
err := os.MkdirAll(options.outputDir, 0o755)
if err != nil {
return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err)
Expand All @@ -126,11 +133,11 @@ func (options *ClusterLogOptions) Validate() error {

switch options.nodeType {
case "all":
return fmt.Errorf("node type `all` is currently not supported")
fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve both head and worker node logs.")
case "head":
break
fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve only head node logs.")
case "worker":
return fmt.Errorf("node type `worker` is currently not supported")
fmt.Fprintln(options.ioStreams.Out, "Command set to retrieve only worker node logs.")
default:
return fmt.Errorf("unknown node type `%s`", options.nodeType)
}
Expand All @@ -154,43 +161,51 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
}

var listopts v1.ListOptions
if options.nodeType == "head" {
if options.nodeType == "all" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]),
LabelSelector: fmt.Sprintf("ray.io/cluster=%s", options.ResourceName),
}
} else if options.nodeType == "head" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=head, ray.io/cluster=%s", options.ResourceName),
}
} else if options.nodeType == "worker" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=worker, ray.io/cluster=%s", options.ResourceName),
}
}

// Get list of nodes that are considered ray heads
rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
// Get list of nodes that are considered the specified node type
rayNodes, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err)
return fmt.Errorf("failed to retrieve head node for RayCluster %s: %w", options.ResourceName, err)
}

// Get a list of logs of the ray heads.
// Get a list of logs of the ray nodes.
var logList []*bytes.Buffer
for _, rayHead := range rayHeads.Items {
request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{})
for _, rayNode := range rayNodes.Items {
request := kubeClientSet.CoreV1().Pods(rayNode.Namespace).GetLogs(rayNode.Name, &corev1.PodLogOptions{})

podLogs, err := request.Stream(ctx)
if err != nil {
return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err)
return fmt.Errorf("Error retrieving log for RayCluster node %s: %w", rayNode.Name, err)
}
defer podLogs.Close()

// Get current logs:
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err)
return fmt.Errorf("Failed to get read current logs for RayCluster Node %s: %w", rayNode.Name, err)
}

logList = append(logList, buf)
}

// Pod file name format is name of the ray head
// Pod file name format is name of the ray node
for ind, logList := range logList {
curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log")
dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name)
curFilePath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name, "stdout.log")
dirPath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name)
err := os.MkdirAll(dirPath, 0o755)
if err != nil {
return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err)
Expand All @@ -203,14 +218,14 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto

_, err = logList.WriteTo(file)
if err != nil {
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err)
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayNodes.Items[ind].Name, err)
}

req := kubeClientSet.CoreV1().RESTClient().
Get().
Namespace(rayHeads.Items[ind].Namespace).
Namespace(rayNodes.Items[ind].Namespace).
Resource("pods").
Name(rayHeads.Items[ind].Name).
Name(rayNodes.Items[ind].Name).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."},
Expand All @@ -230,7 +245,7 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
return fmt.Errorf("failed to create executor with error: %w", err)
}

err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind])
err = options.downloadRayLogFiles(ctx, exec, rayNodes.Items[ind])
if err != nil {
return fmt.Errorf("failed to download ray head log files with error: %w", err)
}
Expand All @@ -251,7 +266,7 @@ func (dre *DefaultRemoteExecutor) CreateExecutor(restConfig *rest.Config, url *u
}

// downloadRayLogFiles uses the executor to retrieve the log files from the given Ray node
func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayhead corev1.Pod) error {
func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayNode corev1.Pod) error {
outreader, outStream := io.Pipe()
go func() {
defer outStream.Close()
Expand All @@ -270,16 +285,17 @@ func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec
tarReader := tar.NewReader(outreader)
header, err := tarReader.Next()
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayhead.Name, err)
return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayNode.Name, err)
}

fmt.Fprintf(options.ioStreams.Out, "Downloading log for Ray Node %s\n", rayNode.Name)
for !errors.Is(err, io.EOF) {
fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayhead.Name)
if err != nil {
return fmt.Errorf("Error reading tar archive: %w", err)
}

// Construct the full local path and a directory for the tmp file logs
localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayhead.Name), path.Clean(header.Name))
localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayNode.Name), path.Clean(header.Name))

switch header.Typeflag {
case tar.TypeDir:
Expand Down
Loading
Loading