Skip to content

Commit

Permalink
Implement support to collect Usage dynamically
Browse files Browse the repository at this point in the history
Previously Usage collection happened in one place in a pull way. The
usage report needed to get access to the given data and then pull the
info from it and put it in.

This reverses the pattern and adds (if available) the cloud test run id
to the usage report.

Future work can pull a bunch of the other parts of it out. For example:
1. used modules can now be reported from the modules
2. outputs can also report their usage
3. same for executors

Currently all the above are still done in the usage report code, but
that is not necessary.

This also will allow additional usage reporting without the need to
propagate this data through getters to the usage report, and instead
just push it from the place it is used.

Allowing potentially reporting usages that we are interested to remove
in a more generic and easy way.
  • Loading branch information
mstoykov committed Aug 28, 2024
1 parent 8244d49 commit 63175cd
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 40 deletions.
1 change: 1 addition & 0 deletions cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func createOutputs(
ScriptOptions: test.derivedConfig.Options,
RuntimeOptions: test.preInitState.RuntimeOptions,
ExecutionPlan: executionPlan,
Usage: test.usage,
}

outputs := test.derivedConfig.Out
Expand Down
47 changes: 15 additions & 32 deletions cmd/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,17 @@ import (

"go.k6.io/k6/execution"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/usage"
)

type report struct {
Version string `json:"k6_version"`
Executors map[string]int `json:"executors"`
VUsMax int64 `json:"vus_max"`
Iterations uint64 `json:"iterations"`
Duration string `json:"duration"`
GoOS string `json:"goos"`
GoArch string `json:"goarch"`
Modules []string `json:"modules"`
Outputs []string `json:"outputs"`
}

