Skip to content

Commit

Permalink
[fs] Expose an experimental fs module with an open function and a…
Browse files Browse the repository at this point in the history
… `File` abstraction (1/3) (#3165)

Co-authored-by: Ivan <[email protected]>
Co-authored-by: Mihail Stoykov <[email protected]>
  • Loading branch information
3 people authored Nov 13, 2023
1 parent 348aadd commit 24ae4d9
Show file tree
Hide file tree
Showing 11 changed files with 1,849 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/experimental/fs/bonjour.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Bonjour, tout le monde!
51 changes: 51 additions & 0 deletions examples/experimental/fs/fs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { open, SeekMode } from "k6/experimental/fs";

export const options = {
vus: 100,
iterations: 1000,
};

// k6 doesn't support async in the init context. We use a top-level async function for `await`.
//
// Each Virtual User gets its own `file` copy.
// So, operations like `seek` or `read` won't impact other VUs.
let file;
(async function () {
file = await open("bonjour.txt");
})();

export default async function () {
// About information about the file
const fileinfo = await file.stat();
if (fileinfo.name != "bonjour.txt") {
throw new Error("Unexpected file name");
}

const buffer = new Uint8Array(4);

let totalBytesRead = 0;
while (true) {
// Read into the buffer
const bytesRead = await file.read(buffer);
if (bytesRead == null) {
// EOF
break;
}

// Do something useful with the content of the buffer
totalBytesRead += bytesRead;

// If bytesRead is less than the buffer size, we've read the whole file
if (bytesRead < buffer.byteLength) {
break;
}
}

// Check that we read the expected number of bytes
if (totalBytesRead != fileinfo.size) {
throw new Error("Unexpected number of bytes read");
}

// Seek back to the beginning of the file
await file.seek(0, SeekMode.Start);
}
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.k6.io/k6/js/modules/k6/data"
"go.k6.io/k6/js/modules/k6/encoding"
"go.k6.io/k6/js/modules/k6/execution"
"go.k6.io/k6/js/modules/k6/experimental/fs"
"go.k6.io/k6/js/modules/k6/experimental/tracing"
"go.k6.io/k6/js/modules/k6/grpc"
"go.k6.io/k6/js/modules/k6/html"
Expand Down Expand Up @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/experimental/timers": timers.New(),
"k6/experimental/tracing": tracing.New(),
"k6/experimental/browser": browser.New(),
"k6/experimental/fs": fs.New(),
"k6/net/grpc": grpc.New(),
"k6/html": html.New(),
"k6/http": http.New(),
Expand Down
110 changes: 110 additions & 0 deletions js/modules/k6/experimental/fs/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package fs

import (
"fmt"
"io"
"path/filepath"
"sync"

"go.k6.io/k6/lib/fsext"
)

// cache is a cache of opened files, designed to minimize redundant file reads, and
// avoid replicating the content of the files in memory as much as possible.
//
// Unlike the underlying [fsext.Fs] which also caches file contents, this cache minimizes
// synchronization overhead. [fsext.Fs], using `afero`, employs a [sync.RWMutex] for each
// file access, involving lock/unlock operations. Our cache, however, utilizes a concurrent-safe
// map (openedFiles), bypassing the need for these locks and enhancing performance.
//
// This cache could be seen as redundant, as the underlying [fsext.Fs] implementation
// already caches the content of the files it opens. However, the current implementation of
// [fsext.Fs] relies on `afero` under the hood, which in turn relies on a [sync.RWMutex] to
// protect access to the cached file content. This means that every time a file is opened,
// the `fsext.Fs` cache is accessed, and the [sync.RWMutex] is locked and unlocked.
//
// This cache is designed to avoid this synchronization overhead, by caching the content of
// the files in a map that is safe for concurrent use, and thus avoid the need for a lock.
//
// This leads to a performance improvement, at the cost of holding the content of the files
// in memory twice, once in the cache's `openedFiles` map, and once in the `fsext.Fs` cache.
//
// Note that the current implementation of the cache diverges from the guarantees expressed in the
// [design document] defining the `fs` module, as it we effectively hold the file's content in memory
// twice as opposed to once.
//
// Future updates (see [#1079](https://github.com/grafana/k6/issues/1079)) may phase out reliance on `afero`.
// Depending on our new choice for [fsext] implementation, this cache might become obsolete, allowing us
// to solely depend on [fsext.Fs.Open].
//
// [#1079]: https://github.com/grafana/k6/issues/1079
type cache struct {
// openedFiles holds a safe for concurrent use map, holding the content
// of the files that were opened by the user.
//
// Keys are expected to be strings holding the openedFiles' path.
// Values are expected to be byte slices holding the content of the opened file.
//
// That way, we can cache the file's content and avoid opening too many
// file descriptor, and re-reading its content every time the file is opened.
//
// Importantly, this also means that if the
// file is modified from outside of k6, the changes will not be reflected in the file's data.
openedFiles sync.Map
}

// open retrieves the content of a given file from the specified filesystem (fromFs) and
// stores it in the cache's internal `openedFiles` map.
//
// The function cleans the provided filename using filepath.Clean before using it.
//
// If the file was previously "opened" (and thus cached), it
// returns the cached content. Otherwise, it reads the file from the
// filesystem, caches its content, and then returns it.
//
// The function is designed to minimize redundant file reads by leveraging an internal cache (openedFiles).
// In case the cached value is not a byte slice (which should never occur in regular use), it
// panics with a descriptive error.
//
// Parameters:
// - filename: The name of the file to be retrieved. This should be a relative or absolute path.
// - fromFs: The filesystem (from the fsext package) from which the file should be read if not already cached.
//
// Returns:
// - A byte slice containing the content of the specified file.
// - An error if there's any issue opening or reading the file. If the file content is
// successfully cached and returned once, subsequent calls will not produce
// file-related errors for the same file, as the cached value will be used.
func (fr *cache) open(filename string, fromFs fsext.Fs) (data []byte, err error) {
filename = filepath.Clean(filename)

if f, ok := fr.openedFiles.Load(filename); ok {
data, ok = f.([]byte)
if !ok {
panic(fmt.Errorf("cache's file %s is not stored as a byte slice", filename))
}

return data, nil
}

// TODO: re-evaluate opening from the FS this once #1079 is resolved.
f, err := fromFs.Open(filename)
if err != nil {
return nil, err
}
defer func() {
cerr := f.Close()
if cerr != nil {
err = fmt.Errorf("failed to close file %s: %w", filename, cerr)
}
}()

data, err = io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("failed to read the content of file %s: %w", filename, err)
}

fr.openedFiles.Store(filename, data)

return data, nil
}
70 changes: 70 additions & 0 deletions js/modules/k6/experimental/fs/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package fs

