Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add inflight option to vespa feed #32991

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion client/go/internal/cli/cmd/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

func addFeedFlags(cli *CLI, cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
cmd.PersistentFlags().IntVar(&options.inflight, "inflight", 0, "The target number of inflight requests. 0 to dynamically detect the best value (default 0)")
cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Whether to compress the document data when sending the HTTP request. Default is "auto", which compresses large documents. Must be "auto", "gzip" or "none"`)
cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Individual feed operation timeout in seconds. 0 to disable (default 0)")
cmd.Flags().StringSliceVarP(&options.headers, "header", "", nil, "Add a header to all HTTP requests, on the format 'Header: Value'. This can be specified multiple times")
Expand All @@ -40,6 +41,7 @@ func addFeedFlags(cli *CLI, cmd *cobra.Command, options *feedOptions) {

type feedOptions struct {
connections int
inflight int
compression string
route string
verbose bool
Expand Down Expand Up @@ -257,7 +259,7 @@ func feed(files []string, options feedOptions, cli *CLI, cmd *cobra.Command) err
if err != nil {
return err
}
throttler := document.NewThrottler(options.connections)
throttler := document.NewThrottler(options.connections, options.inflight)
circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second)
dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
start := cli.now()
Expand Down
16 changes: 15 additions & 1 deletion client/go/internal/vespa/document/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type Throttler interface {
TargetInflight() int64
}

type staticThrottler struct {
inflight int
}

func (*staticThrottler) Sent() {}
func (*staticThrottler) Success() {}
func (*staticThrottler) Throttled(count int64) {}
func (s *staticThrottler) TargetInflight() int64 { return int64(s.inflight) }

type dynamicThrottler struct {
minInflight int64
maxInflight int64
Expand Down Expand Up @@ -54,7 +63,12 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
return t
}

func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func NewThrottler(connections int, inflight int) Throttler {
if inflight > 0 {
return &staticThrottler{inflight}
}
return newThrottler(connections, time.Now)
}

func (t *dynamicThrottler) Sent() {
currentInflight := t.TargetInflight()
Expand Down
7 changes: 7 additions & 0 deletions client/go/internal/vespa/document/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ func TestThrottler(t *testing.T) {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}

func TestStaticThrottler(t *testing.T) {
var tr Throttler = &staticThrottler{369}
if got, want := tr.TargetInflight(), int64(369); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}
Loading