Skip to content

Commit

Permalink
fix: buffer the logs on the file system before sending them to Object…
Browse files Browse the repository at this point in the history
… Storage (#5866)
  • Loading branch information
rangoo94 authored Sep 25, 2024
1 parent 894db7e commit 8a9a8d0
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 36 deletions.
9 changes: 9 additions & 0 deletions cmd/testworkflow-toolkit/artifacts/internalartifactstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/pkg/bufferedstream"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
)

type InternalArtifactStorage interface {
Expand Down Expand Up @@ -67,6 +69,13 @@ func (s *internalArtifactStorage) SaveStream(artifactPath string, stream io.Read
size := -1
if streamL, ok := stream.(withLength); ok {
size = streamL.Len()
} else {
stream, err = bufferedstream.NewBufferedStream(constants.DefaultTmpDirPath, "log", stream)
if err != nil {
return err
}
defer stream.(bufferedstream.BufferedStream).Cleanup()
size = stream.(bufferedstream.BufferedStream).Len()
}
err = s.uploader.Add(filepath.Join(s.prefix, artifactPath), stream, int64(size))
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions pkg/bufferedstream/bufferedstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package bufferedstream

import (
"context"
"io"
"os"

"github.com/pkg/errors"
)

// bufferedStream is a mechanism to buffer data in FS instead of memory.
type bufferedStream struct {
ctx context.Context
end context.CancelCauseFunc
file *os.File
size int
}

type BufferedStream interface {
io.Reader
Len() int
Cleanup() error
Err() error
Ready() <-chan struct{}
}

func newBufferedStream(file *os.File, source io.Reader) BufferedStream {
ctx, end := context.WithCancelCause(context.Background())
stream := &bufferedStream{ctx: ctx, end: end, file: file}

// Stream the data into file buffer
go func() {
size, err := io.Copy(file, source)
stream.size = int(size)
if err == nil || errors.Is(err, io.EOF) {
_, err = file.Seek(0, io.SeekStart)
}
if err == nil {
err = io.EOF
}
stream.end(err)
}()

return stream
}

func (b *bufferedStream) Read(p []byte) (n int, err error) {
<-b.ctx.Done()
return b.file.Read(p)
}

func (b *bufferedStream) Err() error {
return context.Cause(b.ctx)
}

func (b *bufferedStream) Ready() <-chan struct{} {
return b.ctx.Done()
}

func (b *bufferedStream) Len() int {
<-b.ctx.Done()
return b.size
}

func (b *bufferedStream) Cleanup() error {
b.end(nil)
return os.Remove(b.file.Name())
}

func NewBufferedStream(dirPath, prefix string, source io.Reader) (BufferedStream, error) {
file, err := os.CreateTemp(dirPath, prefix)
if err != nil {
return nil, err
}
return newBufferedStream(file, source), nil
}
90 changes: 90 additions & 0 deletions pkg/bufferedstream/bufferedstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package bufferedstream

import (
"bytes"
"io"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBufferedStream(t *testing.T) {
inputStream := bytes.NewBuffer([]byte("test input stream"))
file, err := os.CreateTemp("", "testbuffer")
if err != nil {
t.Error("failed to create temp file")
return
}
stream := newBufferedStream(file, inputStream)
defer stream.Cleanup()
select {
case <-stream.Ready():
case <-time.After(1 * time.Second):
t.Error("timed out waiting for stream to be ready")
return
}

result, _ := io.ReadAll(stream)
assert.Equal(t, []byte("test input stream"), result)
}

func TestBufferedStream_Cleanup(t *testing.T) {
inputStream := bytes.NewBuffer([]byte("test input stream"))
file, err := os.CreateTemp("", "testbuffer")
if err != nil {
t.Error("failed to create temp file")
return
}
stream := newBufferedStream(file, inputStream)
select {
case <-stream.Ready():
case <-time.After(1 * time.Second):
stream.Cleanup()
t.Error("timed out waiting for stream to be ready")
return
}

statBefore, statBeforeErr := os.Stat(file.Name())

err = stream.Cleanup()
stat, statErr := os.Stat(file.Name())

assert.NoError(t, statBeforeErr)
assert.NotEqual(t, nil, statBefore)
assert.NoError(t, err)
assert.Equal(t, nil, stat)
assert.Error(t, statErr)
}

func TestBufferedStream_Len(t *testing.T) {
inputStream := bytes.NewBuffer([]byte("test input stream"))
inputStreamLen := inputStream.Len()
file, err := os.CreateTemp("", "testbuffer")
if err != nil {
t.Error("failed to create temp file")
return
}
stream := newBufferedStream(file, inputStream)
defer stream.Cleanup()

size := -1
var wg sync.WaitGroup
wg.Add(1)
go func() {
size = stream.Len()
wg.Done()
}()

select {
case <-stream.Ready():
case <-time.After(1 * time.Second):
t.Error("timed out waiting for stream to be ready")
return
}

wg.Wait()
assert.Equal(t, inputStreamLen, size)
}
14 changes: 8 additions & 6 deletions pkg/cloud/data/testworkflow/output.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package testworkflow

import (
"bytes"
"context"
"crypto/tls"
"io"
Expand All @@ -10,6 +9,7 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/kubeshop/testkube/pkg/bufferedstream"
"github.com/kubeshop/testkube/pkg/repository/testworkflow"

"github.com/kubeshop/testkube/pkg/cloud"
Expand Down Expand Up @@ -60,20 +60,22 @@ func (r *CloudOutputRepository) PresignReadLog(ctx context.Context, id, workflow

// SaveLog streams the output from the workflow to Cloud
func (r *CloudOutputRepository) SaveLog(ctx context.Context, id, workflowName string, reader io.Reader) error {
url, err := r.PresignSaveLog(ctx, id, workflowName)
// TODO: consider how to choose the temp dir
buffer, err := bufferedstream.NewBufferedStream("", "log", reader)
if err != nil {
return err
}
// FIXME: It should stream instead
data, err := io.ReadAll(reader)
defer buffer.Cleanup()
url, err := r.PresignSaveLog(ctx, id, workflowName)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(data))
req.Header.Add("Content-Type", "application/octet-stream")
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, buffer)
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/octet-stream")
req.ContentLength = int64(buffer.Len())
res, err := r.httpClient.Do(req)
if err != nil {
return errors.Wrap(err, "failed to save file in cloud storage")
Expand Down
8 changes: 7 additions & 1 deletion pkg/repository/testworkflow/minio_output_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/bufferedstream"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/storage"

Expand Down Expand Up @@ -44,7 +45,12 @@ func (m *MinioRepository) PresignReadLog(ctx context.Context, id, workflowName s

func (m *MinioRepository) SaveLog(ctx context.Context, id, workflowName string, reader io.Reader) error {
log.DefaultLogger.Debugw("inserting output", "id", id, "workflowName", workflowName)
return m.storage.UploadFileToBucket(ctx, m.bucket, bucketFolder, id, reader, -1)
buffer, err := bufferedstream.NewBufferedStream("", "log", reader)
if err != nil {
return nil
}
defer buffer.Cleanup()
return m.storage.UploadFileToBucket(ctx, m.bucket, bucketFolder, id, reader, int64(buffer.Len()))
}

func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/storage/minio/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,15 +494,10 @@ func (c *Client) uploadFile(ctx context.Context, bucket, bucketFolder, filePath
filePath = strings.Trim(bucketFolder, "/") + "/" + filePath
}

var partSize uint64
if objectSize == -1 {
partSize = absMinPartSize
}

c.Log.Debugw("saving object in minio", "file", filePath, "bucket", bucket)
_, err = c.minioClient.PutObject(ctx, bucket, filePath, reader, objectSize, minio.PutObjectOptions{
ContentType: "application/octet-stream",
PartSize: partSize})
PartSize: absMinPartSize})
if err != nil {
return fmt.Errorf("minio saving file (%s) put object error: %w", filePath, err)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/testworkflows/testworkflowprocessor/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@ func ProcessContentGit(_ InternalProcessor, layer Intermediate, container stage.
mountPath = filepath.Join(constants.DefaultDataPath, "repo")
}

// Build a temporary volume to clone the repository initially.
// This will allow mounting files in the destination at the same level (i.e. overriding the configuration).
container.AppendVolumeMounts(layer.AddEmptyDirVolume(nil, constants.DefaultTmpDirPath))

// Build volume pair and share with all siblings
volumeMount := layer.AddEmptyDirVolume(nil, mountPath)
container.AppendVolumeMounts(volumeMount)
Expand Down
41 changes: 22 additions & 19 deletions pkg/testworkflows/testworkflowprocessor/presets/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,14 @@ func TestProcessBasic(t *testing.T) {

assert.Equal(t, want, res.Job)

assert.Equal(t, 2, len(volumeMounts))
assert.Equal(t, 2, len(volumes))
assert.Equal(t, 3, len(volumeMounts))
assert.Equal(t, 3, len(volumes))
assert.Equal(t, constants.DefaultInternalPath, volumeMounts[0].MountPath)
assert.Equal(t, constants.DefaultDataPath, volumeMounts[1].MountPath)
assert.Equal(t, constants.DefaultTmpDirPath, volumeMounts[1].MountPath)
assert.Equal(t, constants.DefaultDataPath, volumeMounts[2].MountPath)
assert.True(t, volumeMounts[0].Name == volumes[0].Name)
assert.True(t, volumeMounts[1].Name == volumes[1].Name)
assert.True(t, volumeMounts[2].Name == volumes[2].Name)
}

func TestProcessShellWithNonStandardImage(t *testing.T) {
Expand Down Expand Up @@ -332,12 +334,14 @@ func TestProcessShellWithNonStandardImage(t *testing.T) {

assert.Equal(t, want, res.Job)

assert.Equal(t, 2, len(volumeMounts))
assert.Equal(t, 2, len(volumes))
assert.Equal(t, 3, len(volumeMounts))
assert.Equal(t, 3, len(volumes))
assert.Equal(t, constants.DefaultInternalPath, volumeMounts[0].MountPath)
assert.Equal(t, constants.DefaultDataPath, volumeMounts[1].MountPath)
assert.Equal(t, constants.DefaultTmpDirPath, volumeMounts[1].MountPath)
assert.Equal(t, constants.DefaultDataPath, volumeMounts[2].MountPath)
assert.True(t, volumeMounts[0].Name == volumes[0].Name)
assert.True(t, volumeMounts[1].Name == volumes[1].Name)
assert.True(t, volumeMounts[2].Name == volumes[2].Name)
}

func TestProcessBasicEnvReference(t *testing.T) {
Expand Down Expand Up @@ -767,14 +771,14 @@ func TestProcessLocalContent(t *testing.T) {
}

assert.Equal(t, want, res.Job.Spec.Template.Spec)
assert.Equal(t, 2, len(volumeMounts))
assert.Equal(t, 3, len(volumeMountsWithContent))
assert.Equal(t, volumeMounts, volumeMountsWithContent[:2])
assert.Equal(t, "/some/path", volumeMountsWithContent[2].MountPath)
assert.Equal(t, 3, len(volumeMounts))
assert.Equal(t, 4, len(volumeMountsWithContent))
assert.Equal(t, volumeMounts, volumeMountsWithContent[:3])
assert.Equal(t, "/some/path", volumeMountsWithContent[3].MountPath)
assert.Equal(t, 1, len(res.ConfigMaps))
assert.Equal(t, volumeMountsWithContent[2].Name, volumes[2].Name)
assert.Equal(t, volumes[2].ConfigMap.Name, res.ConfigMaps[0].Name)
assert.Equal(t, "some-{{content", res.ConfigMaps[0].Data[volumeMountsWithContent[2].SubPath])
assert.Equal(t, volumeMountsWithContent[3].Name, volumes[3].Name)
assert.Equal(t, volumes[3].ConfigMap.Name, res.ConfigMaps[0].Name)
assert.Equal(t, "some-{{content", res.ConfigMaps[0].Data[volumeMountsWithContent[3].SubPath])
}

func TestProcessGlobalContent(t *testing.T) {
Expand Down Expand Up @@ -846,13 +850,12 @@ func TestProcessGlobalContent(t *testing.T) {
}

assert.Equal(t, want, res.Job.Spec.Template.Spec)
assert.Equal(t, 3, len(volumeMounts))
fmt.Println(volumeMounts)
assert.Equal(t, "/some/path", volumeMounts[2].MountPath)
assert.Equal(t, 4, len(volumeMounts))
assert.Equal(t, "/some/path", volumeMounts[3].MountPath)
assert.Equal(t, 1, len(res.ConfigMaps))
assert.Equal(t, volumeMounts[2].Name, volumes[2].Name)
assert.Equal(t, volumes[2].ConfigMap.Name, res.ConfigMaps[0].Name)
assert.Equal(t, "some-{{content", res.ConfigMaps[0].Data[volumeMounts[2].SubPath])
assert.Equal(t, volumeMounts[3].Name, volumes[3].Name)
assert.Equal(t, volumes[3].ConfigMap.Name, res.ConfigMaps[0].Name)
assert.Equal(t, "some-{{content", res.ConfigMaps[0].Data[volumeMounts[3].SubPath])
}

func TestProcessEscapedAnnotations(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/testworkflows/testworkflowprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (p *processor) Bundle(ctx context.Context, workflow *testworkflowsv1.TestWo
layer.ContainerDefaults().
ApplyCR(constants.DefaultContainerConfig.DeepCopy()).
AppendVolumeMounts(layer.AddEmptyDirVolume(nil, constants.DefaultInternalPath)).
AppendVolumeMounts(layer.AddEmptyDirVolume(nil, constants.DefaultTmpDirPath)).
AppendVolumeMounts(layer.AddEmptyDirVolume(nil, constants.DefaultDataPath))

mapEnv := make(map[string]corev1.EnvVarSource)
Expand Down

0 comments on commit 8a9a8d0

Please sign in to comment.