func createReport(execScheduler *execution.Scheduler, importedModules []string, outputs []string) report {
executors := make(map[string]int)
func createReport(
u *usage.Usage, execScheduler *execution.Scheduler, importedModules []string, outputs []string,
) map[string]any {
for _, ec := range execScheduler.GetExecutorConfigs() {
executors[ec.GetType()]++
u.Count("executors/"+ec.GetType(), 1)
}

// collect the report only with k6 public modules
publicModules := make([]string, 0, len(importedModules))
for _, module := range importedModules {
// Exclude JS modules extensions to prevent to leak
// any user's custom extensions
Expand All @@ -44,7 +33,7 @@ func createReport(execScheduler *execution.Scheduler, importedModules []string,
if !strings.HasPrefix(module, "k6") {
continue
}
publicModules = append(publicModules, module)
u.Strings("modules", module)
}

builtinOutputs := builtinOutputStrings()
Expand All @@ -57,7 +46,6 @@ func createReport(execScheduler *execution.Scheduler, importedModules []string,
}

// collect only the used outputs that are builtin
publicOutputs := make([]string, 0, len(builtinOutputs))
for _, o := range outputs {
// TODO:
// if !slices.Contains(builtinOutputs, o) {
Expand All @@ -66,21 +54,17 @@ func createReport(execScheduler *execution.Scheduler, importedModules []string,
if !builtinOutputsIndex[o] {
continue
}
publicOutputs = append(publicOutputs, o)
u.Strings("outputs", o)
}

u.String("k6_version", consts.Version)
execState := execScheduler.GetState()
return report{
Version: consts.Version,
Executors: executors,
VUsMax: execState.GetInitializedVUsCount(),
Iterations: execState.GetFullIterationCount(),
Duration: execState.GetCurrentTestRunDuration().String(),
GoOS: runtime.GOOS,
GoArch: runtime.GOARCH,
Modules: publicModules,
Outputs: publicOutputs,
}
u.Count("vus_max", execState.GetInitializedVUsCount())
u.Count("iterations", int64(execState.GetFullIterationCount()))
u.String("duration", execState.GetCurrentTestRunDuration().String())
u.String("goos", runtime.GOOS)
u.String("goarch", runtime.GOARCH)
return u.Map()
}

func reportUsage(ctx context.Context, execScheduler *execution.Scheduler, test *loadedAndConfiguredTest) error {
Expand All @@ -90,8 +74,7 @@ func reportUsage(ctx context.Context, execScheduler *execution.Scheduler, test *
outputs = append(outputs, outputName)
}

r := createReport(execScheduler, test.moduleResolver.Imported(), outputs)
body, err := json.Marshal(r)
body, err := json.Marshal(createReport(test.usage, execScheduler, test.moduleResolver.Imported(), outputs))
if err != nil {
return err
}
Expand Down
18 changes: 10 additions & 8 deletions cmd/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/executor"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/usage"
"gopkg.in/guregu/null.v3"
)

Expand Down Expand Up @@ -51,12 +52,13 @@ func TestCreateReport(t *testing.T) {
time.Sleep(10 * time.Millisecond)
s.GetState().MarkEnded()

r := createReport(s, importedModules, outputs)
assert.Equal(t, consts.Version, r.Version)
assert.Equal(t, map[string]int{"shared-iterations": 1}, r.Executors)
assert.Equal(t, 6, int(r.VUsMax))
assert.Equal(t, 170, int(r.Iterations))
assert.NotEqual(t, "0s", r.Duration)
assert.ElementsMatch(t, []string{"k6", "k6/http", "k6/experimental/webcrypto"}, r.Modules)
assert.ElementsMatch(t, []string{"json"}, r.Outputs)
m := createReport(usage.New(), s, importedModules, outputs)

assert.Equal(t, consts.Version, m["k6_version"])
assert.Equal(t, map[string]interface{}{"shared-iterations": int64(1)}, m["executors"])
assert.EqualValues(t, 6, m["vus_max"])
assert.EqualValues(t, 170, m["iterations"])
assert.NotEqual(t, "0s", m["duration"])
assert.ElementsMatch(t, []string{"k6", "k6/http", "k6/experimental/webcrypto"}, m["modules"])
assert.ElementsMatch(t, []string{"json"}, m["outputs"])
}
3 changes: 3 additions & 0 deletions cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/usage"
)

const (
Expand All @@ -41,6 +42,7 @@ type loadedTest struct {
initRunner lib.Runner // TODO: rename to something more appropriate
keyLogger io.Closer
moduleResolver *modules.ModuleResolver
usage *usage.Usage
}

func loadLocalTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loadedTest, error) {
Expand Down Expand Up @@ -86,6 +88,7 @@ func loadLocalTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*l
fs: gs.FS,
fileSystems: fileSystems,
preInitState: state,
usage: usage.New(),
}

gs.Logger.Debugf("Initializing k6 runner for '%s' (%s)...", sourceRootPath, resolvedPath)
Expand Down
6 changes: 6 additions & 0 deletions output/cloud/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
cloudv2 "go.k6.io/k6/output/cloud/expv2"
"go.k6.io/k6/usage"
"gopkg.in/guregu/null.v3"
)

Expand Down Expand Up @@ -61,6 +62,8 @@ type Output struct {

client *cloudapi.Client
testStopFunc func(error)

usage *usage.Usage
}

// Verify that Output implements the wanted interfaces
Expand Down Expand Up @@ -135,6 +138,7 @@ func newOutput(params output.Params) (*Output, error) {
executionPlan: params.ExecutionPlan,
duration: int64(duration / time.Second),
logger: logger,
usage: params.Usage,
}, nil
}

Expand Down Expand Up @@ -341,6 +345,8 @@ func (out *Output) startVersionedOutput() error {
}
var err error

out.usage.String("output.cloud.test_run_id", out.testRunID)

// TODO: move here the creation of a new cloudapi.Client
// so in the case the config has been overwritten the client uses the correct
// value.
Expand Down
6 changes: 6 additions & 0 deletions output/cloud/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
cloudv2 "go.k6.io/k6/output/cloud/expv2"
"go.k6.io/k6/usage"
"gopkg.in/guregu/null.v3"
)

Expand Down Expand Up @@ -121,6 +122,7 @@ func TestOutputCreateTestWithConfigOverwrite(t *testing.T) {
SystemTags: &metrics.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
Usage: usage.New(),
})
require.NoError(t, err)
require.NoError(t, out.Start())
Expand All @@ -147,6 +149,7 @@ func TestOutputStartVersionError(t *testing.T) {
"K6_CLOUD_API_VERSION": "99",
},
ScriptPath: &url.URL{Path: "/script.js"},
Usage: usage.New(),
})
require.NoError(t, err)

Expand All @@ -170,6 +173,7 @@ func TestOutputStartVersionedOutputV2(t *testing.T) {
AggregationPeriod: types.NullDurationFrom(1 * time.Hour),
MetricPushInterval: types.NullDurationFrom(1 * time.Hour),
},
usage: usage.New(),
}