import (
"testing"

"github.com/stretchr/testify/assert"
"go.k6.io/k6/lib/fsext"
)

func TestFileCacheOpen(t *testing.T) {
t.Parallel()

t.Run("open succeeds", func(t *testing.T) {
t.Parallel()

cache := &cache{}
fs := newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, "bonjour.txt", []byte("Bonjour, le monde"), 0o644)
})

_, gotBeforeOk := cache.openedFiles.Load("bonjour.txt")
gotData, gotErr := cache.open("bonjour.txt", fs)
_, gotAfterOk := cache.openedFiles.Load("bonjour.txt")

assert.False(t, gotBeforeOk)
assert.NoError(t, gotErr)
assert.Equal(t, []byte("Bonjour, le monde"), gotData)
assert.True(t, gotAfterOk)
})

t.Run("double open succeeds", func(t *testing.T) {
t.Parallel()

cache := &cache{}
fs := newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, "bonjour.txt", []byte("Bonjour, le monde"), 0o644)
})

firstData, firstErr := cache.open("bonjour.txt", fs)
_, gotFirstOk := cache.openedFiles.Load("bonjour.txt")
secondData, secondErr := cache.open("bonjour.txt", fs)
_, gotSecondOk := cache.openedFiles.Load("bonjour.txt")

assert.True(t, gotFirstOk)
assert.NoError(t, firstErr)
assert.Equal(t, []byte("Bonjour, le monde"), firstData)
assert.True(t, gotSecondOk)
assert.NoError(t, secondErr)
assert.True(t, sameUnderlyingArray(firstData, secondData))
assert.Equal(t, []byte("Bonjour, le monde"), secondData)
})
}

// sameUnderlyingArray returns true if the underlying array of lhs and rhs are the same.
//
// This is done by checking that the two slices have a capacity greater than 0 and that
// the last element of the underlying array is the same for both slices.
//
// Once a slice is created, its starting address can move forward, but can never move
// behond its starting address + its capacity, which is a fixed value for any Go slice.
//
// Hence, if the last element of the underlying array is the same for both slices, it
// means that the underlying array is the same.
//
// See [explanation] for more details.
//
// [explanation]: https://groups.google.com/g/golang-nuts/c/ks1jvoyMYuc?pli=1
func sameUnderlyingArray(lhs, rhs []byte) bool {
return cap(lhs) > 0 && cap(rhs) > 0 && &lhs[0:cap(lhs)][cap(lhs)-1] == &rhs[0:cap(rhs)][cap(rhs)-1]
}
71 changes: 71 additions & 0 deletions js/modules/k6/experimental/fs/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package fs

// newFsError creates a new Error object of the provided kind and with the
// provided message.
func newFsError(k errorKind, message string) *fsError {
return &fsError{
Name: k.String(),
Message: message,
kind: k,
}
}

// errorKind indicates the kind of file system error that has occurred.
//
// Its string representation is generated by the `enumer` tool. The
// `enumer` tool is run by the `go generate` command. See the `go generate`
// command documentation.
// The tool itself is not tracked as part of the k6 go.mod file, and
// therefore must be installed manually using `go install github.com/dmarkham/enumer`.
//
//go:generate enumer -type=errorKind -output errors_gen.go
type errorKind uint8

const (
// NotFoundError is emitted when a file is not found.
NotFoundError errorKind = iota + 1

// InvalidResourceError is emitted when a resource is invalid: for
// instance when attempting to open a directory, which is not supported.
InvalidResourceError

// ForbiddenError is emitted when an operation is forbidden.
ForbiddenError

// TypeError is emitted when an incorrect type has been used.
TypeError

// EOFError is emitted when the end of a file has been reached.
EOFError
)

// fsError represents a custom error object emitted by the fs module.
//
// It is used to provide a more detailed error message to the user, and
// provide a concrete error type that can be used to differentiate between
// different types of errors.
//
// Exposing error types to the user in a way that's compatible with some
// JavaScript error handling constructs such as `instanceof` is still non-trivial
// in Go. See the [dedicated goja issue] with have opened for more details.
//
// [dedicated goja issue]: https://github.com/dop251/goja/issues/529
type fsError struct {
// Name contains the name of the error as formalized by the [ErrorKind]
// type.
Name string `json:"name"`

// Message contains the error message as presented to the user.
Message string `json:"message"`

// kind contains the kind of error that has occurred.
kind errorKind
}

// Ensure that the Error type implements the Go `error` interface.
var _ error = (*fsError)(nil)

// Error implements the Go `error` interface.
func (e *fsError) Error() string {
return e.Name + ": " + e.Message
}
Loading

0 comments on commit 24ae4d9

Please sign in to comment.