From bafea3bf14c3febd396f817f0fe81121c767b321 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Wed, 13 Nov 2024 18:39:47 +0100 Subject: [PATCH 1/5] Fix #35 'kubectl logs -f' implemented + improved debug with sessionContext --- .github/workflows/build_images.yaml | 4 +- pkg/interlink/api/create.go | 3 +- pkg/interlink/api/handler.go | 144 +++++++++++++++++++++++---- pkg/interlink/api/logs.go | 31 ++++-- pkg/interlink/api/status.go | 10 +- pkg/virtualkubelet/execute.go | 69 +++++++++---- pkg/virtualkubelet/virtualkubelet.go | 19 +++- 7 files changed, 223 insertions(+), 57 deletions(-) diff --git a/.github/workflows/build_images.yaml b/.github/workflows/build_images.yaml index 6e475f76..a02bdd9f 100644 --- a/.github/workflows/build_images.yaml +++ b/.github/workflows/build_images.yaml @@ -25,7 +25,7 @@ jobs: with: registry: ghcr.io username: ${{ github.repository_owner }} - password: ${{ secrets.GH_CR_TOKEN }} + password: ${{ secrets.GITHUB_TOKEN }} - name: Get Repo Owner id: get_repo_owner run: echo ::set-output name=repo_owner::$(echo ${{ github.repository_owner }} | tr '[:upper:]' '[:lower:]') @@ -70,7 +70,7 @@ jobs: with: registry: ghcr.io username: ${{ github.repository_owner }} - password: ${{ secrets.GH_CR_TOKEN }} + password: ${{ secrets.GITHUB_TOKEN }} - name: Get Repo Owner id: get_repo_owner run: echo ::set-output name=repo_owner::$(echo ${{ github.repository_owner }} | tr '[:upper:]' '[:lower:]') diff --git a/pkg/interlink/api/create.go b/pkg/interlink/api/create.go index afe3e925..b8ec2e68 100644 --- a/pkg/interlink/api/create.go +++ b/pkg/interlink/api/create.go @@ -91,7 +91,8 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) log.G(h.Ctx).Info("InterLink: forwarding Create call to sidecar") - _, err := ReqWithError(h.Ctx, req, w, start, span, true) + sessionContext := GetSessionContext(r) + _, err := ReqWithError(h.Ctx, req, w, start, span, true, false, sessionContext, http.DefaultClient) if err != nil { log.L.Error(err) 
return diff --git a/pkg/interlink/api/handler.go b/pkg/interlink/api/handler.go index 308ce4ce..a4acba8b 100644 --- a/pkg/interlink/api/handler.go +++ b/pkg/interlink/api/handler.go @@ -1,10 +1,12 @@ package api import ( + "bufio" "context" "fmt" "io" "net/http" + "strconv" "github.com/containerd/containerd/log" @@ -21,6 +23,22 @@ type InterLinkHandler struct { // TODO: http client with TLS } +func AddSessionContext(req *http.Request, sessionContext string) { + req.Header.Set("InterLink-Http-Session", sessionContext) +} + +func GetSessionContext(r *http.Request) string { + sessionContext := r.Header.Get("InterLink-Http-Session") + if sessionContext == "" { + sessionContext = "NoSessionFound#0" + } + return sessionContext +} + +func GetSessionContextMessage(sessionContext string) string { + return "HTTP InterLink session " + sessionContext + ": " +} + func DoReq(req *http.Request) (*http.Response, error) { resp, err := http.DefaultClient.Do(req) if err != nil { @@ -30,6 +48,8 @@ func DoReq(req *http.Request) (*http.Response, error) { return resp, nil } +// respondWithReturn: if false, return nil. Useful when body is too big to be contained in one big string. +// sessionNumber: integer number for debugging purpose, generated from InterLink VK, to follow HTTP request from end-to-end. 
func ReqWithError( ctx context.Context, req *http.Request, @@ -37,50 +57,132 @@ func ReqWithError( start int64, span trace.Span, respondWithValues bool, + respondWithReturn bool, + sessionContext string, + logHttpClient *http.Client, ) ([]byte, error) { - req.Header.Set("Content-Type", "application/json") - resp, err := DoReq(req) + + sessionContextMessage := GetSessionContextMessage(sessionContext) + log.G(ctx).Debug(sessionContextMessage, "doing request: ", fmt.Sprintf("%#v", req)) + + // Add session number for end-to-end from API to InterLink plugin (eg interlink-slurm-plugin) + AddSessionContext(req, sessionContext) + + log.G(ctx).Debug(sessionContextMessage, "before DoReq()") + resp, err := logHttpClient.Do(req) if err != nil { statusCode := http.StatusInternalServerError w.WriteHeader(statusCode) - log.G(ctx).Error(err) - return nil, err + errWithContext := fmt.Errorf(sessionContextMessage+"error doing DoReq() of ReqWithErrorWithSessionNumber error %w", err) + return nil, errWithContext } defer resp.Body.Close() + log.G(ctx).Debug(sessionContextMessage, "after DoReq()") + + log.G(ctx).Debug(sessionContextMessage, "after Do(), writing header and status code: ", resp.StatusCode) + w.WriteHeader(resp.StatusCode) + // Flush headers ASAP so that the client is not blocked in request. + if f, ok := w.(http.Flusher); ok { + log.G(ctx).Debug(sessionContextMessage, "now flushing...") + f.Flush() + } else { + log.G(ctx).Error(sessionContextMessage, "could not flush because server does not support Flusher.") + } if resp.StatusCode != http.StatusOK { + log.G(ctx).Error(sessionContextMessage, "HTTP request in error.") statusCode := http.StatusInternalServerError w.WriteHeader(statusCode) ret, err := io.ReadAll(resp.Body) if err != nil { - return nil, err + return nil, fmt.Errorf(sessionContextMessage+"HTTP request in error and could not read body response error: %w", err) } - _, err = w.Write(ret) + errHttp := fmt.Errorf(sessionContextMessage+"call exit status: %d. 
Body: %s", statusCode, ret) + log.G(ctx).Error(errHttp) + _, err = w.Write([]byte(errHttp.Error())) if err != nil { - return nil, err + return nil, fmt.Errorf(sessionContextMessage+"HTTP request in error and could not write all body response to InterLink Node error: %w", err) } - return nil, fmt.Errorf("Call exit status: %d. Body: %s", statusCode, ret) + return nil, errHttp } - returnValue, err := io.ReadAll(resp.Body) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - log.G(ctx).Error(err) - return nil, err - } - log.G(ctx).Debug(string(returnValue)) - - w.WriteHeader(resp.StatusCode) types.SetDurationSpan(start, span, types.WithHTTPReturnCode(resp.StatusCode)) - if respondWithValues { - _, err = w.Write(returnValue) + log.G(ctx).Debug(sessionContextMessage, "before respondWithValues") + if respondWithReturn { + + log.G(ctx).Debug(sessionContextMessage, "reading all body once for all") + returnValue, err := io.ReadAll(resp.Body) if err != nil { w.WriteHeader(http.StatusInternalServerError) - log.G(ctx).Error(err) + return nil, fmt.Errorf(sessionContextMessage+"error doing ReadAll() of ReqWithErrorComplex see error %w", err) + } + + if respondWithValues { + _, err = w.Write(returnValue) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return nil, fmt.Errorf(sessionContextMessage+"error doing Write() of ReqWithErrorComplex see error %w", err) + } + } + + return returnValue, nil + } + + // Case no return needed. + + if respondWithValues { + // Because no return needed, we can write continuously instead of writing one big block of data. + // Useful to get following logs. + log.G(ctx).Debug(sessionContextMessage, "in respondWithValues loop, reading body continuously until EOF") + + // In this case, we return continuously the values in the w, instead of reading it all. This allows for logs to be followed. + bodyReader := bufio.NewReader(resp.Body) + + // 4096 is bufio.NewReader default buffer size. 
+ bufferBytes := make([]byte, 4096) + + // Looping until we get EOF from sidecar. + for { + log.G(ctx).Debug(sessionContextMessage, "trying to read some bytes from InterLink sidecar "+string(req.RequestURI)) + n, err := bodyReader.Read(bufferBytes) + if err != nil { + if err == io.EOF { + log.G(ctx).Debug(sessionContextMessage, "received EOF and read number of bytes: "+strconv.Itoa(n)) + + // EOF but we still have something to read! + if n != 0 { + _, err = w.Write(bufferBytes[:n]) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return nil, fmt.Errorf(sessionContextMessage+"could not write during ReqWithError() error: %w", err) + } + } + return nil, nil + } else { + // Error during read. + w.WriteHeader(http.StatusInternalServerError) + return nil, fmt.Errorf(sessionContextMessage+"could not read HTTP body: see error %w", err) + } + } + log.G(ctx).Debug(sessionContextMessage, "received some bytes from InterLink sidecar") + _, err = w.Write(bufferBytes[:n]) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return nil, fmt.Errorf(sessionContextMessage+"could not write during ReqWithError() error: %w", err) + } + + // Flush otherwise it will take time to appear in kubectl logs. + if f, ok := w.(http.Flusher); ok { + log.G(ctx).Debug(sessionContextMessage, "Wrote some logs, now flushing...") + f.Flush() + } else { + log.G(ctx).Error(sessionContextMessage, "could not flush because server does not support Flusher.") + } } } - return returnValue, nil + // Case no respondWithValue no respondWithReturn , it means we are doing a request and not using response. 
+ return nil, nil } diff --git a/pkg/interlink/api/logs.go b/pkg/interlink/api/logs.go index a5a77685..4db2ec12 100644 --- a/pkg/interlink/api/logs.go +++ b/pkg/interlink/api/logs.go @@ -26,20 +26,23 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request defer span.End() defer types.SetDurationSpan(start, span) + sessionContext := GetSessionContext(r) + sessionContextMessage := GetSessionContextMessage(sessionContext) + var statusCode int - log.G(h.Ctx).Info("InterLink: received GetLogs call") + log.G(h.Ctx).Info(sessionContextMessage, "InterLink: received GetLogs call") bodyBytes, err := io.ReadAll(r.Body) if err != nil { - log.G(h.Ctx).Fatal(err) + log.G(h.Ctx).Fatal(sessionContextMessage, err) } - log.G(h.Ctx).Info("InterLink: unmarshal GetLogs request") + log.G(h.Ctx).Info(sessionContextMessage, "InterLink: unmarshal GetLogs request") var req2 types.LogStruct // incoming request. To be used in interlink API. req is directly forwarded to sidecar err = json.Unmarshal(bodyBytes, &req2) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) - log.G(h.Ctx).Error(err) + log.G(h.Ctx).Error(sessionContextMessage, err) return } @@ -55,7 +58,7 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request attribute.Bool("opts.timestamps", req2.Opts.Timestamps), ) - log.G(h.Ctx).Info("InterLink: new GetLogs podUID: now ", req2.PodUID) + log.G(h.Ctx).Info(sessionContextMessage, "InterLink: new GetLogs podUID: now ", req2.PodUID) if (req2.Opts.Tail != 0 && req2.Opts.LimitBytes != 0) || (req2.Opts.SinceSeconds != 0 && !req2.Opts.SinceTime.IsZero()) { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) @@ -63,19 +66,19 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request if req2.Opts.Tail != 0 && req2.Opts.LimitBytes != 0 { _, err = w.Write([]byte("Both Tail and LimitBytes set. 
Set only one of them")) if err != nil { - log.G(h.Ctx).Error(errors.New("Failed to write to http buffer")) + log.G(h.Ctx).Error(errors.New(sessionContextMessage + "Failed to write to http buffer")) } return } _, err = w.Write([]byte("Both SinceSeconds and SinceTime set. Set only one of them")) if err != nil { - log.G(h.Ctx).Error(errors.New("Failed to write to http buffer")) + log.G(h.Ctx).Error(errors.New(sessionContextMessage + "Failed to write to http buffer")) } } - log.G(h.Ctx).Info("InterLink: marshal GetLogs request ") + log.G(h.Ctx).Info(sessionContextMessage, "InterLink: marshal GetLogs request ") bodyBytes, err = json.Marshal(req2) if err != nil { @@ -91,10 +94,16 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request } req.Header.Set("Content-Type", "application/json") - log.G(h.Ctx).Info("InterLink: forwarding GetLogs call to sidecar") - _, err = ReqWithError(h.Ctx, req, w, start, span, true) + + logTransport := http.DefaultTransport.(*http.Transport).Clone() + //logTransport.DisableKeepAlives = true + //logTransport.MaxIdleConnsPerHost = -1 + var logHttpClient = &http.Client{Transport: logTransport} + + log.G(h.Ctx).Info(sessionContextMessage, "InterLink: forwarding GetLogs call to sidecar") + _, err = ReqWithError(h.Ctx, req, w, start, span, true, false, sessionContext, logHttpClient) if err != nil { - log.L.Error(err) + log.L.Error(sessionContextMessage, err) return } diff --git a/pkg/interlink/api/status.go b/pkg/interlink/api/status.go index 98a94d50..3c686628 100644 --- a/pkg/interlink/api/status.go +++ b/pkg/interlink/api/status.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io" "net/http" "time" @@ -37,7 +38,8 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) err = json.Unmarshal(bodyBytes, &pods) if err != nil { - log.G(h.Ctx).Error(err) + errWithContext := fmt.Errorf("error doing fisrt Unmarshal() in StatusHandler() error detail: %s error: %w", 
fmt.Sprintf("%#v", err), err) + log.G(h.Ctx).Error(errWithContext) } span.SetAttributes( @@ -80,7 +82,8 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) req.Header.Set("Content-Type", "application/json") log.G(h.Ctx).Debug("Interlink get status request content:", req) - bodyBytes, err = ReqWithError(h.Ctx, req, w, start, span, false) + sessionContext := GetSessionContext(r) + bodyBytes, err = ReqWithError(h.Ctx, req, w, start, span, false, true, sessionContext, http.DefaultClient) if err != nil { log.L.Error(err) return @@ -90,7 +93,8 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) - log.G(h.Ctx).Error(err) + errWithContext := fmt.Errorf("error doing Unmarshal() in StatusHandler() of req %s error detail: %s error: %w", fmt.Sprintf("%#v", req), fmt.Sprintf("%#v", err), err) + log.G(h.Ctx).Error(errWithContext) return } diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index aece8aa4..401367a0 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" "os" "strconv" @@ -54,13 +55,15 @@ func traceExecute(ctx context.Context, pod *v1.Pod, name string, startHTTPCall i } func doRequest(req *http.Request, token string) (*http.Response, error) { + return doRequestWithClient(req, token, http.DefaultClient) +} +func doRequestWithClient(req *http.Request, token string, httpClient *http.Client) (*http.Response, error) { if token != "" { req.Header.Add("Authorization", "Bearer "+token) } req.Header.Set("Content-Type", "application/json") - return http.DefaultClient.Do(req) - + return httpClient.Do(req) } func getSidecarEndpoint(ctx context.Context, interLinkURL string, interLinkPort string) string { @@ -107,6 +110,9 @@ func PingInterLink(ctx context.Context, config Config) (bool, int, error) { defer 
spanHTTP.End() defer types.SetDurationSpan(startHTTPCall, spanHTTP) + // Add session number for end-to-end from VK to API to InterLink plugin (eg interlink-slurm-plugin) + AddSessionContext(req, "PingInterLink#"+strconv.Itoa(rand.Intn(100000))) + resp, err := http.DefaultClient.Do(req) if err != nil { @@ -159,6 +165,9 @@ func updateCacheRequest(ctx context.Context, config Config, pod v1.Pod, token st startHTTPCall := time.Now().UnixMicro() spanHTTP := traceExecute(ctx, &pod, "UpdateCacheHttpCall", startHTTPCall) + // Add session number for end-to-end from VK to API to InterLink plugin (eg interlink-slurm-plugin) + AddSessionContext(req, "UpdateCache#"+strconv.Itoa(rand.Intn(100000))) + resp, err := http.DefaultClient.Do(req) if err != nil { log.L.Error(err) @@ -202,10 +211,12 @@ func createRequest(ctx context.Context, config Config, pod types.PodCreateReques defer spanHTTP.End() defer types.SetDurationSpan(startHTTPCall, spanHTTP) + // Add session number for end-to-end from VK to API to InterLink plugin (eg interlink-slurm-plugin) + AddSessionContext(req, "CreatePod#"+strconv.Itoa(rand.Intn(100000))) + resp, err := doRequest(req, token) if err != nil { - log.L.Error(err) - return nil, err + return nil, fmt.Errorf("error doing doRequest() in createRequest() log request: %s error: %w", fmt.Sprintf("%#v", req), err) } defer resp.Body.Close() @@ -216,8 +227,7 @@ func createRequest(ctx context.Context, config Config, pod types.PodCreateReques } returnValue, err := io.ReadAll(resp.Body) if err != nil { - log.L.Error(err) - return nil, err + return nil, fmt.Errorf("error doing ReadAll() in createRequest() log request: %s error: %w", fmt.Sprintf("%#v", req), err) } return returnValue, nil @@ -243,6 +253,9 @@ func deleteRequest(ctx context.Context, config Config, pod *v1.Pod, token string startHTTPCall := time.Now().UnixMicro() spanHTTP := traceExecute(ctx, pod, "DeleteHttpCall", startHTTPCall) + // Add session number for end-to-end from VK to API to InterLink plugin (eg 
interlink-slurm-plugin) + AddSessionContext(req, "DeletePod#"+strconv.Itoa(rand.Intn(100000))) + resp, err := doRequest(req, token) if err != nil { log.G(context.Background()).Error(err) @@ -302,6 +315,9 @@ func statusRequest(ctx context.Context, config Config, podsList []*v1.Pod, token defer spanHTTP.End() defer types.SetDurationSpan(startHTTPCall, spanHTTP) + // Add session number for end-to-end from VK to API to InterLink plugin (eg interlink-slurm-plugin) + AddSessionContext(req, "GetStatus#"+strconv.Itoa(rand.Intn(100000))) + resp, err := doRequest(req, token) if err != nil { return nil, err @@ -329,7 +345,7 @@ func statusRequest(ctx context.Context, config Config, podsList []*v1.Pod, token // LogRetrieval performs a REST call to the InterLink API when the user ask for a log retrieval. Compared to create/delete/status request, a way smaller struct is marshalled and sent. // This struct only includes a minimum data set needed to identify the job/container to get the logs from. // Returns the call response and/or the first encountered error -func LogRetrieval(ctx context.Context, config Config, logsRequest types.LogStruct) (io.ReadCloser, error) { +func LogRetrieval(ctx context.Context, config Config, logsRequest types.LogStruct, sessionContext string) (io.ReadCloser, error) { tracer := otel.Tracer("interlink-service") interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) @@ -343,16 +359,21 @@ func LogRetrieval(ctx context.Context, config Config, logsRequest types.LogStruc token = string(b) } + sessionContextMessage := GetSessionContextMessage(sessionContext) + bodyBytes, err := json.Marshal(logsRequest) if err != nil { - log.G(ctx).Error(err) - return nil, err + errWithContext := fmt.Errorf(sessionContextMessage+"error during marshalling to JSON the log request: %s. 
Bodybytes: %s error: %w", fmt.Sprintf("%#v", logsRequest), bodyBytes, err) + log.G(ctx).Error(errWithContext) + return nil, errWithContext } + reader := bytes.NewReader(bodyBytes) req, err := http.NewRequest(http.MethodGet, interLinkEndpoint+"/getLogs", reader) if err != nil { - log.G(ctx).Error(err) - return nil, err + errWithContext := fmt.Errorf(sessionContextMessage+"error during HTTP request: %s/getLogs %w", interLinkEndpoint, err) + log.G(ctx).Error(errWithContext) + return nil, errWithContext } // log.G(ctx).Println(string(bodyBytes)) @@ -367,16 +388,28 @@ func LogRetrieval(ctx context.Context, config Config, logsRequest types.LogStruc defer spanHTTP.End() defer types.SetDurationSpan(startHTTPCall, spanHTTP) - resp, err := doRequest(req, token) + log.G(ctx).Debug(sessionContextMessage, "before doRequestWithClient()") + // Add session number for end-to-end from VK to API to InterLink plugin (eg interlink-slurm-plugin) + AddSessionContext(req, sessionContext) + + logTransport := http.DefaultTransport.(*http.Transport).Clone() + //logTransport.DisableKeepAlives = true + //logTransport.MaxIdleConnsPerHost = -1 + var logHttpClient = &http.Client{Transport: logTransport} + + resp, err := doRequestWithClient(req, token, logHttpClient) if err != nil { log.G(ctx).Error(err) return nil, err } - // defer resp.Body.Close() + // resp.body must not be closed because the kubelet needs to consume it! This is the responsability of the caller to close it. + // Called here https://github.com/virtual-kubelet/virtual-kubelet/blob/v1.11.0/node/api/logs.go#L132 + //defer resp.Body.Close() + log.G(ctx).Debug(sessionContextMessage, "after doRequestWithClient()") types.SetDurationSpan(startHTTPCall, spanHTTP, types.WithHTTPReturnCode(resp.StatusCode)) if resp.StatusCode != http.StatusOK { - err = errors.New("Unexpected error occured while getting logs. Status code: " + strconv.Itoa(resp.StatusCode) + ". 
Check InterLink's logs for further informations") + err = errors.New(sessionContextMessage + "Unexpected error occured while getting logs. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") } // return io.NopCloser(bufio.NewReader(resp.Body)), err @@ -469,13 +502,14 @@ func RemoteExecution(ctx context.Context, config Config, p *Provider, pod *v1.Po returnVal, err := createRequest(ctx, config, req, token) if err != nil { - return err + return fmt.Errorf("error doing createRequest() in RemoteExecution() return value %s error detail %s error: %w", returnVal, fmt.Sprintf("%#v", err), err) } + log.G(ctx).Debug("Pod " + pod.Name + " with Job ID " + resp.PodJID + " before json.Unmarshal()") // get remote job ID and annotate it into the pod err = json.Unmarshal(returnVal, &resp) if err != nil { - return err + return fmt.Errorf("error doing Unmarshal() in RemoteExecution() return value %s error detail %s error: %w", returnVal, fmt.Sprintf("%#v", err), err) } if string(pod.UID) == resp.PodUID { @@ -524,7 +558,8 @@ func checkPodsStatus(ctx context.Context, p *Provider, podsList []*v1.Pod, token err = json.Unmarshal(returnVal, &ret) if err != nil { - return nil, err + errWithContext := fmt.Errorf("error doing Unmarshal() in checkPodsStatus() error detail: %s error: %w", fmt.Sprintf("%#v", err), err) + return nil, errWithContext } // if there is a pod status available go ahead to match with the latest state available in etcd diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index 4d92056a..117a1fc2 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "io" + "math/rand" + "net/http" "os" "strconv" "time" @@ -687,6 +689,14 @@ func (p *Provider) statusLoop(ctx context.Context) { } } +func AddSessionContext(req *http.Request, sessionContext string) { + req.Header.Set("InterLink-Http-Session", sessionContext) +} + 
+func GetSessionContextMessage(sessionContext string) string { + return "HTTP InterLink session " + sessionContext + ": " +} + // GetLogs implements the logic for interLink pod logs retrieval. func (p *Provider) GetLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { start := time.Now().Unix() @@ -697,7 +707,12 @@ func (p *Provider) GetLogs(ctx context.Context, namespace, podName, containerNam defer span.End() defer types.SetDurationSpan(start, span) - log.G(ctx).Infof("receive GetPodLogs %q", podName) + // For debugging purpose, when we have many API calls, we can differentiate each one. + sessionNumber := rand.Intn(100000) + sessionContext := "GetLogs#" + strconv.Itoa(sessionNumber) + sessionContextMessage := GetSessionContextMessage(sessionContext) + + log.G(ctx).Infof(sessionContextMessage+"receive GetPodLogs %q", podName) key, err := buildKeyFromNames(namespace, podName) if err != nil { @@ -712,7 +727,7 @@ func (p *Provider) GetLogs(ctx context.Context, namespace, podName, containerNam Opts: types.ContainerLogOpts(opts), } - return LogRetrieval(ctx, p.config, logsRequest) + return LogRetrieval(ctx, p.config, logsRequest, sessionContext) } // GetStatsSummary returns dummy stats for all pods known by this provider. 
From 56eb1c1856123e233c5d91dd805d7721b598c0a6 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Tue, 3 Dec 2024 14:19:43 +0100 Subject: [PATCH 2/5] fix lint --- pkg/interlink/api/handler.go | 13 ++++++------- pkg/interlink/api/logs.go | 8 ++++---- pkg/virtualkubelet/execute.go | 6 +++--- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/interlink/api/handler.go b/pkg/interlink/api/handler.go index a4acba8b..d88834ed 100644 --- a/pkg/interlink/api/handler.go +++ b/pkg/interlink/api/handler.go @@ -59,7 +59,7 @@ func ReqWithError( respondWithValues bool, respondWithReturn bool, sessionContext string, - logHttpClient *http.Client, + logHTTPClient *http.Client, ) ([]byte, error) { req.Header.Set("Content-Type", "application/json") @@ -70,7 +70,7 @@ func ReqWithError( AddSessionContext(req, sessionContext) log.G(ctx).Debug(sessionContextMessage, "before DoReq()") - resp, err := logHttpClient.Do(req) + resp, err := logHTTPClient.Do(req) if err != nil { statusCode := http.StatusInternalServerError w.WriteHeader(statusCode) @@ -145,7 +145,7 @@ func ReqWithError( // Looping until we get EOF from sidecar. for { - log.G(ctx).Debug(sessionContextMessage, "trying to read some bytes from InterLink sidecar "+string(req.RequestURI)) + log.G(ctx).Debug(sessionContextMessage, "trying to read some bytes from InterLink sidecar "+req.RequestURI) n, err := bodyReader.Read(bufferBytes) if err != nil { if err == io.EOF { @@ -160,11 +160,10 @@ func ReqWithError( } } return nil, nil - } else { - // Error during read. - w.WriteHeader(http.StatusInternalServerError) - return nil, fmt.Errorf(sessionContextMessage+"could not read HTTP body: see error %w", err) } + // Error during read. 
+ w.WriteHeader(http.StatusInternalServerError) + return nil, fmt.Errorf(sessionContextMessage+"could not read HTTP body: see error %w", err) } log.G(ctx).Debug(sessionContextMessage, "received some bytes from InterLink sidecar") _, err = w.Write(bufferBytes[:n]) diff --git a/pkg/interlink/api/logs.go b/pkg/interlink/api/logs.go index 4db2ec12..db9e148c 100644 --- a/pkg/interlink/api/logs.go +++ b/pkg/interlink/api/logs.go @@ -96,12 +96,12 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request req.Header.Set("Content-Type", "application/json") logTransport := http.DefaultTransport.(*http.Transport).Clone() - //logTransport.DisableKeepAlives = true - //logTransport.MaxIdleConnsPerHost = -1 - var logHttpClient = &http.Client{Transport: logTransport} + // logTransport.DisableKeepAlives = true + // logTransport.MaxIdleConnsPerHost = -1 + var logHTTPClient = &http.Client{Transport: logTransport} log.G(h.Ctx).Info(sessionContextMessage, "InterLink: forwarding GetLogs call to sidecar") - _, err = ReqWithError(h.Ctx, req, w, start, span, true, false, sessionContext, logHttpClient) + _, err = ReqWithError(h.Ctx, req, w, start, span, true, false, sessionContext, logHTTPClient) if err != nil { log.L.Error(sessionContextMessage, err) return diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 401367a0..1b788225 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -395,16 +395,16 @@ func LogRetrieval(ctx context.Context, config Config, logsRequest types.LogStruc logTransport := http.DefaultTransport.(*http.Transport).Clone() //logTransport.DisableKeepAlives = true //logTransport.MaxIdleConnsPerHost = -1 - var logHttpClient = &http.Client{Transport: logTransport} + var logHTTPClient = &http.Client{Transport: logTransport} - resp, err := doRequestWithClient(req, token, logHttpClient) + resp, err := doRequestWithClient(req, token, logHTTPClient) if err != nil { log.G(ctx).Error(err) return nil, 
err } // resp.body must not be closed because the kubelet needs to consume it! This is the responsability of the caller to close it. // Called here https://github.com/virtual-kubelet/virtual-kubelet/blob/v1.11.0/node/api/logs.go#L132 - //defer resp.Body.Close() + // defer resp.Body.Close() log.G(ctx).Debug(sessionContextMessage, "after doRequestWithClient()") types.SetDurationSpan(startHTTPCall, spanHTTP, types.WithHTTPReturnCode(resp.StatusCode)) From 1660ed37f49ac72e0133e40228184a4865873e95 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Tue, 3 Dec 2024 14:35:21 +0100 Subject: [PATCH 3/5] ADOPTERS.md MAINTAINERS.md --- ADOPTERS.md | 5 +++++ MAINTAINERS.md | 1 + 2 files changed, 6 insertions(+) diff --git a/ADOPTERS.md b/ADOPTERS.md index 6595dcd7..3ba422d7 100644 --- a/ADOPTERS.md +++ b/ADOPTERS.md @@ -10,6 +10,11 @@ Used to enable a seamless provisioning of heterogeneous resources to k8s-based w ## HPC supercomputing centers +### CNES + +Project: LISA DDPC + +In the context of LISA (Laser Interferometer Space Antenna) DDPC (Distributed Data Processing Center), CNES is using Interlink to prototype a hybrid execution of LISA pipelines on either Kubernetes or Slurm resources.
## Industry diff --git a/MAINTAINERS.md b/MAINTAINERS.md index e11c234a..056a0d64 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -8,3 +8,4 @@ - Vibhav Bobade - vibhav.bobde\gmail.com - Mauro Gattari - INFN - mauro.gattari\infn.it +- Antoine Tran - Thales for CNES - no public email, but can be contacted through InterLink Slack channel or in github issue to @antoinetran From 438676363c6ebcb8d6f66eb856371cc3f074ee05 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Tue, 3 Dec 2024 14:42:32 +0100 Subject: [PATCH 4/5] fix lint --- pkg/interlink/api/handler.go | 8 ++++---- pkg/virtualkubelet/execute.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/interlink/api/handler.go b/pkg/interlink/api/handler.go index d88834ed..83ae5d5b 100644 --- a/pkg/interlink/api/handler.go +++ b/pkg/interlink/api/handler.go @@ -98,13 +98,13 @@ func ReqWithError( if err != nil { return nil, fmt.Errorf(sessionContextMessage+"HTTP request in error and could not read body response error: %w", err) } - errHttp := fmt.Errorf(sessionContextMessage+"call exit status: %d. Body: %s", statusCode, ret) - log.G(ctx).Error(errHttp) - _, err = w.Write([]byte(errHttp.Error())) + errHTTP := fmt.Errorf(sessionContextMessage+"call exit status: %d. 
Body: %s", statusCode, ret) + log.G(ctx).Error(errHTTP) + _, err = w.Write([]byte(errHTTP.Error())) if err != nil { return nil, fmt.Errorf(sessionContextMessage+"HTTP request in error and could not write all body response to InterLink Node error: %w", err) } - return nil, errHttp + return nil, errHTTP } types.SetDurationSpan(start, span, types.WithHTTPReturnCode(resp.StatusCode)) diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 1b788225..22d93198 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -393,8 +393,8 @@ func LogRetrieval(ctx context.Context, config Config, logsRequest types.LogStruc AddSessionContext(req, sessionContext) logTransport := http.DefaultTransport.(*http.Transport).Clone() - //logTransport.DisableKeepAlives = true - //logTransport.MaxIdleConnsPerHost = -1 + // logTransport.DisableKeepAlives = true + // logTransport.MaxIdleConnsPerHost = -1 var logHTTPClient = &http.Client{Transport: logTransport} resp, err := doRequestWithClient(req, token, logHTTPClient) From 81a4a12ea4c8174896ab81e83d02aeff12c2fd99 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Thu, 12 Dec 2024 17:06:46 +0100 Subject: [PATCH 5/5] ignore eclipse files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index b8ed2c30..541c0ea0 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ test-deployment.yaml test-pod.yaml examples/interlink-slurm/vk/* examples/sidecar/templates/python/__pycache__/* +# Eclipse IDE +.project +.settings