Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #35 'kubectl logs -f' implemented #332

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build_images.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GH_CR_TOKEN }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Get Repo Owner
id: get_repo_owner
run: echo ::set-output name=repo_owner::$(echo ${{ github.repository_owner }} | tr '[:upper:]' '[:lower:]')
Expand Down Expand Up @@ -70,7 +70,7 @@ jobs:
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GH_CR_TOKEN }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Get Repo Owner
id: get_repo_owner
run: echo ::set-output name=repo_owner::$(echo ${{ github.repository_owner }} | tr '[:upper:]' '[:lower:]')
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ test-deployment.yaml
test-pod.yaml
examples/interlink-slurm/vk/*
examples/sidecar/templates/python/__pycache__/*
# Eclipse IDE
.project
.settings
5 changes: 5 additions & 0 deletions ADOPTERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ Used to enable a seamless provisioning of heterogeneous resources to k8s-based w

## HPC supercomputing centers

### CNES

Project: LISA DDPC

In the context of LISA (Laser Interferometer Space Antenna) DDPC (Distributed Data Processing Center), CNES is using Interlink to prototype an hybrid execution of LISA pipelines on either Kubernetes or Slurm resources.

## Industry

1 change: 1 addition & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@

- Vibhav Bobade - vibhav.bobde\<at\>gmail.com
- Mauro Gattari - INFN - mauro.gattari\<at\>infn.it
- Antoine Tran - Thales for CNES - no public email, but can be contacted through InterLink Slack channel or in github issue to @antoinetran
3 changes: 2 additions & 1 deletion pkg/interlink/api/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request)

log.G(h.Ctx).Info("InterLink: forwarding Create call to sidecar")

_, err := ReqWithError(h.Ctx, req, w, start, span, true)
sessionContext := GetSessionContext(r)
_, err := ReqWithError(h.Ctx, req, w, start, span, true, false, sessionContext, http.DefaultClient)
if err != nil {
log.L.Error(err)
return
Expand Down
143 changes: 122 additions & 21 deletions pkg/interlink/api/handler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package api

import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"strconv"

"github.com/containerd/containerd/log"

Expand All @@ -21,6 +23,22 @@ type InterLinkHandler struct {
// TODO: http client with TLS
}

func AddSessionContext(req *http.Request, sessionContext string) {
req.Header.Set("InterLink-Http-Session", sessionContext)
}

func GetSessionContext(r *http.Request) string {
sessionContext := r.Header.Get("InterLink-Http-Session")
if sessionContext == "" {
sessionContext = "NoSessionFound#0"
}
return sessionContext
}

func GetSessionContextMessage(sessionContext string) string {
return "HTTP InterLink session " + sessionContext + ": "
}

func DoReq(req *http.Request) (*http.Response, error) {
resp, err := http.DefaultClient.Do(req)
if err != nil {
Expand All @@ -30,57 +48,140 @@ func DoReq(req *http.Request) (*http.Response, error) {
return resp, nil
}

// respondWithReturn: if false, return nil. Useful when body is too big to be contained in one big string.
// sessionNumber: integer number for debugging purpose, generated from InterLink VK, to follow HTTP request from end-to-end.
func ReqWithError(
ctx context.Context,
req *http.Request,
w http.ResponseWriter,
start int64,
span trace.Span,
respondWithValues bool,
respondWithReturn bool,
sessionContext string,
logHTTPClient *http.Client,
) ([]byte, error) {

req.Header.Set("Content-Type", "application/json")
resp, err := DoReq(req)

sessionContextMessage := GetSessionContextMessage(sessionContext)
log.G(ctx).Debug(sessionContextMessage, "doing request: ", fmt.Sprintf("%#v", req))

// Add session number for end-to-end from API to InterLink plugin (eg interlink-slurm-plugin)
AddSessionContext(req, sessionContext)

log.G(ctx).Debug(sessionContextMessage, "before DoReq()")
resp, err := logHTTPClient.Do(req)
if err != nil {
statusCode := http.StatusInternalServerError
w.WriteHeader(statusCode)
log.G(ctx).Error(err)
return nil, err
errWithContext := fmt.Errorf(sessionContextMessage+"error doing DoReq() of ReqWithErrorWithSessionNumber error %w", err)
return nil, errWithContext
}
defer resp.Body.Close()
log.G(ctx).Debug(sessionContextMessage, "after DoReq()")

log.G(ctx).Debug(sessionContextMessage, "after Do(), writing header and status code: ", resp.StatusCode)
w.WriteHeader(resp.StatusCode)
// Flush headers ASAP so that the client is not blocked in request.
if f, ok := w.(http.Flusher); ok {
log.G(ctx).Debug(sessionContextMessage, "now flushing...")
f.Flush()
} else {
log.G(ctx).Error(sessionContextMessage, "could not flush because server does not support Flusher.")
}

if resp.StatusCode != http.StatusOK {
log.G(ctx).Error(sessionContextMessage, "HTTP request in error.")
statusCode := http.StatusInternalServerError
w.WriteHeader(statusCode)
ret, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf(sessionContextMessage+"HTTP request in error and could not read body response error: %w", err)
}
_, err = w.Write(ret)
errHTTP := fmt.Errorf(sessionContextMessage+"call exit status: %d. Body: %s", statusCode, ret)
log.G(ctx).Error(errHTTP)
_, err = w.Write([]byte(errHTTP.Error()))
if err != nil {
return nil, err
return nil, fmt.Errorf(sessionContextMessage+"HTTP request in error and could not write all body response to InterLink Node error: %w", err)
}
return nil, fmt.Errorf("Call exit status: %d. Body: %s", statusCode, ret)
return nil, errHTTP
}

returnValue, err := io.ReadAll(resp.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.G(ctx).Error(err)
return nil, err
}
log.G(ctx).Debug(string(returnValue))

w.WriteHeader(resp.StatusCode)
types.SetDurationSpan(start, span, types.WithHTTPReturnCode(resp.StatusCode))

if respondWithValues {
_, err = w.Write(returnValue)
log.G(ctx).Debug(sessionContextMessage, "before respondWithValues")
if respondWithReturn {

log.G(ctx).Debug(sessionContextMessage, "reading all body once for all")
returnValue, err := io.ReadAll(resp.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.G(ctx).Error(err)
return nil, fmt.Errorf(sessionContextMessage+"error doing ReadAll() of ReqWithErrorComplex see error %w", err)
}

if respondWithValues {
_, err = w.Write(returnValue)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return nil, fmt.Errorf(sessionContextMessage+"error doing Write() of ReqWithErrorComplex see error %w", err)
}
}

return returnValue, nil
}

// Case no return needed.

if respondWithValues {
// Because no return needed, we can write continuously instead of writing one big block of data.
// Useful to get following logs.
log.G(ctx).Debug(sessionContextMessage, "in respondWithValues loop, reading body continuously until EOF")

// In this case, we return continuously the values in the w, instead of reading it all. This allows for logs to be followed.
bodyReader := bufio.NewReader(resp.Body)

// 4096 is bufio.NewReader default buffer size.
bufferBytes := make([]byte, 4096)

// Looping until we get EOF from sidecar.
for {
log.G(ctx).Debug(sessionContextMessage, "trying to read some bytes from InterLink sidecar "+req.RequestURI)
n, err := bodyReader.Read(bufferBytes)
if err != nil {
if err == io.EOF {
log.G(ctx).Debug(sessionContextMessage, "received EOF and read number of bytes: "+strconv.Itoa(n))

// EOF but we still have something to read!
if n != 0 {
_, err = w.Write(bufferBytes[:n])
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return nil, fmt.Errorf(sessionContextMessage+"could not write during ReqWithError() error: %w", err)
}
}
return nil, nil
}
// Error during read.
w.WriteHeader(http.StatusInternalServerError)
return nil, fmt.Errorf(sessionContextMessage+"could not read HTTP body: see error %w", err)
}
log.G(ctx).Debug(sessionContextMessage, "received some bytes from InterLink sidecar")
_, err = w.Write(bufferBytes[:n])
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return nil, fmt.Errorf(sessionContextMessage+"could not write during ReqWithError() error: %w", err)
}

// Flush otherwise it will take time to appear in kubectl logs.
if f, ok := w.(http.Flusher); ok {
log.G(ctx).Debug(sessionContextMessage, "Wrote some logs, now flushing...")
f.Flush()
} else {
log.G(ctx).Error(sessionContextMessage, "could not flush because server does not support Flusher.")
}
}
}

return returnValue, nil
// Case no respondWithValue no respondWithReturn , it means we are doing a request and not using response.
return nil, nil
}
31 changes: 20 additions & 11 deletions pkg/interlink/api/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request
defer span.End()
defer types.SetDurationSpan(start, span)

sessionContext := GetSessionContext(r)
sessionContextMessage := GetSessionContextMessage(sessionContext)

var statusCode int
log.G(h.Ctx).Info("InterLink: received GetLogs call")
log.G(h.Ctx).Info(sessionContextMessage, "InterLink: received GetLogs call")
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
log.G(h.Ctx).Fatal(err)
log.G(h.Ctx).Fatal(sessionContextMessage, err)
}

log.G(h.Ctx).Info("InterLink: unmarshal GetLogs request")
log.G(h.Ctx).Info(sessionContextMessage, "InterLink: unmarshal GetLogs request")
var req2 types.LogStruct // incoming request. To be used in interlink API. req is directly forwarded to sidecar
err = json.Unmarshal(bodyBytes, &req2)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
log.G(h.Ctx).Error(err)
log.G(h.Ctx).Error(sessionContextMessage, err)
return
}

Expand All @@ -55,27 +58,27 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request
attribute.Bool("opts.timestamps", req2.Opts.Timestamps),
)

log.G(h.Ctx).Info("InterLink: new GetLogs podUID: now ", req2.PodUID)
log.G(h.Ctx).Info(sessionContextMessage, "InterLink: new GetLogs podUID: now ", req2.PodUID)
if (req2.Opts.Tail != 0 && req2.Opts.LimitBytes != 0) || (req2.Opts.SinceSeconds != 0 && !req2.Opts.SinceTime.IsZero()) {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)

if req2.Opts.Tail != 0 && req2.Opts.LimitBytes != 0 {
_, err = w.Write([]byte("Both Tail and LimitBytes set. Set only one of them"))
if err != nil {
log.G(h.Ctx).Error(errors.New("Failed to write to http buffer"))
log.G(h.Ctx).Error(errors.New(sessionContextMessage + "Failed to write to http buffer"))
}
return
}

_, err = w.Write([]byte("Both SinceSeconds and SinceTime set. Set only one of them"))
if err != nil {
log.G(h.Ctx).Error(errors.New("Failed to write to http buffer"))
log.G(h.Ctx).Error(errors.New(sessionContextMessage + "Failed to write to http buffer"))
}

}

log.G(h.Ctx).Info("InterLink: marshal GetLogs request ")
log.G(h.Ctx).Info(sessionContextMessage, "InterLink: marshal GetLogs request ")

bodyBytes, err = json.Marshal(req2)
if err != nil {
Expand All @@ -91,10 +94,16 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request
}

req.Header.Set("Content-Type", "application/json")
log.G(h.Ctx).Info("InterLink: forwarding GetLogs call to sidecar")
_, err = ReqWithError(h.Ctx, req, w, start, span, true)

logTransport := http.DefaultTransport.(*http.Transport).Clone()
// logTransport.DisableKeepAlives = true
// logTransport.MaxIdleConnsPerHost = -1
var logHTTPClient = &http.Client{Transport: logTransport}

log.G(h.Ctx).Info(sessionContextMessage, "InterLink: forwarding GetLogs call to sidecar")
_, err = ReqWithError(h.Ctx, req, w, start, span, true, false, sessionContext, logHTTPClient)
if err != nil {
log.L.Error(err)
log.L.Error(sessionContextMessage, err)
return
}

Expand Down
10 changes: 7 additions & 3 deletions pkg/interlink/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
Expand Down Expand Up @@ -37,7 +38,8 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request)

err = json.Unmarshal(bodyBytes, &pods)
if err != nil {
log.G(h.Ctx).Error(err)
errWithContext := fmt.Errorf("error doing fisrt Unmarshal() in StatusHandler() error detail: %s error: %w", fmt.Sprintf("%#v", err), err)
log.G(h.Ctx).Error(errWithContext)
}

span.SetAttributes(
Expand Down Expand Up @@ -80,7 +82,8 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request)
req.Header.Set("Content-Type", "application/json")
log.G(h.Ctx).Debug("Interlink get status request content:", req)

bodyBytes, err = ReqWithError(h.Ctx, req, w, start, span, false)
sessionContext := GetSessionContext(r)
bodyBytes, err = ReqWithError(h.Ctx, req, w, start, span, false, true, sessionContext, http.DefaultClient)
if err != nil {
log.L.Error(err)
return
Expand All @@ -90,7 +93,8 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
log.G(h.Ctx).Error(err)
errWithContext := fmt.Errorf("error doing Unmarshal() in StatusHandler() of req %s error detail: %s error: %w", fmt.Sprintf("%#v", req), fmt.Sprintf("%#v", err), err)
log.G(h.Ctx).Error(errWithContext)
return
}

Expand Down
Loading
Loading