Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop NetTrail - emit iterations metrics in runner instead of dialer #3908

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 63 additions & 22 deletions execution/scheduler_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func TestSchedulerSystemTags(t *testing.T) {

expTrailPVUTags := expCommonTrailTags.With("scenario", "per_vu_test")
expTrailSITags := expCommonTrailTags.With("scenario", "shared_test")
expNetTrailPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test")
expNetTrailSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test")
expDataSentPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test")
expDataSentSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test")

var gotCorrectTags int
for {
Expand All @@ -375,9 +375,13 @@ func TestSchedulerSystemTags(t *testing.T) {
if s.Tags == expTrailPVUTags || s.Tags == expTrailSITags {
gotCorrectTags++
}
case *netext.NetTrail:
if s.Tags == expNetTrailPVUTags || s.Tags == expNetTrailSITags {
gotCorrectTags++
default:
for _, sample := range s.GetSamples() {
if sample.Metric.Name == metrics.DataSentName {
if sample.Tags == expDataSentPVUTags || sample.Tags == expDataSentSITags {
gotCorrectTags++
}
}
}
}

Expand Down Expand Up @@ -487,7 +491,7 @@ func TestSchedulerRunCustomTags(t *testing.T) {
defer stopEmission()
require.NoError(t, execScheduler.Run(ctx, ctx, samples))
}()
var gotTrailTag, gotNetTrailTag bool
var gotTrailTag, gotDataSentTag bool
for {
select {
case sample := <-samples:
Expand All @@ -497,14 +501,16 @@ func TestSchedulerRunCustomTags(t *testing.T) {
gotTrailTag = true
}
}
if netTrail, ok := sample.(*netext.NetTrail); ok && !gotNetTrailTag {
tags := netTrail.Tags.Map()
if v, ok := tags["customTag"]; ok && v == "value" {
gotNetTrailTag = true
for _, s := range sample.GetSamples() {
if s.Metric.Name == metrics.DataSentName && !gotDataSentTag {
tags := s.Tags.Map()
if v, ok := tags["customTag"]; ok && v == "value" {
gotDataSentTag = true
}
}
}
case <-done:
if !gotTrailTag || !gotNetTrailTag {
if !gotTrailTag || !gotDataSentTag {
assert.FailNow(t, "a sample with expected tag wasn't received")
}
return
Expand Down Expand Up @@ -692,11 +698,15 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) {
gotSampleTags++
}
}
case *netext.NetTrail:
tags := s.Tags.Map()
for _, expTags := range expectedNetTrailTags {
if reflect.DeepEqual(expTags, tags) {
gotSampleTags++
case metrics.Samples:
for _, sample := range s.GetSamples() {
if sample.Metric.Name == metrics.DataSentName {
tags := sample.Tags.Map()
for _, expTags := range expectedNetTrailTags {
if reflect.DeepEqual(expTags, tags) {
gotSampleTags++
}
}
}
}
case metrics.ConnectedSamples:
Expand Down Expand Up @@ -1275,7 +1285,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
Value: expValue,
}
}
getDummyTrail := func(group string, emitIterations bool, addExpTags ...string) metrics.SampleContainer {
getNetworkSamples := func(group string, addExpTags ...string) metrics.SampleContainer {
expTags := []string{"group", group}
expTags = append(expTags, addExpTags...)
dialer := netext.NewDialer(
Expand All @@ -1284,25 +1294,56 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
)

ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)}
return dialer.GetTrail(time.Now(), time.Now(), true, emitIterations, ctm, piState.BuiltinMetrics)
return dialer.IOSamples(time.Now(), ctm, piState.BuiltinMetrics)
}

getIterationsSamples := func(group string, addExpTags ...string) metrics.SampleContainer {
expTags := []string{"group", group}
expTags = append(expTags, addExpTags...)
ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)}
startTime := time.Now()
endTime := time.Now()

return metrics.Samples([]metrics.Sample{
{
TimeSeries: metrics.TimeSeries{
Metric: piState.BuiltinMetrics.IterationDuration,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: metrics.D(endTime.Sub(startTime)),
},
{
TimeSeries: metrics.TimeSeries{
Metric: piState.BuiltinMetrics.Iterations,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: 1,
},
})
}

// Initially give a long time (5s) for the execScheduler to start
expectIn(0, 5000, getSample(1, testCounter, "group", "::setup", "place", "setupBeforeSleep"))
expectIn(900, 1100, getSample(2, testCounter, "group", "::setup", "place", "setupAfterSleep"))
expectIn(0, 100, getDummyTrail("::setup", false))
expectIn(0, 100, getNetworkSamples("::setup"))

expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default"))
expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default"))
expectIn(0, 100, getDummyTrail("", true, "scenario", "default"))
expectIn(0, 100, getNetworkSamples("", "scenario", "default"))
expectIn(0, 100, getIterationsSamples("", "scenario", "default"))

expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default"))
expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default"))
expectIn(0, 100, getDummyTrail("", true, "scenario", "default"))
expectIn(0, 100, getNetworkSamples("", "scenario", "default"))
expectIn(0, 100, getIterationsSamples("", "scenario", "default"))

expectIn(0, 1000, getSample(3, testCounter, "group", "::teardown", "place", "teardownBeforeSleep"))
expectIn(900, 1100, getSample(4, testCounter, "group", "::teardown", "place", "teardownAfterSleep"))
expectIn(0, 100, getDummyTrail("::teardown", false))
expectIn(0, 100, getNetworkSamples("::teardown"))

for {
select {
Expand Down
35 changes: 32 additions & 3 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,15 +865,44 @@ func (u *VU) runFn(
u.Transport.CloseIdleConnections()
}

u.state.Samples <- u.Dialer.GetTrail(
startTime, endTime, isFullIteration,
isDefault, u.state.Tags.GetCurrentValues(), u.Runner.preInitState.BuiltinMetrics)
builtinMetrics := u.Runner.preInitState.BuiltinMetrics
ctm := u.state.Tags.GetCurrentValues()
u.state.Samples <- u.Dialer.IOSamples(endTime, ctm, builtinMetrics)

if isFullIteration && isDefault {
u.state.Samples <- iterationSamples(startTime, endTime, ctm, builtinMetrics)
}

v = unPromisify(v)

return v, isFullIteration, endTime.Sub(startTime), err
}

func iterationSamples(
startTime, endTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics,
) metrics.Samples {
return metrics.Samples([]metrics.Sample{
{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.IterationDuration,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: metrics.D(endTime.Sub(startTime)),
},
{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.Iterations,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: 1,
},
})
}

func (u *ActiveVU) incrIteration() {
u.iteration++
u.state.Iteration = u.iteration
Expand Down
8 changes: 4 additions & 4 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,9 @@ func TestVUIntegrationMetrics(t *testing.T) {
require.NoError(t, err)
sampleCount := 0
builtinMetrics := r.preInitState.BuiltinMetrics
for i, sampleC := range metrics.GetBufferedSamples(samples) {
for j, s := range sampleC.GetSamples() {
sampleCount++
switch i + j {
for _, sampleC := range metrics.GetBufferedSamples(samples) {
for _, s := range sampleC.GetSamples() {
switch sampleCount {
case 0:
assert.Equal(t, 5.0, s.Value)
assert.Equal(t, "my_metric", s.Metric.Name)
Expand All @@ -814,6 +813,7 @@ func TestVUIntegrationMetrics(t *testing.T) {
assert.Same(t, builtinMetrics.Iterations, s.Metric, "`iterations` sample is after `iteration_duration`")
assert.Equal(t, float64(1), s.Value)
}
sampleCount++
}
}
assert.Equal(t, sampleCount, 5)
Expand Down
83 changes: 9 additions & 74 deletions lib/netext/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,20 @@ func (d *Dialer) DialContext(ctx context.Context, proto, addr string) (net.Conn,
return conn, err
}

// GetTrail creates a new NetTrail instance with the Dialer
// sent and received data metrics and the supplied times and tags.
// TODO: Refactor this according to
// https://github.com/k6io/k6/pull/1203#discussion_r337938370
func (d *Dialer) GetTrail(
startTime, endTime time.Time, fullIteration bool, emitIterations bool, ctm metrics.TagsAndMeta,
builtinMetrics *metrics.BuiltinMetrics,
) *NetTrail {
// IOSamples returns samples for data send and received since it last call and zeros out.
// It uses the provided time as the sample time and tags and builtinMetrics to build the samples.
func (d *Dialer) IOSamples(
sampleTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics,
) metrics.SampleContainer {
bytesWritten := atomic.SwapInt64(&d.BytesWritten, 0)
bytesRead := atomic.SwapInt64(&d.BytesRead, 0)
samples := []metrics.Sample{
return metrics.Samples([]metrics.Sample{
{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.DataSent,
Tags: ctm.Tags,
},
Time: endTime,
Time: sampleTime,
Metadata: ctm.Metadata,
Value: float64(bytesWritten),
},
Expand All @@ -94,43 +91,11 @@ func (d *Dialer) GetTrail(
Metric: builtinMetrics.DataReceived,
Tags: ctm.Tags,
},
Time: endTime,
Time: sampleTime,
Metadata: ctm.Metadata,
Value: float64(bytesRead),
},
}
if fullIteration {
samples = append(samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.IterationDuration,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: metrics.D(endTime.Sub(startTime)),
})
if emitIterations {
samples = append(samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.Iterations,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: 1,
})
}
}

return &NetTrail{
BytesRead: bytesRead,
BytesWritten: bytesWritten,
FullIteration: fullIteration,
StartTime: startTime,
EndTime: endTime,
Tags: ctm.Tags,
Samples: samples,
}
})
}

func (d *Dialer) getDialAddr(addr string) (string, error) {
Expand Down Expand Up @@ -208,36 +173,6 @@ func (d *Dialer) getConfiguredHost(addr, host, port string) (*types.Host, error)
return nil, nil //nolint:nilnil
}

// NetTrail contains information about the exchanged data size and length of a
// series of connections from a particular netext.Dialer
type NetTrail struct {
BytesRead int64
BytesWritten int64
FullIteration bool
StartTime time.Time
EndTime time.Time
Tags *metrics.TagSet
Samples []metrics.Sample
}

// Ensure that interfaces are implemented correctly
var _ metrics.ConnectedSampleContainer = &NetTrail{}

// GetSamples implements the metrics.SampleContainer interface.
func (ntr *NetTrail) GetSamples() []metrics.Sample {
return ntr.Samples
}

// GetTags implements the metrics.ConnectedSampleContainer interface.
func (ntr *NetTrail) GetTags() *metrics.TagSet {
return ntr.Tags
}

// GetTime implements the metrics.ConnectedSampleContainer interface.
func (ntr *NetTrail) GetTime() time.Time {
return ntr.EndTime
}

// Conn wraps net.Conn and keeps track of sent and received data size
type Conn struct {
net.Conn
Expand Down
4 changes: 0 additions & 4 deletions output/cloud/insights/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/require"

"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/lib/netext"
"go.k6.io/k6/lib/netext/httpext"
"go.k6.io/k6/metrics"
)
Expand Down Expand Up @@ -51,9 +50,6 @@ func Test_Collector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestM
&httpext.Trail{
// HTTP trail without trace ID should be ignored
},
&netext.NetTrail{
// Net trail should be ignored
},
&httpext.Trail{
EndTime: time.Unix(20, 0),
Duration: time.Second,
Expand Down