Skip to content

Commit

Permalink
Merge pull request #32991 from vespa-engine/havardpe/add-inflight-option
Browse files Browse the repository at this point in the history
add inflight option to vespa feed
  • Loading branch information
mpolden authored Dec 5, 2024
2 parents c595b5c + 8c67a04 commit af60474
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
4 changes: 3 additions & 1 deletion client/go/internal/cli/cmd/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

func addFeedFlags(cli *CLI, cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
cmd.PersistentFlags().IntVar(&options.inflight, "inflight", 0, "The target number of inflight requests. 0 to dynamically detect the best value (default 0)")
cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Whether to compress the document data when sending the HTTP request. Default is "auto", which compresses large documents. Must be "auto", "gzip" or "none"`)
cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Individual feed operation timeout in seconds. 0 to disable (default 0)")
cmd.Flags().StringSliceVarP(&options.headers, "header", "", nil, "Add a header to all HTTP requests, on the format 'Header: Value'. This can be specified multiple times")
Expand All @@ -40,6 +41,7 @@ func addFeedFlags(cli *CLI, cmd *cobra.Command, options *feedOptions) {

type feedOptions struct {
connections int
inflight int
compression string
route string
verbose bool
Expand Down Expand Up @@ -257,7 +259,7 @@ func feed(files []string, options feedOptions, cli *CLI, cmd *cobra.Command) err
if err != nil {
return err
}
throttler := document.NewThrottler(options.connections)
throttler := document.NewThrottler(options.connections, options.inflight)
circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second)
dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
start := cli.now()
Expand Down
16 changes: 15 additions & 1 deletion client/go/internal/vespa/document/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type Throttler interface {
TargetInflight() int64
}

type staticThrottler struct {
inflight int
}

func (*staticThrottler) Sent() {}
func (*staticThrottler) Success() {}
func (*staticThrottler) Throttled(count int64) {}
func (s *staticThrottler) TargetInflight() int64 { return int64(s.inflight) }

type dynamicThrottler struct {
minInflight int64
maxInflight int64
Expand Down Expand Up @@ -54,7 +63,12 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
return t
}

func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func NewThrottler(connections int, inflight int) Throttler {
if inflight > 0 {
return &staticThrottler{inflight}
}
return newThrottler(connections, time.Now)
}

func (t *dynamicThrottler) Sent() {
currentInflight := t.TargetInflight()
Expand Down
7 changes: 7 additions & 0 deletions client/go/internal/vespa/document/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ func TestThrottler(t *testing.T) {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}

func TestStaticThrottler(t *testing.T) {
var tr Throttler = &staticThrottler{369}
if got, want := tr.TargetInflight(), int64(369); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}

0 comments on commit af60474

Please sign in to comment.