From 592eb9b27c53c36cc0d761c5b9ec9fc250371cf6 Mon Sep 17 00:00:00 2001
From: antoinetran
Date: Wed, 13 Nov 2024 18:38:29 +0100
Subject: [PATCH 1/6] Fix #35 'kubectl logs -f' implemented + improved debug with sessionContext

---
 pkg/slurm/Create.go  |   4 +-
 pkg/slurm/GetLogs.go | 220 ++++++++++++++++++++++++++++++++++++-------
 pkg/slurm/func.go    |  18 ++++
 pkg/slurm/prepare.go |  84 ++++++++---------
 4 files changed, 250 insertions(+), 76 deletions(-)

diff --git a/pkg/slurm/Create.go b/pkg/slurm/Create.go
index dac6de3..022dfe6 100644
--- a/pkg/slurm/Create.go
+++ b/pkg/slurm/Create.go
@@ -83,7 +83,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
 		// singularity run will honor the entrypoint/command (if exist) in container image, while exec will override entrypoint.
 		// Thus if pod command (equivalent to container entrypoint) exist, we do exec, and other case we do run
 		singularityCommand := ""
-		if container.Command != nil && len(container.Command) != 0 {
+		if len(container.Command) != 0 {
 			singularityCommand = "exec"
 		} else {
 			singularityCommand = "run"
@@ -116,7 +116,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
 			os.RemoveAll(filesPath)
 			return
 		}
-
+		// prepareEnvs creates a file in the working directory, that must exist. This is created at prepareMounts.
 		envs := prepareEnvs(spanCtx, h.Config, data, container)
 
diff --git a/pkg/slurm/GetLogs.go b/pkg/slurm/GetLogs.go
index dc92a50..bd81b07 100644
--- a/pkg/slurm/GetLogs.go
+++ b/pkg/slurm/GetLogs.go
@@ -1,10 +1,16 @@
 package slurm
 
 import (
+	"bufio"
+	"context"
 	"encoding/json"
+	"errors"
+	"fmt"
 	"io"
+	"io/fs"
 	"net/http"
 	"os"
+	"strconv"
 	"strings"
 	"time"
 
@@ -17,6 +23,116 @@ import (
 	trace "go.opentelemetry.io/otel/trace"
 )
 
+// Logs in follow mode (get logs until the death of the container) with "kubectl -f".
+func (h *SidecarHandler) GetLogsFollowMode(w http.ResponseWriter, r *http.Request, path string, req commonIL.LogStruct, containerOutputPath string, containerOutput []byte, sessionContext string) error {
+	// Follow until this file exist, that indicates the end of container, thus the end of following.
+	containerStatusPath := path + "/" + req.ContainerName + ".status"
+	// Get the offset of what we read.
+	containerOutputLastOffset := len(containerOutput)
+	sessionContextMessage := GetSessionContextMessage(sessionContext)
+	log.G(h.Ctx).Debug(sessionContextMessage, "Check container status", containerStatusPath, " with current length/offset: ", containerOutputLastOffset)
+
+	var containerOutputFd *os.File
+	var err error
+	for {
+		containerOutputFd, err = os.Open(containerOutputPath)
+		if err != nil {
+			if errors.Is(err, fs.ErrNotExist) {
+				// Case the file does not exist yet, we loop until it exist.
+				notFoundMsg := sessionContextMessage + "Cannot open in follow mode the container logs " + containerOutputPath + " because it does not exist yet, sleeping before retrying..."
+				log.G(h.Ctx).Debug(notFoundMsg)
+				// Warning: if we don't write anything to body before 30s, there will be a timeout in VK to API DoReq(), thus we send an informational message that the log is not ready.
+				w.Write([]byte(notFoundMsg))
+				// Flush otherwise it will be as if nothing was written.
+				if f, ok := w.(http.Flusher); ok {
+					log.G(h.Ctx).Debug(sessionContextMessage, "wrote file not found yet, now flushing this message...")
+					f.Flush()
+				} else {
+					log.G(h.Ctx).Error(sessionContextMessage, "wrote file not found but could not flush because server does not support Flusher.")
+				}
+				time.Sleep(5 * time.Second)
+				continue
+			} else {
+				// Case unknown error.
+				errWithContext := fmt.Errorf(sessionContextMessage+"could not open file to follow logs at %s error type: %s error: %w", containerOutputPath, fmt.Sprintf("%#v", err), err)
+				log.G(h.Ctx).Error(errWithContext)
+				w.Write([]byte(errWithContext.Error()))
+				return err
+			}
+		}
+		// File exist.
+		log.G(h.Ctx).Debug(sessionContextMessage, "opened for follow mode the container logs ", containerOutputPath)
+		break
+	}
+	defer containerOutputFd.Close()
+
+	// We follow only from after what is already read.
+	_, err = containerOutputFd.Seek(int64(containerOutputLastOffset), 0)
+	if err != nil {
+		errWithContext := fmt.Errorf(sessionContextMessage+"error during Seek() of GetLogsFollowMode() in GetLogsHandler of file %s offset %d type: %s %w", containerOutputPath, containerOutputLastOffset, fmt.Sprintf("%#v", err), err)
+		w.Write([]byte(errWithContext.Error()))
+		return errWithContext
+	}
+
+	containerOutputReader := bufio.NewReader(containerOutputFd)
+
+	bufferBytes := make([]byte, 4096)
+
+	// Looping until we get end of job.
+	// TODO: handle the Ctrl+C of kubectl logs.
+	var isContainerDead bool = false
+	for {
+		n, errRead := containerOutputReader.Read(bufferBytes)
+		if errRead != nil && errRead != io.EOF {
+			// Error during read.
+			h.logErrorVerbose(sessionContextMessage+"error doing Read() of GetLogsFollowMode", h.Ctx, w, errRead)
+			return errRead
+		}
+		// Write ASAP what we could read of it.
+		_, err = w.Write(bufferBytes[:n])
+		if err != nil {
+			h.logErrorVerbose(sessionContextMessage+"error doing Write() of GetLogsFollowMode", h.Ctx, w, err)
+			return err
+		}
+
+		// Flush otherwise it will take time to appear in kubectl logs.
+		if f, ok := w.(http.Flusher); ok {
+			log.G(h.Ctx).Debug(sessionContextMessage, "wrote some logs, now flushing...")
+			f.Flush()
+		} else {
+			log.G(h.Ctx).Error(sessionContextMessage, "wrote some logs but could not flush because server does not support Flusher.")
+		}
+
+		if errRead != nil {
+			if errRead == io.EOF {
+				// Nothing more to read, but in follow mode, is the container still alive?
+				if isContainerDead {
+					// Container already marked as dead, and we tried to get logs one last time. Exiting the loop.
+					log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead and no more logs are found at this step, exiting following mode...")
+					break
+				}
+				// Checking if container is dead (meaning the status file exist).
+				if _, err := os.Stat(containerStatusPath); errors.Is(err, os.ErrNotExist) {
+					// The status file of the container does not exist, so the container is still alive. Continuing to follow logs.
+					// Sleep because otherwise it can be a stress to file system to always read it when it has nothing.
+					log.G(h.Ctx).Debug(sessionContextMessage, "EOF of container logs, sleeping 4s before retrying...")
+					time.Sleep(4 * time.Second)
+				} else {
+					// The status file exist, so the container is dead. Trying to get the latest log one last time.
+					// Because the moment we found the status file, there might be some more logs to read.
+					isContainerDead = true
+					log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead, reading last logs...")
+				}
+				continue
+			} else {
+				// Unreachable code.
+			}
+		}
+	}
+	// No error, err = nil
+	return nil
+}
+
 // GetLogsHandler reads Jobs' output file to return what's logged inside.
 // What's returned is based on the provided parameters (Tail/LimitBytes/Timestamps/etc)
 func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) {
@@ -28,22 +144,23 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
 	defer span.End()
 	defer commonIL.SetDurationSpan(start, span)
 
-	log.G(h.Ctx).Info("Docker Sidecar: received GetLogs call")
+	// For debugging purpose, when we have many kubectl logs, we can differentiate each one.
+	sessionContext := GetSessionContext(r)
+	sessionContextMessage := GetSessionContextMessage(sessionContext)
+
+	log.G(h.Ctx).Info(sessionContextMessage, "Docker Sidecar: received GetLogs call")
 
 	var req commonIL.LogStruct
-	statusCode := http.StatusOK
 	currentTime := time.Now()
 
 	bodyBytes, err := io.ReadAll(r.Body)
 	if err != nil {
-		statusCode = http.StatusInternalServerError
-		h.handleError(spanCtx, w, statusCode, err)
+		h.logErrorVerbose(sessionContextMessage+"error during ReadAll() in GetLogsHandler request body", spanCtx, w, err)
 		return
 	}
 	err = json.Unmarshal(bodyBytes, &req)
 	if err != nil {
-		statusCode = http.StatusInternalServerError
-		h.handleError(spanCtx, w, statusCode, err)
+		h.logErrorVerbose(sessionContextMessage+"error during Unmarshal() in GetLogsHandler request body", spanCtx, w, err)
 		return
 	}
 
@@ -60,32 +177,26 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
 	)
 
 	path := h.Config.DataRootFolder + req.Namespace + "-" + req.PodUID
+	containerOutputPath := path + "/" + req.ContainerName + ".out"
 	var output []byte
 	if req.Opts.Timestamps {
-		h.handleError(spanCtx, w, statusCode, err)
+		h.logErrorVerbose(sessionContextMessage+"unsupported option req.Opts.Timestamps", spanCtx, w, err)
+		return
+	}
+	containerOutput, err := h.ReadLogs(containerOutputPath, span, spanCtx, w, sessionContextMessage)
+	if err != nil {
+		// Error already handled in waitAndReadLogs
+		return
+	}
+	jobOutput, err := h.ReadLogs(path+"/"+"job.out", span, spanCtx, w, sessionContextMessage)
+	if err != nil {
+		// Error already handled in waitAndReadLogs
 		return
-	} else {
-		log.G(h.Ctx).Info("Reading " + path + "/" + req.ContainerName + ".out")
-		containerOutput, err1 := os.ReadFile(path + "/" + req.ContainerName + ".out")
-		if err1 != nil {
-			log.G(h.Ctx).Error("Failed to read container logs.")
-		}
-		jobOutput, err2 := os.ReadFile(path + "/" + "job.out")
-		if err2 != nil {
-			log.G(h.Ctx).Error("Failed to read job logs.")
-		}
-
-		if err1 != nil && err2 != nil {
-			span.AddEvent("Error retrieving logs")
-			h.handleError(spanCtx, w, statusCode, err)
-			return
-		}
-
-		output = append(output, jobOutput...)
-		output = append(output, containerOutput...)
 	}
+
+	output = append(output, jobOutput...)
+	output = append(output, containerOutput...)
 
 	var returnedLogs string
 
 	if req.Opts.Tail != 0 {
@@ -140,12 +251,57 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
 		}
 	}
 
-	commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode))
+	commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(http.StatusOK))
+
+	//w.Header().Set("Transfer-Encoding", "chunked")
+	w.Header().Set("Content-Type", "text/plain")
 
-	if statusCode != http.StatusOK {
-		w.Write([]byte("Some errors occurred while checking container status. Check Docker Sidecar's logs"))
+	log.G(h.Ctx).Info(sessionContextMessage, "writing response headers and OK status")
+	w.WriteHeader(http.StatusOK)
+
+	log.G(h.Ctx).Info(sessionContextMessage, "writing response body len: ", len(returnedLogs))
+	n, err := w.Write([]byte(returnedLogs))
+	log.G(h.Ctx).Info(sessionContextMessage, "written response body len: ", n)
+	if err != nil {
+		h.logErrorVerbose(sessionContextMessage+"error during Write() in GetLogsHandler, could write bytes: "+strconv.Itoa(n), spanCtx, w, err)
+		return
+	}
+
+	// Flush or else, it could be lost in the pipe.
+	if f, ok := w.(http.Flusher); ok {
+		log.G(h.Ctx).Debug(sessionContextMessage, "flushing after wrote response body bytes: "+strconv.Itoa(n))
+		f.Flush()
 	} else {
-		w.WriteHeader(statusCode)
-		w.Write([]byte(returnedLogs))
+		log.G(h.Ctx).Error(sessionContextMessage, "wrote response body but could not flush because server does not support Flusher.")
+		return
+	}
+
+	if req.Opts.Follow {
+		err := h.GetLogsFollowMode(w, r, path, req, containerOutputPath, containerOutput, sessionContext)
+		if err != nil {
+			h.logErrorVerbose(sessionContextMessage+"follow mode error", spanCtx, w, err)
+		}
 	}
 }
+
+// Goal: read the file if it exist. If not, return empty.
+// Important to wait because if we don't wait and return empty array, it will generates a JSON unmarshall error in InterLink VK.
+// Fail for any error not related to file not existing (eg: permission error will raise an error).
+// Already handle error.
+func (h *SidecarHandler) ReadLogs(logsPath string, span trace.Span, ctx context.Context, w http.ResponseWriter, sessionContextMessage string) ([]byte, error) {
+	var output []byte
+	var err error
+	log.G(h.Ctx).Info(sessionContextMessage, "reading file ", logsPath)
+	output, err = os.ReadFile(logsPath)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			log.G(h.Ctx).Info(sessionContextMessage, "file ", logsPath, " not found.")
+			output = make([]byte, 0, 0)
+		} else {
+			span.AddEvent("Error retrieving logs")
+			h.logErrorVerbose(sessionContextMessage+"error during ReadFile() of readLogs() in GetLogsHandler of file "+logsPath, ctx, w, err)
+			return nil, err
+		}
 	}
+	return output, nil
 }
diff --git a/pkg/slurm/func.go b/pkg/slurm/func.go
index 34097c6..3723aeb 100644
--- a/pkg/slurm/func.go
+++ b/pkg/slurm/func.go
@@ -103,3 +103,21 @@ func (h *SidecarHandler) handleError(ctx context.Context, w http.ResponseWriter,
 	w.Write([]byte("Some errors occurred while creating container. Check Slurm Sidecar's logs"))
 	log.G(h.Ctx).Error(err)
 }
+
+func (h *SidecarHandler) logErrorVerbose(context string, ctx context.Context, w http.ResponseWriter, err error) {
+	errWithContext := fmt.Errorf("error context: %s type: %s %w", context, fmt.Sprintf("%#v", err), err)
+	log.G(h.Ctx).Error(errWithContext)
+	h.handleError(ctx, w, http.StatusInternalServerError, errWithContext)
+}
+
+func GetSessionContext(r *http.Request) string {
+	sessionContext := r.Header.Get("InterLink-Http-Session")
+	if sessionContext == "" {
+		sessionContext = "NoSessionFound#0"
+	}
+	return sessionContext
+}
+
+func GetSessionContextMessage(sessionContext string) string {
+	return "HTTP InterLink session " + sessionContext + ": "
+}
diff --git a/pkg/slurm/prepare.go b/pkg/slurm/prepare.go
index 2c6515e..dbde4c6 100644
--- a/pkg/slurm/prepare.go
+++ b/pkg/slurm/prepare.go
@@ -186,50 +186,50 @@ func (h *SidecarHandler) LoadJIDs() error {
 }
 
 func createEnvFile(Ctx context.Context, config SlurmConfig, podData commonIL.RetrievedPodData, container v1.Container) ([]string, []string, error) {
-	envs := []string{}
-	// For debugging purpose only
-	envs_data := []string{}
-
-	envfilePath := (config.DataRootFolder + podData.Pod.Namespace + "-" + string(podData.Pod.UID) + "/" + "envfile.properties")
-	log.G(Ctx).Info("-- Appending envs using envfile " + envfilePath)
-	envs = append(envs, "--env-file")
-	envs = append(envs, envfilePath)
-
-	envfile, err := os.Create(envfilePath)
-	if err != nil {
-		log.G(Ctx).Error(err)
-		return nil, nil, err
-	}
-	defer envfile.Close()
-
-	for _, envVar := range container.Env {
-		// The environment variable values can contains all sort of simple/double quote and space and any arbitrary values.
-		// singularity reads the env-file and parse it like a bash string, so shellescape will escape any quote properly.
-		tmpValue := shellescape.Quote(envVar.Value)
-		tmp := (envVar.Name + "=" + tmpValue)
-
-		envs_data = append(envs_data, tmp)
-
-		_, err := envfile.WriteString(tmp + "\n")
-		if err != nil {
-			log.G(Ctx).Error(err)
-			return nil, nil, err
-		} else {
-			log.G(Ctx).Debug("---- Written envfile file " + envfilePath + " key " + envVar.Name + " value " + tmpValue)
-		}
-	}
-
-	// All env variables are written, we flush it now.
-	err = envfile.Sync()
+	envs := []string{}
+	// For debugging purpose only
+	envs_data := []string{}
+
+	envfilePath := (config.DataRootFolder + podData.Pod.Namespace + "-" + string(podData.Pod.UID) + "/" + "envfile.properties")
+	log.G(Ctx).Info("-- Appending envs using envfile " + envfilePath)
+	envs = append(envs, "--env-file")
+	envs = append(envs, envfilePath)
+
+	envfile, err := os.Create(envfilePath)
+	if err != nil {
+		log.G(Ctx).Error(err)
+		return nil, nil, err
+	}
+	defer envfile.Close()
+
+	for _, envVar := range container.Env {
+		// The environment variable values can contains all sort of simple/double quote and space and any arbitrary values.
+		// singularity reads the env-file and parse it like a bash string, so shellescape will escape any quote properly.
+		tmpValue := shellescape.Quote(envVar.Value)
+		tmp := (envVar.Name + "=" + tmpValue)
+
+		envs_data = append(envs_data, tmp)
+
+		_, err := envfile.WriteString(tmp + "\n")
 		if err != nil {
 			log.G(Ctx).Error(err)
 			return nil, nil, err
+		} else {
+			log.G(Ctx).Debug("---- Written envfile file " + envfilePath + " key " + envVar.Name + " value " + tmpValue)
 		}
-
-	// Calling Close() in case of error. If not error, the defer will close it again but it should be idempotent.
-	envfile.Close()
-
-	return envs, envs_data, nil
+	}
+
+	// All env variables are written, we flush it now.
+	err = envfile.Sync()
+	if err != nil {
+		log.G(Ctx).Error(err)
+		return nil, nil, err
+	}
+
+	// Calling Close() in case of error. If not error, the defer will close it again but it should be idempotent.
+	envfile.Close()
+
+	return envs, envs_data, nil
 }
 
 // prepareEnvs reads all Environment variables from a container and append them to a envfile.properties. The values are bash-escaped.
@@ -509,8 +509,8 @@ func produceSLURMScript(
 			stringToBeWritten.WriteString(singularityCommand.containerName)
 			stringToBeWritten.WriteString(".out; ")
 			stringToBeWritten.WriteString("echo $? > " + path + "/" + singularityCommand.containerName + ".status")
-
-			if ! singularityCommand.isInitContainer {
+
+			if !singularityCommand.isInitContainer {
 				// Not init containers are run in parallel.
 				stringToBeWritten.WriteString("; sleep 30 &")
 			}
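
Note on [PATCH 1/6]: the heart of the change is the tail loop in GetLogsFollowMode: write whatever the job has produced so far, flush it through http.Flusher so kubectl sees it immediately, and on EOF poll a sentinel file whose existence marks the container's death. A minimal standalone sketch of that pattern under the same sentinel-file convention as the plugin's .status files; the names TailUntilDone, outPath and statusPath are illustrative, not the plugin's API:

package logtail

import (
	"errors"
	"io"
	"io/fs"
	"net/http"
	"os"
	"time"
)

// TailUntilDone streams outPath to w until statusPath appears, then drains once more and stops.
func TailUntilDone(w http.ResponseWriter, outPath, statusPath string) error {
	f, err := os.Open(outPath)
	if err != nil {
		return err
	}
	defer f.Close()

	buf := make([]byte, 4096)
	sentinelSeen := false
	for {
		n, readErr := f.Read(buf)
		if n > 0 {
			if _, err := w.Write(buf[:n]); err != nil {
				return err
			}
			// Without an explicit flush the bytes can sit in the response buffer indefinitely.
			if fl, ok := w.(http.Flusher); ok {
				fl.Flush()
			}
		}
		switch {
		case readErr == io.EOF && sentinelSeen:
			// The status file existed on the previous pass and the tail is drained: done.
			return nil
		case readErr == io.EOF:
			if _, err := os.Stat(statusPath); !errors.Is(err, fs.ErrNotExist) {
				sentinelSeen = true // status file appeared; loop once more for late writes
			} else {
				time.Sleep(4 * time.Second) // avoid hammering the filesystem while idle
			}
		case readErr != nil:
			return readErr
		}
	}
}

The 30-second keep-alive message in the patch exists for the same reason as the flushes: the Virtual Kubelet side times out if nothing reaches the wire while the log file has not been created yet.
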
From d41c9e885a3454ba37dc68c3258baa5f4e299645 Mon Sep 17 00:00:00 2001
From: antoinetran
Date: Mon, 2 Dec 2024 15:34:19 +0100
Subject: [PATCH 2/6] Fix #37 getStatus based on container.status is not reliable

---
 pkg/slurm/Status.go  | 38 ++++++++++++++++++++++++++------------
 pkg/slurm/prepare.go | 32 ++++++++++++++++++++++++++++----
 2 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/pkg/slurm/Status.go b/pkg/slurm/Status.go
index 5f3e073..2e9ea42 100644
--- a/pkg/slurm/Status.go
+++ b/pkg/slurm/Status.go
@@ -34,6 +34,10 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 	defer span.End()
 	defer commonIL.SetDurationSpan(start, span)
 
+	// For debugging purpose, when we have many kubectl logs, we can differentiate each one.
+	sessionContext := GetSessionContext(r)
+	sessionContextMessage := GetSessionContextMessage(sessionContext)
+
 	var req []*v1.Pod
 	var resp []commonIL.PodStatus
 	statusCode := http.StatusOK
@@ -76,11 +80,14 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 		path := h.Config.DataRootFolder + pod.Namespace + "-" + string(pod.UID)
 
 		if checkIfJidExists(spanCtx, (h.JIDs), uid) {
-			cmd := []string{"--noheader", "-a", "-j " + (*h.JIDs)[uid].JID}
+			// Eg of output: "R 0"
+			// With test, exit_code is better than DerivedEC, because for canceled jobs, it gives 15 while DerivedEC gives 0.
+			cmd := []string{"--noheader", "-a", "-j ", (*h.JIDs)[uid].JID, "-O", "StateCompact,exit_code"}
 			shell := exec.ExecTask{
 				Command: h.Config.Squeuepath,
 				Args:    cmd,
-				Shell:   true,
+				// true to be able to add prefix to squeue, but this is ugly
+				Shell: true,
 			}
 			execReturn, _ := shell.Execute()
 			timeNow = time.Now()
@@ -133,13 +140,20 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 				resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace, Containers: containerStatuses})
 
 			} else {
-				pattern := `(CD|CG|F|PD|PR|R|S|ST)`
-				re := regexp.MustCompile(pattern)
-				match := re.FindString(execReturn.Stdout)
+				statePattern := `(CD|CG|F|PD|PR|R|S|ST)`
+				stateRe := regexp.MustCompile(statePattern)
+				stateMatch := stateRe.FindString(execReturn.Stdout)
+
+				// Magic REGEX that matches any number from 0 to 255 included. Eg: match 2, 255, does not match 256, 02, -1.
+				// If the job is not in terminal state, the exit code has no meaning, however squeue returns 0 for exit code in this case. Just ignore the value.
+				exitCodePattern := ` ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$`
+				exitCodeRe := regexp.MustCompile(exitCodePattern)
+				exitCodeMatch := exitCodeRe.FindString(execReturn.Stdout)
 
-				log.G(h.Ctx).Info("JID: " + (*h.JIDs)[uid].JID + " | Status: " + match + " | Pod: " + pod.Name + " | UID: " + string(pod.UID))
+				//log.G(h.Ctx).Info("JID: " + (*h.JIDs)[uid].JID + " | Status: " + stateMatch + " | Pod: " + pod.Name + " | UID: " + string(pod.UID))
+				log.G(h.Ctx).Infof("JID: %s | Status: %s | Job exit code (if applicable): %s | Pod: %s | UID: %s", (*h.JIDs)[uid].JID, exitCodeMatch, stateMatch, pod.Name, string(pod.UID))
 
-				switch match {
+				switch stateMatch {
 				case "CD":
 					if (*h.JIDs)[uid].EndTime.IsZero() {
 						(*h.JIDs)[uid].EndTime = timeNow
@@ -152,7 +166,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 						f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST"))
 					}
 					for _, ct := range pod.Spec.Containers {
-						exitCode, err := getExitCode(h.Ctx, path, ct.Name)
+						exitCode, err := getExitCode(h.Ctx, path, ct.Name, exitCodeMatch, sessionContextMessage)
 						if err != nil {
 							log.G(h.Ctx).Error(err)
 							continue
@@ -189,7 +203,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 						f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST"))
 					}
 					for _, ct := range pod.Spec.Containers {
-						exitCode, err := getExitCode(h.Ctx, path, ct.Name)
+						exitCode, err := getExitCode(h.Ctx, path, ct.Name, exitCodeMatch, sessionContextMessage)
 						if err != nil {
 							log.G(h.Ctx).Error(err)
 							continue
@@ -216,7 +230,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 						f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST"))
 					}
 					for _, ct := range pod.Spec.Containers {
-						exitCode, err := getExitCode(h.Ctx, path, ct.Name)
+						exitCode, err := getExitCode(h.Ctx, path, ct.Name, exitCodeMatch, sessionContextMessage)
 						if err != nil {
 							log.G(h.Ctx).Error(err)
 							continue
@@ -259,7 +273,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 						f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST"))
 					}
 					for _, ct := range pod.Spec.Containers {
-						exitCode, err := getExitCode(h.Ctx, path, ct.Name)
+						exitCode, err := getExitCode(h.Ctx, path, ct.Name, exitCodeMatch, sessionContextMessage)
 						if err != nil {
 							log.G(h.Ctx).Error(err)
 							continue
@@ -280,7 +294,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 						f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST"))
 					}
 					for _, ct := range pod.Spec.Containers {
-						exitCode, err := getExitCode(h.Ctx, path, ct.Name)
+						exitCode, err := getExitCode(h.Ctx, path, ct.Name, exitCodeMatch, sessionContextMessage)
 						if err != nil {
 							log.G(h.Ctx).Error(err)
 							continue
diff --git a/pkg/slurm/prepare.go b/pkg/slurm/prepare.go
index dbde4c6..24dd644 100644
--- a/pkg/slurm/prepare.go
+++ b/pkg/slurm/prepare.go
@@ -7,6 +7,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"io/fs"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -920,11 +921,34 @@ func checkIfJidExists(ctx context.Context, JIDs *map[string]*JidStruct, uid stri
 }
 
 // getExitCode returns the exit code read from the .status file of a specific container and returns it as an int32 number
-func getExitCode(ctx context.Context, path string, ctName string) (int32, error) {
-	exitCode, err := os.ReadFile(path + "/" + ctName + ".status")
+func getExitCode(ctx context.Context, path string, ctName string, exitCodeMatch string, sessionContextMessage string) (int32, error) {
+	statusFilePath := path + "/" + ctName + ".status"
+	exitCode, err := os.ReadFile(statusFilePath)
 	if err != nil {
-		log.G(ctx).Error(err)
-		return 0, err
+		if errors.Is(err, fs.ErrNotExist) {
+			// Case job terminated before the container script has the time to write status file (eg: canceled jobs).
+			log.G(ctx).Warning(sessionContextMessage, "file ", statusFilePath, " not found despite the job being in terminal state. Workaround: using Slurm job exit code:", exitCodeMatch)
+
+			exitCodeInt, errAtoi := strconv.Atoi(exitCodeMatch)
+			if errAtoi != nil {
+				errWithContext := fmt.Errorf(sessionContextMessage+"error during Atoi() of getExitCode() of file %s exitCodeMatch: %s error: %s %w", statusFilePath, exitCodeMatch, fmt.Sprintf("%#v", errAtoi), errAtoi)
+				log.G(ctx).Error(errWithContext)
+				return 11, errWithContext
+			}
+
+			errWriteFile := os.WriteFile(statusFilePath, []byte(exitCodeMatch), 0644)
+			if errWriteFile != nil {
+				errWithContext := fmt.Errorf(sessionContextMessage+"error during WriteFile() of getExitCode() of file %s error: %s %w", statusFilePath, fmt.Sprintf("%#v", errWriteFile), errWriteFile)
+				log.G(ctx).Error(errWithContext)
+				return 12, errWithContext
+			}
+
+			return int32(exitCodeInt), nil
+		} else {
+			errWithContext := fmt.Errorf(sessionContextMessage+"error during ReadFile() of getExitCode() of file %s error: %s %w", statusFilePath, fmt.Sprintf("%#v", err), err)
+			log.G(ctx).Error(errWithContext)
+			return 21, errWithContext
+		}
 	}
 	exitCodeInt, err := strconv.Atoi(strings.Replace(string(exitCode), "\n", "", -1))
 	if err != nil {
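
Note on [PATCH 2/6]: the squeue output being parsed is a two-column "StateCompact exit_code" line such as "CD 0" or "F 137", and the exit-code pattern deliberately enumerates only 0 to 255. A toy program showing how the two regexes behave on such a line, and why the space-plus-dollar variant still needs care before strconv.Atoi (the sample output string is an assumption about squeue's formatting):

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Assumed squeue line for a failed job: StateCompact, then exit_code.
	out := "F 137"

	stateRe := regexp.MustCompile(`(CD|CG|F|PD|PR|R|S|ST)`)
	fmt.Println(stateRe.FindString(out)) // F

	// Patch 2's pattern: an exit code in 0..255, preceded by a space, anchored at the end.
	exitRe := regexp.MustCompile(` ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$`)

	// FindString keeps the leading space, so feeding it straight to Atoi fails.
	_, err := strconv.Atoi(exitRe.FindString(out))
	fmt.Println(err != nil) // true: " 137" is not a clean integer

	// The capture group isolates the digits, which is what patch 4 switches to.
	if m := exitRe.FindStringSubmatch(out); m != nil {
		fmt.Println(m[1]) // 137
	}
}

[PATCH 4/6] below reworks exactly this spot: it reorders the columns, switches to FindStringSubmatch, and replaces the $ anchor with \s.
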
From 12ee99c00b4744d5a5e99f9e67cc697a85048e25 Mon Sep 17 00:00:00 2001
From: antoinetran
Date: Mon, 2 Dec 2024 15:36:07 +0100
Subject: [PATCH 3/6] Fix #38 squeue is missing requesting ALL states

---
 pkg/slurm/Status.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/slurm/Status.go b/pkg/slurm/Status.go
index 2e9ea42..0a11091 100644
--- a/pkg/slurm/Status.go
+++ b/pkg/slurm/Status.go
@@ -82,7 +82,8 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 		if checkIfJidExists(spanCtx, (h.JIDs), uid) {
 			// Eg of output: "R 0"
 			// With test, exit_code is better than DerivedEC, because for canceled jobs, it gives 15 while DerivedEC gives 0.
-			cmd := []string{"--noheader", "-a", "-j ", (*h.JIDs)[uid].JID, "-O", "StateCompact,exit_code"}
+			// states=all or else some jobs are hidden, then it is impossible to get job exit code.
+			cmd := []string{"--noheader", "-a", "--states=all", "-O", "StateCompact,exit_code", "-j ", (*h.JIDs)[uid].JID}
 			shell := exec.ExecTask{
 				Command: h.Config.Squeuepath,
 				Args:    cmd,
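
Note on [PATCH 3/6]: --states=all is the functional fix, since squeue hides jobs in some states by default and the exit code of such jobs could then never be read. A reviewer-level subtlety kept from the previous patches is the "-j " token with its trailing space: it only parses because ExecTask runs with Shell: true, which joins the slice into a single command line that the shell re-tokenizes. A sketch of the difference, using plain os/exec rather than the go-execute wrapper the plugin uses:

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	jid := "12345" // hypothetical Slurm job ID
	// The plugin's slice keeps "-j " with a trailing space as its own element.
	args := []string{"--noheader", "-a", "--states=all", "-O", "StateCompact,exit_code", "-j ", jid}

	// With Shell: true semantics the slice is joined into one line, so the shell
	// re-splits it on whitespace and squeue still sees a clean "-j 12345".
	fmt.Println("squeue " + strings.Join(args, " "))

	// Passed as raw argv (no shell), the literal "-j " token would reach squeue
	// with its embedded space, which is why this style depends on the shell.
	cmd := exec.Command("squeue", args...)
	fmt.Println(cmd.String())
}
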
From 6cc6b0385f782825ed769f3a3988cc4bd7375e58 Mon Sep 17 00:00:00 2001
From: antoinetran
Date: Mon, 2 Dec 2024 17:17:12 +0100
Subject: [PATCH 4/6] Fix #35 'kubectl logs -f' improved dead container tracking

---
 .project             | 11 +++++++++++
 pkg/slurm/Delete.go  |  6 +++++-
 pkg/slurm/GetLogs.go | 21 +++++++++++++--------
 pkg/slurm/Status.go  | 29 +++++++++++++++++------------
 pkg/slurm/prepare.go | 38 ++++++++++++++++++++++++--------------
 5 files changed, 70 insertions(+), 35 deletions(-)
 create mode 100644 .project

diff --git a/.project b/.project
new file mode 100644
index 0000000..5a82b18
--- /dev/null
+++ b/.project
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>interlink-slurm-plugin</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+	</buildSpec>
+	<natures>
+	</natures>
+</projectDescription>
diff --git a/pkg/slurm/Delete.go b/pkg/slurm/Delete.go
index 4be046c..8c4e5ed 100644
--- a/pkg/slurm/Delete.go
+++ b/pkg/slurm/Delete.go
@@ -26,7 +26,11 @@ func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) {
 	defer span.End()
 	defer commonIL.SetDurationSpan(start, span)
 
-	log.G(h.Ctx).Info("Slurm Sidecar: received Stop call")
+	// For debugging purpose, when we have many kubectl logs, we can differentiate each one.
+	sessionContext := GetSessionContext(r)
+	sessionContextMessage := GetSessionContextMessage(sessionContext)
+
+	log.G(h.Ctx).Info(sessionContextMessage, "Slurm Sidecar: received Stop call")
 	statusCode := http.StatusOK
 
 	bodyBytes, err := io.ReadAll(r.Body)
diff --git a/pkg/slurm/GetLogs.go b/pkg/slurm/GetLogs.go
index bd81b07..5595ad0 100644
--- a/pkg/slurm/GetLogs.go
+++ b/pkg/slurm/GetLogs.go
@@ -24,7 +24,7 @@ import (
 )
 
 // Logs in follow mode (get logs until the death of the container) with "kubectl -f".
-func (h *SidecarHandler) GetLogsFollowMode(w http.ResponseWriter, r *http.Request, path string, req commonIL.LogStruct, containerOutputPath string, containerOutput []byte, sessionContext string) error {
+func (h *SidecarHandler) GetLogsFollowMode(spanCtx context.Context, podUid string, w http.ResponseWriter, r *http.Request, path string, req commonIL.LogStruct, containerOutputPath string, containerOutput []byte, sessionContext string) error {
 	// Follow until this file exist, that indicates the end of container, thus the end of following.
 	containerStatusPath := path + "/" + req.ContainerName + ".status"
 	// Get the offset of what we read.
@@ -50,7 +50,7 @@ func (h *SidecarHandler) GetLogsFollowMode(w http.ResponseWriter, r *http.Reques
 			} else {
 				log.G(h.Ctx).Error(sessionContextMessage, "wrote file not found but could not flush because server does not support Flusher.")
 			}
-			time.Sleep(5 * time.Second)
+			time.Sleep(4 * time.Second)
 			continue
 		} else {
 			// Case unknown error.
@@ -108,11 +108,16 @@ func (h *SidecarHandler) GetLogsFollowMode(w http.ResponseWriter, r *http.Reques
 			// Nothing more to read, but in follow mode, is the container still alive?
 			if isContainerDead {
 				// Container already marked as dead, and we tried to get logs one last time. Exiting the loop.
-				log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead and no more logs are found at this step, exiting following mode...")
+				log.G(h.Ctx).Info(sessionContextMessage, "Container was found dead and no more logs are found at this step, exiting following mode...")
 				break
 			}
-			// Checking if container is dead (meaning the status file exist).
-			if _, err := os.Stat(containerStatusPath); errors.Is(err, os.ErrNotExist) {
+			// Checking if container is dead (meaning the job ID is not in context anymore, OR if the status file exist).
+			if !checkIfJidExists(spanCtx, (h.JIDs), podUid) {
+				// The JID disappeared, so the container is dead, probably from a POD delete request. Trying to get the latest log one last time.
+				// Because the moment we found this, there might be some more logs to read.
+				isContainerDead = true
+				log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead thanks to missing JID, reading last logs...")
+			} else if _, err := os.Stat(containerStatusPath); errors.Is(err, os.ErrNotExist) {
 				// The status file of the container does not exist, so the container is still alive. Continuing to follow logs.
 				// Sleep because otherwise it can be a stress to file system to always read it when it has nothing.
 				log.G(h.Ctx).Debug(sessionContextMessage, "EOF of container logs, sleeping 4s before retrying...")
@@ -121,7 +126,7 @@ func (h *SidecarHandler) GetLogsFollowMode(w http.ResponseWriter, r *http.Reques
 				// The status file exist, so the container is dead. Trying to get the latest log one last time.
 				// Because the moment we found the status file, there might be some more logs to read.
 				isContainerDead = true
-				log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead, reading last logs...")
+				log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead thanks to status file, reading last logs...")
 			}
 			continue
 		} else {
@@ -277,7 +282,7 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
 	}
 
 	if req.Opts.Follow {
-		err := h.GetLogsFollowMode(w, r, path, req, containerOutputPath, containerOutput, sessionContext)
+		err := h.GetLogsFollowMode(spanCtx, req.PodUID, w, r, path, req, containerOutputPath, containerOutput, sessionContext)
 		if err != nil {
 			h.logErrorVerbose(sessionContextMessage+"follow mode error", spanCtx, w, err)
 		}
@@ -296,7 +301,7 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
 	if err != nil {
 		if errors.Is(err, fs.ErrNotExist) {
 			log.G(h.Ctx).Info(sessionContextMessage, "file ", logsPath, " not found.")
-			output = make([]byte, 0, 0)
+			output = make([]byte, 0)
 		} else {
 			span.AddEvent("Error retrieving logs")
 			h.logErrorVerbose(sessionContextMessage+"error during ReadFile() of readLogs() in GetLogsHandler of file "+logsPath, ctx, w, err)
diff --git a/pkg/slurm/Status.go b/pkg/slurm/Status.go
index 0a11091..57ef2dc 100644
--- a/pkg/slurm/Status.go
+++ b/pkg/slurm/Status.go
@@ -70,7 +70,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 
 		if execReturn.Stderr != "" {
 			statusCode = http.StatusInternalServerError
-			h.handleError(spanCtx, w, statusCode, errors.New("unable to retrieve job status: "+execReturn.Stderr))
+			h.handleError(spanCtx, w, statusCode, errors.New(sessionContextMessage+"unable to retrieve job status: "+execReturn.Stderr))
 			return
 		}
 
@@ -83,7 +83,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 			// Eg of output: "R 0"
 			// With test, exit_code is better than DerivedEC, because for canceled jobs, it gives 15 while DerivedEC gives 0.
 			// states=all or else some jobs are hidden, then it is impossible to get job exit code.
-			cmd := []string{"--noheader", "-a", "--states=all", "-O", "StateCompact,exit_code", "-j ", (*h.JIDs)[uid].JID}
+			cmd := []string{"--noheader", "-a", "--states=all", "-O", "exit_code,StateCompact", "-j ", (*h.JIDs)[uid].JID}
 			shell := exec.ExecTask{
 				Command: h.Config.Squeuepath,
 				Args:    cmd,
@@ -97,13 +97,13 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 
 			if execReturn.Stderr != "" {
 				span.AddEvent("squeue returned error " + execReturn.Stderr + " for Job " + (*h.JIDs)[uid].JID + ".\nGetting status from files")
-				log.G(h.Ctx).Error("ERR: ", execReturn.Stderr)
+				log.G(h.Ctx).Error(sessionContextMessage, "ERR: ", execReturn.Stderr)
 				for _, ct := range pod.Spec.Containers {
-					log.G(h.Ctx).Info("Getting exit status from " + path + "/" + ct.Name + ".status")
+					log.G(h.Ctx).Info(sessionContextMessage, "getting exit status from "+path+"/"+ct.Name+".status")
 					file, err := os.Open(path + "/" + ct.Name + ".status")
 					if err != nil {
 						statusCode = http.StatusInternalServerError
-						h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to retrieve container status: %s", err))
+						h.handleError(spanCtx, w, statusCode, fmt.Errorf(sessionContextMessage+"unable to retrieve container status: %s", err))
 						log.G(h.Ctx).Error()
 						return
 					}
 
 					statusb, err := io.ReadAll(file)
 					if err != nil {
 						statusCode = http.StatusInternalServerError
-						h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to read container status: %s", err))
+						h.handleError(spanCtx, w, statusCode, fmt.Errorf(sessionContextMessage+"unable to read container status: %s", err))
 						log.G(h.Ctx).Error()
 						return
 					}
 
 					status, err := strconv.Atoi(strings.Replace(string(statusb), "\n", "", -1))
 					if err != nil {
 						statusCode = http.StatusInternalServerError
-						h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to convert container status: %s", err))
+						h.handleError(spanCtx, w, statusCode, fmt.Errorf(sessionContextMessage+"unable to convert container status: %s", err))
 						log.G(h.Ctx).Error()
 						status = 500
 					}
@@ -145,14 +145,19 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 				statePattern := `(CD|CG|F|PD|PR|R|S|ST)`
 				stateRe := regexp.MustCompile(statePattern)
 				stateMatch := stateRe.FindString(execReturn.Stdout)
 
-				// Magic REGEX that matches any number from 0 to 255 included. Eg: match 2, 255, does not match 256, 02, -1.
 				// If the job is not in terminal state, the exit code has no meaning, however squeue returns 0 for exit code in this case. Just ignore the value.
-				exitCodePattern := ` ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$`
+				// Magic REGEX that matches any number from 0 to 255 included. Eg: match 2, 255, does not match 256, 02, -1.
+				// Adds whitespace because otherwise it will take too few letter. Eg: for "123", it will take only "1". With \s, it will take "123 ".
+				// Then we only keep the number part, not the last space.
+				exitCodePattern := `([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])\s`
 				exitCodeRe := regexp.MustCompile(exitCodePattern)
-				exitCodeMatch := exitCodeRe.FindString(execReturn.Stdout)
+				// Eg: exitCodeMatchSlice = "123 "
+				exitCodeMatchSlice := exitCodeRe.FindStringSubmatch(execReturn.Stdout)
+				// Only keep the number part. Eg: exitCodeMatch = "123"
+				exitCodeMatch := exitCodeMatchSlice[1]
 
 				//log.G(h.Ctx).Info("JID: " + (*h.JIDs)[uid].JID + " | Status: " + stateMatch + " | Pod: " + pod.Name + " | UID: " + string(pod.UID))
-				log.G(h.Ctx).Infof("JID: %s | Status: %s | Job exit code (if applicable): %s | Pod: %s | UID: %s", (*h.JIDs)[uid].JID, exitCodeMatch, stateMatch, pod.Name, string(pod.UID))
+				log.G(h.Ctx).Infof("%sJID: %s | Status: %s | Job exit code (if applicable): %s | Pod: %s | UID: %s", sessionContextMessage, (*h.JIDs)[uid].JID, stateMatch, exitCodeMatch, pod.Name, string(pod.UID))
 
 				switch stateMatch {
@@ -318,7 +323,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) {
 				cachedStatus = resp
 				timer = time.Now()
 			} else {
-				log.G(h.Ctx).Debug("Cached status")
+				log.G(h.Ctx).Debug(sessionContextMessage, "Cached status")
 				resp = cachedStatus
 			}
 
diff --git a/pkg/slurm/prepare.go b/pkg/slurm/prepare.go
index 24dd644..cb34311 100644
--- a/pkg/slurm/prepare.go
+++ b/pkg/slurm/prepare.go
@@ -401,21 +401,19 @@ func produceSLURMScript(
 
 	f, err := os.Create(path + "/job.sh")
 	if err != nil {
-		log.G(Ctx).Error(err)
-		return "", err
-	}
-	err = os.Chmod(path+"/job.sh", 0774)
-	if err != nil {
+		log.G(Ctx).Error("Unable to create file ", path, "/job.sh")
 		log.G(Ctx).Error(err)
 		return "", err
 	}
 	defer f.Close()
+
+	err = os.Chmod(path+"/job.sh", 0774)
 	if err != nil {
-		log.G(Ctx).Error("Unable to create file " + path + "/job.sh")
+		log.G(Ctx).Error("Unable to chmod file ", path, "/job.sh")
+		log.G(Ctx).Error(err)
 		return "", err
 	} else {
-		log.G(Ctx).Debug("--- Created file " + path + "/job.sh")
+		log.G(Ctx).Debug("--- Created with correct permission file ", path, "/job.sh")
 	}
 
 	var sbatchFlagsFromArgo []string
@@ -640,23 +638,35 @@ func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JID
 			log.G(Ctx).Info("- Deleted Job ", (*JIDs)[podUID].JID)
 		}
 	}
-	err := os.RemoveAll(path)
 	jid := (*JIDs)[podUID].JID
 	removeJID(podUID, JIDs)
 
+	errFirstAttempt := os.RemoveAll(path)
+
 	span.SetAttributes(
 		attribute.String("delete.pod.uid", podUID),
 		attribute.String("delete.jid", jid),
 	)
 
-	if err != nil {
-		log.G(Ctx).Error(err)
-		span.AddEvent("Failed to delete SLURM Job " + (*JIDs)[podUID].JID + " for Pod " + podUID)
-	} else {
-		span.AddEvent("SLURM Job " + jid + " for Pod " + podUID + " successfully deleted")
+	if errFirstAttempt != nil {
+		log.G(Ctx).Debug("Attempt 1 of deletion failed, not really an error! Probably log file still opened, waiting for close... Error: ", errFirstAttempt)
+		// We expect first rm of directory to possibly fail, in case for eg logs are in follow mode, so opened. The removeJID will end the follow loop,
+		// maximum after the loop period of 4s. So we ignore the error and attempt a second time after being sure the loop has ended.
+		time.Sleep(5 * time.Second)
+
+		errSecondAttempt := os.RemoveAll(path)
+		if errSecondAttempt != nil {
+			log.G(Ctx).Error("Attempt 2 of deletion failed: ", errSecondAttempt)
+			span.AddEvent("Failed to delete SLURM Job " + jid + " for Pod " + podUID)
+			return errSecondAttempt
+		} else {
+			log.G(Ctx).Info("Attempt 2 of deletion succeeded!")
+		}
 	}
+	span.AddEvent("SLURM Job " + jid + " for Pod " + podUID + " successfully deleted")
 
-	return err
+	// We ignore the deletion error because it is already logged, and because InterLink can still be opening files (eg logs in follow mode).
+	// Once InterLink will not use files, all files will be deleted then.
+	return nil
 }
 
 // mountData is called by prepareMounts and creates files and directory according to their definition in the pod structure.

From 908c15e2a6de273a395ba446511a7cdd4aac3702 Mon Sep 17 00:00:00 2001
From: antoinetran
Date: Thu, 12 Dec 2024 15:48:48 +0100
Subject: [PATCH 5/6] remove + ignore eclipse files

---
 .gitignore |  5 ++++-
 .project   | 11 -----------
 2 files changed, 4 insertions(+), 12 deletions(-)
 delete mode 100644 .project

diff --git a/.gitignore b/.gitignore
index e04c62b..2b1e4bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
 envs.sh
-bin
\ No newline at end of file
+bin
+# Eclipse IDE
+.project
+.settings
\ No newline at end of file
diff --git a/.project b/.project
deleted file mode 100644
index 5a82b18..0000000
--- a/.project
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>interlink-slurm-plugin</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-	</buildSpec>
-	<natures>
-	</natures>
-</projectDescription>

From 58803784e41b95fc9988bf3ff3f15f879cdad630 Mon Sep 17 00:00:00 2001
From: antoinetran
Date: Thu, 12 Dec 2024 17:27:36 +0100
Subject: [PATCH 6/6] formatted GetLogsFollowMode args

---
 pkg/slurm/GetLogs.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/pkg/slurm/GetLogs.go b/pkg/slurm/GetLogs.go
index 5595ad0..390a1e8 100644
--- a/pkg/slurm/GetLogs.go
+++ b/pkg/slurm/GetLogs.go
@@ -24,7 +24,17 @@ import (
 )
 
 // Logs in follow mode (get logs until the death of the container) with "kubectl -f".
-func (h *SidecarHandler) GetLogsFollowMode(spanCtx context.Context, podUid string, w http.ResponseWriter, r *http.Request, path string, req commonIL.LogStruct, containerOutputPath string, containerOutput []byte, sessionContext string) error {
+func (h *SidecarHandler) GetLogsFollowMode(
+	spanCtx context.Context,
+	podUid string,
+	w http.ResponseWriter,
+	r *http.Request,
+	path string,
+	req commonIL.LogStruct,
+	containerOutputPath string,
+	containerOutput []byte,
+	sessionContext string,
+) error {
 	// Follow until this file exist, that indicates the end of container, thus the end of following.
 	containerStatusPath := path + "/" + req.ContainerName + ".status"
 	// Get the offset of what we read.
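
A closing note on the sessionContext plumbing that patches 1 and 4 thread through every handler: the plugin derives a log prefix from the InterLink-Http-Session request header and falls back to "NoSessionFound#0" when the caller does not set it, so concurrent kubectl logs sessions stay distinguishable in the sidecar's output. A self-contained illustration of both sides of that contract (the header name and fallback come from func.go in [PATCH 1/6]; the server wiring and sample session value are invented for the demo):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func getSessionContext(r *http.Request) string {
	// Same header and fallback as GetSessionContext in func.go.
	s := r.Header.Get("InterLink-Http-Session")
	if s == "" {
		s = "NoSessionFound#0"
	}
	return s
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		prefix := "HTTP InterLink session " + getSessionContext(r) + ": "
		fmt.Fprint(w, prefix+"received GetLogs call")
	}))
	defer srv.Close()

	req, _ := http.NewRequest(http.MethodGet, srv.URL, nil)
	req.Header.Set("InterLink-Http-Session", "GetLogs#42") // invented session ID
	resp, _ := http.DefaultClient.Do(req)
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
	// Output: HTTP InterLink session GetLogs#42: received GetLogs call
}

Since the prefix is plain string concatenation, the mechanism adds no tracing dependency, and the same session value can be correlated across SubmitHandler, StatusHandler, StopHandler and GetLogsHandler log lines.
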