Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fs] Expose an experimental fs module with an open function and a File abstraction (1/3) #3165

Merged
merged 18 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/experimental/fs/bonjour.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Bonjour, tout le monde!
51 changes: 51 additions & 0 deletions examples/experimental/fs/fs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { open, SeekMode } from "k6/experimental/fs";

export const options = {
vus: 100,
iterations: 1000,
};

// k6 doesn't support async in the init context. We use a top-level async function for `await`.
//
// Each Virtual User gets its own `file` copy.
// So, operations like `seek` or `read` won't impact other VUs.
let file;
(async function () {
file = await open("bonjour.txt");
})();

export default async function () {
// About information about the file
const fileinfo = await file.stat();
if (fileinfo.name != "bonjour.txt") {
throw new Error("Unexpected file name");
}

const buffer = new Uint8Array(4);

let totalBytesRead = 0;
while (true) {
// Read into the buffer
const bytesRead = await file.read(buffer);
if (bytesRead == null) {
// EOF
break;
}

// Do something useful with the content of the buffer
totalBytesRead += bytesRead;

// If bytesRead is less than the buffer size, we've read the whole file
if (bytesRead < buffer.byteLength) {
break;
}
}

// Check that we read the expected number of bytes
if (totalBytesRead != fileinfo.size) {
throw new Error("Unexpected number of bytes read");
}

// Seek back to the beginning of the file
await file.seek(0, SeekMode.Start);
}
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.k6.io/k6/js/modules/k6/data"
"go.k6.io/k6/js/modules/k6/encoding"
"go.k6.io/k6/js/modules/k6/execution"
"go.k6.io/k6/js/modules/k6/experimental/fs"
"go.k6.io/k6/js/modules/k6/experimental/tracing"
"go.k6.io/k6/js/modules/k6/grpc"
"go.k6.io/k6/js/modules/k6/html"
Expand Down Expand Up @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/experimental/timers": timers.New(),
"k6/experimental/tracing": tracing.New(),
"k6/experimental/browser": browser.New(),
"k6/experimental/fs": fs.New(),
"k6/net/grpc": grpc.New(),
"k6/html": html.New(),
"k6/http": http.New(),
Expand Down
110 changes: 110 additions & 0 deletions js/modules/k6/experimental/fs/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package fs

import (
"fmt"
"io"
"path/filepath"
"sync"

"go.k6.io/k6/lib/fsext"
)

// cache is a cache of opened files, designed to minimize redundant file reads, and
// avoid replicating the content of the files in memory as much as possible.
//
// Unlike the underlying [fsext.Fs] which also caches file contents, this cache minimizes
// synchronization overhead. [fsext.Fs], using `afero`, employs a [sync.RWMutex] for each
// file access, involving lock/unlock operations. Our cache, however, utilizes a concurrent-safe
// map (openedFiles), bypassing the need for these locks and enhancing performance.
//
// This cache could be seen as redundant, as the underlying [fsext.Fs] implementation
// already caches the content of the files it opens. However, the current implementation of
// [fsext.Fs] relies on `afero` under the hood, which in turn relies on a [sync.RWMutex] to
// protect access to the cached file content. This means that every time a file is opened,
// the `fsext.Fs` cache is accessed, and the [sync.RWMutex] is locked and unlocked.
//
// This cache is designed to avoid this synchronization overhead, by caching the content of
// the files in a map that is safe for concurrent use, and thus avoid the need for a lock.
//
// This leads to a performance improvement, at the cost of holding the content of the files
// in memory twice, once in the cache's `openedFiles` map, and once in the `fsext.Fs` cache.
//
// Note that the current implementation of the cache diverges from the guarantees expressed in the
// [design document] defining the `fs` module, as it we effectively hold the file's content in memory
// twice as opposed to once.
//
// Future updates (see [#1079](https://github.com/grafana/k6/issues/1079)) may phase out reliance on `afero`.
// Depending on our new choice for [fsext] implementation, this cache might become obsolete, allowing us
// to solely depend on [fsext.Fs.Open].
//
// [#1079]: https://github.com/grafana/k6/issues/1079
type cache struct {
// openedFiles holds a safe for concurrent use map, holding the content
// of the files that were opened by the user.
//
// Keys are expected to be strings holding the openedFiles' path.
// Values are expected to be byte slices holding the content of the opened file.
//
// That way, we can cache the file's content and avoid opening too many
// file descriptor, and re-reading its content every time the file is opened.
//
// Importantly, this also means that if the
// file is modified from outside of k6, the changes will not be reflected in the file's data.
openedFiles sync.Map
}

// open retrieves the content of a given file from the specified filesystem (fromFs) and
// stores it in the cache's internal `openedFiles` map.
//
// The function cleans the provided filename using filepath.Clean before using it.
//
// If the file was previously "opened" (and thus cached), it
// returns the cached content. Otherwise, it reads the file from the
// filesystem, caches its content, and then returns it.
//
// The function is designed to minimize redundant file reads by leveraging an internal cache (openedFiles).
// In case the cached value is not a byte slice (which should never occur in regular use), it
// panics with a descriptive error.
//
// Parameters:
// - filename: The name of the file to be retrieved. This should be a relative or absolute path.
// - fromFs: The filesystem (from the fsext package) from which the file should be read if not already cached.
//
// Returns:
// - A byte slice containing the content of the specified file.
// - An error if there's any issue opening or reading the file. If the file content is
// successfully cached and returned once, subsequent calls will not produce
// file-related errors for the same file, as the cached value will be used.
func (fr *cache) open(filename string, fromFs fsext.Fs) (data []byte, err error) {
filename = filepath.Clean(filename)

if f, ok := fr.openedFiles.Load(filename); ok {
data, ok = f.([]byte)
if !ok {
panic(fmt.Errorf("cache's file %s is not stored as a byte slice", filename))
}

return data, nil
}

// TODO: re-evaluate opening from the FS this once #1079 is resolved.
f, err := fromFs.Open(filename)
if err != nil {
return nil, err
}
defer func() {
cerr := f.Close()
if cerr != nil {
err = fmt.Errorf("failed to close file %s: %w", filename, cerr)
}
}()

data, err = io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("failed to read the content of file %s: %w", filename, err)
}

