Finish racing tests, fix edge cases they revealed
Edge case issues fixed:

1. handleCheckpoint was invoking the retry policy's Decide method
   on attempts that were cancelled as redundant.

2. Wave cleanup code would deadlock if multiple instances of the
   sendAndReadBody goroutine panicked concurrently.

3. The attemptState goroutine's recoverPanic wasn't resilient to a
   secondary panic raised while closing the response body (see the
   sketch after this list).
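
To illustrate fix 3: the pattern (visible in the recoverPanic diff
below) is to register the panic-communication step in a defer before
attempting any cleanup that could itself panic. A simplified,
self-contained sketch, not the actual httpx code (cleanupAndSignal
and the buffered channel are invented for the demo):

    package main

    import "fmt"

    // cleanupAndSignal mimics the recoverPanic fix: the "must happen"
    // signalling step is deferred *before* the cleanup that may itself
    // panic, so the signal is still delivered on a secondary panic.
    func cleanupAndSignal(signal chan<- string) {
    	defer func() {
    		signal <- "attempt panicked" // runs even if cleanup panics again
    	}()
    	panic("secondary panic, e.g. from body.Close()")
    }

    func main() {
    	signal := make(chan string, 1)
    	func() {
    		defer func() { _ = recover() }() // contain the demo panic
    		cleanupAndSignal(signal)
    	}()
    	fmt.Println(<-signal) // prints "attempt panicked"
    }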

-----------------------------------------------------------------------

Checkpoint: As of this commit, the tests pass reliably for at least
10K iterations on my Windows laptop, in just over 3 hours.

    C:\...\httpx>go test -count=10000 -cover -timeout=14h
    PASS
    coverage: 98.0% of statements
    ok      github.com/gogama/httpx 10887.090s

-----------------------------------------------------------------------

The only two significant uncovered lines in `client.go` are line 318
(the channel read in setTimer, which somewhat surprised me) and line
388 (the fallthrough branch in the switch statement in cleanupWave;
this isn't surprising, just a small bit of missing test coverage).
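
For reference, the uncovered lines are easy to confirm with the
standard Go cover tooling (commands below are illustrative, not part
of this commit; the timeout value is arbitrary):

    go test -coverprofile=cover.out -count=1 -timeout=10m
    go tool cover -html=cover.out    # highlights uncovered lines in client.go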
vcschapp committed Mar 14, 2021
1 parent 42d5bce commit 51d17ce
Showing 2 changed files with 340 additions and 50 deletions.
33 changes: 16 additions & 17 deletions client.go
@@ -207,12 +207,11 @@ func (es *execState) wave() bool {
 	es.handleCheckpoint(attempt)
 
 	es.installAttempt(0, nil, nil, nil, nil)
-	d := es.racingPolicy.Schedule(es.exec)
-	es.setTimer(d)
+	es.setTimer(es.racingPolicy.Schedule(es.exec))
 
-	// Flag drain indicates whether to finish all attempts in the wave. It is
-	// set true as soon as any one attempt in the wave finishes, whether the
-	// attempt is retryable or not.
+	// Flag drain indicates whether to close out the wave, finishing in flight
+	// attempts but not starting any new ones. It is set true as soon as any one
+	// attempt in the wave finishes, whether the attempt is retryable or not.
 	//
 	// Flag halt indicates whether to stop the whole execution. It is set true
 	// as soon as a non-retryable attempt is detected.
@@ -244,8 +243,7 @@ func (es *execState) wave() bool {
 			es.exec.Racing++
 			es.handleCheckpoint(attempt)
 			es.installAttempt(attempt.index, nil, nil, nil, nil)
-			d = es.racingPolicy.Schedule(es.exec)
-			es.setTimer(d)
+			es.setTimer(es.racingPolicy.Schedule(es.exec))
 		}
 	}
 }
@@ -293,7 +291,7 @@ func (es *execState) handleCheckpoint(attempt *attemptState) (drain bool, halt bool) {
 		attempt.body = es.exec.Body
 		attempt.checkpoint = done
 		drain = true
-		halt = es.planCancelled() || !es.retryPolicy.Decide(es.exec)
+		halt = attempt.redundant || es.planCancelled() || !es.retryPolicy.Decide(es.exec)
 		return
 	case panicked:
 		es.exec.Racing--
@@ -390,6 +388,8 @@ func (es *execState) cleanupWave() {
 			fallthrough
 		case readBody:
 			es.exec.Racing--
+		case panicked:
+			es.exec.Racing--
 		default:
 			panic("httpx: bad attempt checkpoint")
 		}
@@ -499,6 +499,13 @@ func (as *attemptState) recoverPanic() {
 		return
 	}
 
+	// Communicate the panic.
+	defer func() {
+		as.panicVal = r
+		as.checkpoint = panicked
+		as.es.signal <- as
+	}()
+
 	// Close the body. If checkpoint is already readBodyClosing, it means
 	// the panic likely emanated from calling Close() and there's no
 	// point doing it again.
@@ -507,18 +514,10 @@
 		if resp != nil {
 			body := resp.Body
 			if body != nil {
-				func() {
-					defer func() { _ = recover() }()
-					_ = body.Close()
-				}()
+				_ = body.Close()
 			}
 		}
 	}
-
-	// Communicate the panic.
-	as.panicVal = r
-	as.checkpoint = panicked
-	as.es.signal <- as
 }
 
 func (as *attemptState) maybeRedundant(err error) error {