o.client = cloudapi.NewClient(
Expand All @@ -190,6 +194,7 @@ func TestOutputStartVersionedOutputV1Error(t *testing.T) {
config: cloudapi.Config{
APIVersion: null.IntFrom(1),
},
usage: usage.New(),
}

err := o.startVersionedOutput()
Expand Down Expand Up @@ -217,6 +222,7 @@ func TestOutputStartWithTestRunID(t *testing.T) {
SystemTags: &metrics.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
Usage: usage.New(),
})
require.NoError(t, err)
require.NoError(t, out.Start())
Expand Down
2 changes: 2 additions & 0 deletions output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/metrics"
"go.k6.io/k6/usage"
)

// Params contains all possible constructor parameters an output may need.
Expand All @@ -30,6 +31,7 @@ type Params struct {
ScriptOptions lib.Options
RuntimeOptions lib.RuntimeOptions
ExecutionPlan []lib.ExecutionStep
Usage *usage.Usage
}

// TODO: make v2 with buffered channels?
Expand Down
136 changes: 136 additions & 0 deletions usage/usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Package usage implements usage tracking for k6 in order to figure what is being used within a given execution
package usage

import (
"strings"
"sync"
)

// Usage is a way to collect usage data for within k6
type Usage struct {
l *sync.Mutex
m map[string]any
}

// New returns a new empty Usage ready to be used
func New() *Usage {
return &Usage{
l: new(sync.Mutex),
m: make(map[string]any),
}
}

// String appends the provided value to a slice of strings that is the value.
// If called only a single time, the value will be just a string not a slice
func (u *Usage) String(k, v string) {
u.l.Lock()
defer u.l.Unlock()
oldV, ok := u.m[k]
if !ok {
u.m[k] = v
return
}
switch oldV := oldV.(type) {
case string:
u.m[k] = []string{oldV, v}
case []string:
u.m[k] = append(oldV, v)
default:
// TODO: error, panic?, nothing, log?
}
}

// Strings appends the provided value to a slice of strings that is the value.
// Unlike String it
func (u *Usage) Strings(k, v string) {
u.l.Lock()
defer u.l.Unlock()
oldV, ok := u.m[k]
if !ok {
u.m[k] = []string{v}
return
}
switch oldV := oldV.(type) {
case string:
u.m[k] = []string{oldV, v}
case []string:
u.m[k] = append(oldV, v)
default:
// TODO: error, panic?, nothing, log?
}
}

// Count adds the provided value to a given key. Creating the key if needed
func (u *Usage) Count(k string, v int64) {
u.l.Lock()
defer u.l.Unlock()
oldV, ok := u.m[k]
if !ok {
u.m[k] = v
return
}
switch oldV := oldV.(type) {
case int64:
u.m[k] = oldV + v
default:
// TODO: error, panic?, nothing, log?
}
}

// Map returns a copy of the internal map plus making subusages from keys that have a slash in them
// only a single level is being respected
func (u *Usage) Map() map[string]any {
u.l.Lock()
defer u.l.Unlock()

result := make(map[string]any, len(u.m))
for k, v := range u.m {
prefix, post, found := strings.Cut(k, "/")
if !found {
result[k] = v
continue
}

topLevel, ok := result[prefix]
if !ok {
topLevel = make(map[string]any)
result[prefix] = topLevel
}
topLevelMap, ok := topLevel.(map[string]any)
if !ok {
continue // TODO panic?, error?
}
keyLevel, ok := topLevelMap[post]
switch value := v.(type) {
case int64:
switch i := keyLevel.(type) {
case int64:
keyLevel = i + value
default:
// TODO:panic? error?
}
case string:
switch i := keyLevel.(type) {
case string:
keyLevel = append([]string(nil), i, value)
case []string:
keyLevel = append(i, value) //nolint:gocritic // we assign to the final value
default:
// TODO:panic? error?
}
case []string:
switch i := keyLevel.(type) {
case []string:
keyLevel = append(i, value...) //nolint:gocritic // we assign to the final value
default:
// TODO:panic? error?
}
}
if !ok {
keyLevel = v
}
topLevelMap[post] = keyLevel
}

return result
}

0 comments on commit 63175cd

Please sign in to comment.