Skip to content

Commit

Permalink
Merge pull request #29397 from vespa-engine/mpolden/tweak-feed-output
Browse files Browse the repository at this point in the history
Count operations passed to feeder
  • Loading branch information
mpolden authored Nov 21, 2023
2 parents 430c0f8 + 29fd9a8 commit 8f9be37
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 31 deletions.
32 changes: 30 additions & 2 deletions client/go/internal/cli/cmd/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,36 @@ func newFeedCmd(cli *CLI) *cobra.Command {
This command can be used to feed large amounts of documents to a Vespa cluster
efficiently.
The contents of JSON-FILE must be either a JSON array or JSON objects separated by
The contents of json-file must be either a JSON array or JSON objects separated by
newline (JSONL).
If JSON-FILE is a single dash ('-'), documents will be read from standard input.
If json-file is a single dash ('-'), documents will be read from standard input.
Once feeding completes, metrics of the feed session are printed to standard out
in a JSON format:
- feeder.operation.count: Number of operations passed to the feeder by the user,
not counting retries.
- feeder.seconds: Total time spent feeding.
- feeder.ok.count: Number of successful operations.
- feeder.ok.rate: Number of successful operations per second.
- feeder.error.count: Number of network errors (transport layer).
- feeder.inflight.count: Number of operations currently being sent.
- http.request.count: Number of HTTP requests made, including retries.
- http.request.bytes: Number of bytes sent.
- http.request.MBps: Request throughput measured in MB/s. This is the raw
operation throughput, and not the network throughput,
i.e. using compression does not affect this number.
- http.exception.count: Same as feeder.error.count. Present for compatibility
with vespa-feed-client.
- http.response.count: Number of HTTP responses received.
- http.response.bytes: Number of bytes received.
- http.response.MBps: Response throughput measured in MB/s.
- http.response.error.count: Number of non-OK HTTP responses received.
- http.response.latency.millis.min: Lowest latency of a successful operation.
- http.response.latency.millis.avg: Average latency of successful operations.
- http.response.latency.millis.max: Highest latency of a successful operation.
- http.response.code.counts: Number of responses grouped by their HTTP code.
`,
Example: `$ vespa feed docs.jsonl moredocs.json
$ cat docs.jsonl | vespa feed -`,
Expand Down Expand Up @@ -244,6 +270,7 @@ type number float32
// MarshalJSON implements json.Marshaler. It renders n as a bare JSON number
// in fixed-point notation with exactly three digits after the decimal point.
// The returned error is always nil.
func (n number) MarshalJSON() ([]byte, error) {
	formatted := fmt.Sprintf("%.3f", n)
	return []byte(formatted), nil
}

type feedSummary struct {
Operations int64 `json:"feeder.operation.count"`
Seconds number `json:"feeder.seconds"`
SuccessCount int64 `json:"feeder.ok.count"`
SuccessRate number `json:"feeder.ok.rate"`
Expand Down Expand Up @@ -272,6 +299,7 @@ func mbps(bytes int64, duration time.Duration) float64 {

func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error {
summary := feedSummary{
Operations: stats.Operations,
Seconds: number(duration.Seconds()),
SuccessCount: stats.Successful(),
SuccessRate: number(float64(stats.Successful()) / math.Max(1, duration.Seconds())),
Expand Down
2 changes: 2 additions & 0 deletions client/go/internal/cli/cmd/feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestFeed(t *testing.T) {

assert.Equal(t, "", stderr.String())
want := `{
"feeder.operation.count": 2,
"feeder.seconds": 5.000,
"feeder.ok.count": 2,
"feeder.ok.rate": 0.400,
Expand Down Expand Up @@ -122,6 +123,7 @@ func TestFeedInvalid(t *testing.T) {
require.NotNil(t, cli.Run("feed", "-t", "http://127.0.0.1:8080", jsonFile))

want := `{
"feeder.operation.count": 1,
"feeder.seconds": 3.000,
"feeder.ok.count": 1,
"feeder.ok.rate": 0.333,
Expand Down
2 changes: 1 addition & 1 deletion client/go/internal/vespa/document/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (d *Dispatcher) processResults() {
defer d.wg.Done()
for op := range d.results {
d.statsMu.Lock()
d.stats.Add(op.result)
d.stats.Add(op.result, op.attempts > 1)
d.statsMu.Unlock()
retry := d.shouldRetry(op, op.result)
d.logResult(op, retry)
Expand Down
7 changes: 4 additions & 3 deletions client/go/internal/vespa/document/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestClientSend(t *testing.T) {
if !reflect.DeepEqual(res, wantRes) {
t.Fatalf("#%d: got result %+v, want %+v", i, res, wantRes)
}
stats.Add(res)
stats.Add(res, false)
r := httpClient.LastRequest
if r.Method != tt.method {
t.Errorf("#%d: got r.Method = %q, want %q", i, r.Method, tt.method)
Expand All @@ -139,8 +139,9 @@ func TestClientSend(t *testing.T) {
}
}
want := Stats{
Requests: 5,
Responses: 4,
Operations: 5,
Requests: 5,
Responses: 4,
ResponsesByCode: map[int]int64{
200: 3,
502: 1,
Expand Down
39 changes: 27 additions & 12 deletions client/go/internal/vespa/document/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,29 @@ func (r Result) Success() bool {

// Stats represents feeding operation statistics.
type Stats struct {
// Number of operations passed to the feeder by the user, not counting retries.
Operations int64
// Number of responses received, grouped by the HTTP status code. Requests that do not receive a response (i.e. no
// status code) are not counted.
ResponsesByCode map[int]int64
Requests int64
Responses int64
Errors int64
Inflight int64
TotalLatency time.Duration
MinLatency time.Duration
MaxLatency time.Duration
BytesSent int64
BytesRecv int64
// Number of requests made, including retries.
Requests int64
// Number of responses received.
Responses int64
// Number of transport layer errors.
Errors int64
// Number of requests currently in-flight.
Inflight int64
// Sum of response latency
TotalLatency time.Duration
// Lowest recorded response latency
MinLatency time.Duration
// Highest recorded response latency
MaxLatency time.Duration
// Total bytes sent
BytesSent int64
// Total bytes received
BytesRecv int64
}

// AvgLatency returns the average latency for a request.
Expand Down Expand Up @@ -82,14 +95,16 @@ func (s Stats) Clone() Stats {
}

// Add statistics from result to this.
func (s *Stats) Add(result Result) {
func (s *Stats) Add(result Result, retry bool) {
if !retry {
s.Operations++
}
s.Requests++
if s.ResponsesByCode == nil {
s.ResponsesByCode = make(map[int]int64)
}
if result.Err == nil {
responsesByCode := s.ResponsesByCode[result.HTTPStatus]
s.ResponsesByCode[result.HTTPStatus] = responsesByCode + 1
s.ResponsesByCode[result.HTTPStatus]++
s.Responses++
} else {
s.Errors++
Expand Down
28 changes: 15 additions & 13 deletions client/go/internal/vespa/document/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ import (

func TestStatsAdd(t *testing.T) {
var stats Stats
stats.Add(Result{HTTPStatus: 200, Latency: 200 * time.Millisecond})
stats.Add(Result{HTTPStatus: 200, Latency: 400 * time.Millisecond})
stats.Add(Result{HTTPStatus: 200, Latency: 100 * time.Millisecond})
stats.Add(Result{HTTPStatus: 200, Latency: 500 * time.Millisecond})
stats.Add(Result{HTTPStatus: 200, Latency: 300 * time.Millisecond})
stats.Add(Result{HTTPStatus: 500, Latency: 100 * time.Millisecond})
stats.Add(Result{HTTPStatus: 200, Latency: 200 * time.Millisecond}, false)
stats.Add(Result{HTTPStatus: 200, Latency: 400 * time.Millisecond}, false)
stats.Add(Result{HTTPStatus: 200, Latency: 100 * time.Millisecond}, false)
stats.Add(Result{HTTPStatus: 200, Latency: 500 * time.Millisecond}, false)
stats.Add(Result{HTTPStatus: 200, Latency: 300 * time.Millisecond}, false)
stats.Add(Result{HTTPStatus: 500, Latency: 100 * time.Millisecond}, false)
stats.Add(Result{HTTPStatus: 200, Latency: 100 * time.Millisecond}, true)
expected := Stats{
Requests: 6,
Responses: 6,
ResponsesByCode: map[int]int64{200: 5, 500: 1},
TotalLatency: 1600 * time.Millisecond,
Operations: 6,
Requests: 7,
Responses: 7,
ResponsesByCode: map[int]int64{200: 6, 500: 1},
TotalLatency: 1700 * time.Millisecond,
MinLatency: 100 * time.Millisecond,
MaxLatency: 500 * time.Millisecond,
}
Expand All @@ -33,11 +35,11 @@ func TestStatsAdd(t *testing.T) {

func TestStatsClone(t *testing.T) {
var a Stats
a.Add(Result{HTTPStatus: 200})
a.Add(Result{HTTPStatus: 200}, false)
b := a.Clone()
a.Add(Result{HTTPStatus: 200})
a.Add(Result{HTTPStatus: 200}, false)

want := Stats{Requests: 1, Responses: 1, ResponsesByCode: map[int]int64{200: 1}}
want := Stats{Operations: 1, Requests: 1, Responses: 1, ResponsesByCode: map[int]int64{200: 1}}
if !reflect.DeepEqual(b, want) {
t.Errorf("got %+v, want %+v", b, want)
}
Expand Down

0 comments on commit 8f9be37

Please sign in to comment.