fr.openedFiles.Store(filename, data)

return data, nil
}
70 changes: 70 additions & 0 deletions js/modules/k6/experimental/fs/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package fs

import (
"testing"

"github.com/stretchr/testify/assert"
"go.k6.io/k6/lib/fsext"
)

func TestFileCacheOpen(t *testing.T) {
t.Parallel()

t.Run("open succeeds", func(t *testing.T) {
t.Parallel()

cache := &cache{}
fs := newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, "bonjour.txt", []byte("Bonjour, le monde"), 0o644)
})

_, gotBeforeOk := cache.openedFiles.Load("bonjour.txt")
gotData, gotErr := cache.open("bonjour.txt", fs)
_, gotAfterOk := cache.openedFiles.Load("bonjour.txt")

assert.False(t, gotBeforeOk)
assert.NoError(t, gotErr)
assert.Equal(t, []byte("Bonjour, le monde"), gotData)
assert.True(t, gotAfterOk)
})

t.Run("double open succeeds", func(t *testing.T) {
t.Parallel()

cache := &cache{}
fs := newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, "bonjour.txt", []byte("Bonjour, le monde"), 0o644)
})

firstData, firstErr := cache.open("bonjour.txt", fs)
_, gotFirstOk := cache.openedFiles.Load("bonjour.txt")
secondData, secondErr := cache.open("bonjour.txt", fs)
_, gotSecondOk := cache.openedFiles.Load("bonjour.txt")

assert.True(t, gotFirstOk)
assert.NoError(t, firstErr)
assert.Equal(t, []byte("Bonjour, le monde"), firstData)
assert.True(t, gotSecondOk)
assert.NoError(t, secondErr)
assert.True(t, sameUnderlyingArray(firstData, secondData))
assert.Equal(t, []byte("Bonjour, le monde"), secondData)
})
}

// sameUnderlyingArray returns true if the underlying array of lhs and rhs are the same.
//
// This is done by checking that the two slices have a capacity greater than 0 and that
// the last element of the underlying array is the same for both slices.
//
// Once a slice is created, its starting address can move forward, but can never move
// behond its starting address + its capacity, which is a fixed value for any Go slice.
//
// Hence, if the last element of the underlying array is the same for both slices, it
// means that the underlying array is the same.
//
// See [explanation] for more details.
//
// [explanation]: https://groups.google.com/g/golang-nuts/c/ks1jvoyMYuc?pli=1
func sameUnderlyingArray(lhs, rhs []byte) bool {
return cap(lhs) > 0 && cap(rhs) > 0 && &lhs[0:cap(lhs)][cap(lhs)-1] == &rhs[0:cap(rhs)][cap(rhs)-1]
}
71 changes: 71 additions & 0 deletions js/modules/k6/experimental/fs/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package fs

// newFsError creates a new Error object of the provided kind and with the
// provided message.
func newFsError(k errorKind, message string) *fsError {
return &fsError{
Name: k.String(),
olegbespalov marked this conversation as resolved.
Show resolved Hide resolved
Message: message,
kind: k,
}
}

// errorKind indicates the kind of file system error that has occurred.
//
// Its string representation is generated by the `enumer` tool. The
// `enumer` tool is run by the `go generate` command. See the `go generate`
// command documentation.
// The tool itself is not tracked as part of the k6 go.mod file, and
// therefore must be installed manually using `go install github.com/dmarkham/enumer`.
//
//go:generate enumer -type=errorKind -output errors_gen.go
type errorKind uint8

const (
// NotFoundError is emitted when a file is not found.
NotFoundError errorKind = iota + 1

// InvalidResourceError is emitted when a resource is invalid: for
// instance when attempting to open a directory, which is not supported.
InvalidResourceError

// ForbiddenError is emitted when an operation is forbidden.
ForbiddenError

// TypeError is emitted when an incorrect type has been used.
TypeError

// EOFError is emitted when the end of a file has been reached.
EOFError
)

// fsError represents a custom error object emitted by the fs module.
//
// It is used to provide a more detailed error message to the user, and
// provide a concrete error type that can be used to differentiate between
// different types of errors.
//
// Exposing error types to the user in a way that's compatible with some
// JavaScript error handling constructs such as `instanceof` is still non-trivial
// in Go. See the [dedicated goja issue] with have opened for more details.
//
// [dedicated goja issue]: https://github.com/dop251/goja/issues/529
type fsError struct {
oleiade marked this conversation as resolved.
Show resolved Hide resolved
oleiade marked this conversation as resolved.
Show resolved Hide resolved
// Name contains the name of the error as formalized by the [ErrorKind]
// type.
Name string `json:"name"`

// Message contains the error message as presented to the user.
Message string `json:"message"`

// kind contains the kind of error that has occurred.
kind errorKind
}

// Ensure that the Error type implements the Go `error` interface.
var _ error = (*fsError)(nil)

// Error implements the Go `error` interface.
func (e *fsError) Error() string {
return e.Name + ": " + e.Message
}
Loading