Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add an experimental csv module exposing a streaming csv parser #3743

Merged
merged 15 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions examples/experimental/csv-parse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { open } from 'k6/experimental/fs'
import csv from 'k6/experimental/csv'
import { scenario } from 'k6/execution'

export const options = {
  iterations: 10,
};

// Handles filled in by the asynchronous init block below: the opened
// data file and the records parsed from it.
let file;
let csvRecords;

// Open the csv file and parse it ahead of time, before iterations run.
(async () => {
  file = await open('data.csv');

  // `csv.parse` consumes the entire file at once and returns the parsed
  // records as a SharedArray object.
  csvRecords = await csv.parse(file, { delimiter: ',' });
})();

// Each iteration logs the record whose index matches the iteration's
// position in the test.
export default async function () {
  const record = csvRecords[scenario.iterationInTest];
  console.log(record);
}

22 changes: 22 additions & 0 deletions examples/experimental/csv-parser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { open } from 'k6/experimental/fs'
import csv from 'k6/experimental/csv'

export const options = {
  iterations: 10,
};

// Handles filled in by the asynchronous init block below: the opened
// data file and a streaming parser reading from it.
let file;
let parser;

(async () => {
  file = await open('data.csv');
  parser = new csv.Parser(file);
})();

// Each iteration pulls the next row from the parser and logs it; running
// out of rows is reported as an error.
export default async function () {
  const result = await parser.next();
  if (result.done) {
    throw new Error("No more rows to read");
  }

  console.log(result.value);
}
11 changes: 11 additions & 0 deletions examples/experimental/data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
firstname,lastname,age
fariha,ehlenfeldt,72
qudratullah,gillfillan,50
jeleah,rodina,41
thaisia,nowalk,99
joey-lynn,wilsford,75
tudur,granville,81
aytek,umber,56
aynoor,hisaw,30
fiadh-rose,iravani,31
annely,ooley,70
3 changes: 3 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"sync"

"go.k6.io/k6/js/modules/k6/experimental/csv"
oleiade marked this conversation as resolved.
Show resolved Hide resolved

"go.k6.io/k6/ext"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
Expand Down Expand Up @@ -38,6 +40,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/encoding": encoding.New(),
"k6/timers": timers.New(),
"k6/execution": execution.New(),
"k6/experimental/csv": csv.New(),
"k6/experimental/redis": redis.New(),
"k6/experimental/streams": streams.New(),
"k6/experimental/webcrypto": webcrypto.New(),
Expand Down
63 changes: 60 additions & 3 deletions js/modules/k6/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
package data

import (
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"sync"

Expand Down Expand Up @@ -90,7 +93,61 @@ func (d *Data) sharedArray(call sobek.ConstructorCall) *sobek.Object {
}

array := d.shared.get(rt, name, fn)
return array.wrap(rt).ToObject(rt)
return array.Wrap(rt).ToObject(rt)
}

// RecordReader is the interface that wraps the action of reading records from a resource.
//
// The data module RecordReader interface is implemented by types that can read data that can be
// treated as records, from data sources such as a CSV file.
//
// Read returns one record as a slice of strings. NewSharedArrayFrom calls Read repeatedly and
// treats an error matching io.EOF as the end of the records; any other non-nil error aborts
// the construction of the shared array.
type RecordReader interface {
Read() ([]string, error)
}

// NewSharedArrayFrom builds the shared array registered under name from the records
// produced by r, and returns it wrapped as an object of the rt runtime.
//
// It is not exposed to the JS runtime. It exists so that Go code can fill a shared array
// directly (e.g. from a CSV file) instead of going through the JS runtime machinery, which
// has a significant performance cost.
//
// The runtime is passed explicitly so that the caller controls which VU runtime the array
// is wrapped in; the shared array implementation relies on keeping a single instance of
// each array for the whole test setup and all VUs.
//
// Records are read until r reports io.EOF. Each record is stored as its JSON encoding.
// An empty name, a read error other than io.EOF, or a marshaling error is thrown into rt.
func (d *Data) NewSharedArrayFrom(rt *sobek.Runtime, name string, r RecordReader) *sobek.Object {
	if len(name) == 0 {
		common.Throw(rt, errors.New("empty name provided to SharedArray's constructor"))
	}

	var serialized []string
	for {
		fields, readErr := r.Read()
		if readErr != nil {
			// io.EOF marks the regular end of the input; anything else is a failure.
			if errors.Is(readErr, io.EOF) {
				break
			}
			common.Throw(rt, fmt.Errorf("failed to read record; reason: %w", readErr))
		}

		encoded, marshalErr := json.Marshal(fields)
		if marshalErr != nil {
			common.Throw(rt, fmt.Errorf("failed to marshal record; reason: %w", marshalErr))
		}

		serialized = append(serialized, string(encoded))
	}

	shared := d.shared.set(name, serialized)
	return shared.Wrap(rt).ToObject(rt)
}

// set is a helper method to set a shared array in the underlying shared arrays map.
func (s *sharedArrays) set(name string, arr []string) sharedArray {
// FIXME (@oleiade): we should probably return an error if the name is empty?
oleiade marked this conversation as resolved.
Show resolved Hide resolved
s.mu.Lock()
defer s.mu.Unlock()
array := sharedArray{arr: arr}
s.data[name] = array

return array
}

func (s *sharedArrays) get(rt *sobek.Runtime, name string, call sobek.Callable) sharedArray {
Expand Down Expand Up @@ -121,10 +178,10 @@ func getShareArrayFromCall(rt *sobek.Runtime, call sobek.Callable) sharedArray {
}
arr := make([]string, obj.Get("length").ToInteger())

stringify, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("stringify"))
stringifyFunc, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("stringify"))
var val sobek.Value
for i := range arr {
val, err = stringify(sobek.Undefined(), obj.Get(strconv.Itoa(i)))
val, err = stringifyFunc(sobek.Undefined(), obj.Get(strconv.Itoa(i)))
if err != nil {
panic(err)
}
Expand Down
3 changes: 2 additions & 1 deletion js/modules/k6/data/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type wrappedSharedArray struct {
parse sobek.Callable
}

func (s sharedArray) wrap(rt *sobek.Runtime) sobek.Value {
func (s sharedArray) Wrap(rt *sobek.Runtime) sobek.Value {
oleiade marked this conversation as resolved.
Show resolved Hide resolved
freeze, _ := sobek.AssertFunction(rt.GlobalObject().Get("Object").ToObject(rt).Get("freeze"))
isFrozen, _ := sobek.AssertFunction(rt.GlobalObject().Get("Object").ToObject(rt).Get("isFrozen"))
parse, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("parse"))
Expand Down Expand Up @@ -49,6 +49,7 @@ func (s wrappedSharedArray) Get(index int) sobek.Value {
if err != nil {
common.Throw(s.rt, err)
}

err = s.deepFreeze(s.rt, val)
if err != nil {
common.Throw(s.rt, err)
Expand Down
Loading