From 51d17ceb39ab3a325680ed06c94469554d100c1c Mon Sep 17 00:00:00 2001 From: Victor Schappert Date: Sat, 13 Mar 2021 13:04:19 -0800 Subject: [PATCH] Finish racing tests, fix edge cases they revealed Edge case issues fixed: 1. handleCheckpoint was invoking the retry policy's Decide method on attempts that were cancelled as redundant. 2. Wave cleanup code would deadlock if multiple instances of the sendAndReadBody goroutine panicked concurrently. 3. attemptState goroutine's recoverPanic wasn't resilient to a secondary panic from closing the response body. ----------------------------------------------------------------------- Checkpoint: As of this commit, the tests pass reliably for at least 10K iterations on my Windows laptop, taking just over 3 hours. C:\...\httpx>go test -count=10000 -cover -timeout=14h PASS coverage: 98.0% of statements ok github.com/gogama/httpx 10887.090s ----------------------------------------------------------------------- The only two significant uncovered lines in `client.go` are line 318 (the channel read in setTimer, somewhat surprising to me) and line 388 (the fallthrough branch in the switch statement in cleanupWave; this isn't surprising, it's a small bit of missing test coverage). --- client.go | 33 +++-- client_test.go | 357 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 340 insertions(+), 50 deletions(-) diff --git a/client.go b/client.go index 7ac53b6..215c957 100644 --- a/client.go +++ b/client.go @@ -207,12 +207,11 @@ func (es *execState) wave() bool { es.handleCheckpoint(attempt) es.installAttempt(0, nil, nil, nil, nil) - d := es.racingPolicy.Schedule(es.exec) - es.setTimer(d) + es.setTimer(es.racingPolicy.Schedule(es.exec)) - // Flag drain indicates whether to finish all attempts in the wave. It is - // set true as soon as any one attempt in the wave finishes, whether the - // attempt is retryable or not. + // Flag drain indicates whether to close out the wave, finishing in-flight + // attempts but not starting any new ones. It is set true as soon as any one + // attempt in the wave finishes, whether the attempt is retryable or not. // // Flag halt indicates whether to stop the whole execution. It is set true // as soon as a non-retryable attempt is detected. @@ -244,8 +243,7 @@ func (es *execState) wave() bool { es.exec.Racing++ es.handleCheckpoint(attempt) es.installAttempt(attempt.index, nil, nil, nil, nil) - d = es.racingPolicy.Schedule(es.exec) - es.setTimer(d) + es.setTimer(es.racingPolicy.Schedule(es.exec)) } } } @@ -293,7 +291,7 @@ func (es *execState) handleCheckpoint(attempt *attemptState) (drain bool, halt b attempt.body = es.exec.Body attempt.checkpoint = done drain = true - halt = es.planCancelled() || !es.retryPolicy.Decide(es.exec) + halt = attempt.redundant || es.planCancelled() || !es.retryPolicy.Decide(es.exec) return case panicked: es.exec.Racing-- @@ -390,6 +388,8 @@ func (es *execState) cleanupWave() { fallthrough case readBody: es.exec.Racing-- + case panicked: + es.exec.Racing-- default: panic("httpx: bad attempt checkpoint") } @@ -499,6 +499,13 @@ func (as *attemptState) recoverPanic() { return } + // Communicate the panic. + defer func() { + as.panicVal = r + as.checkpoint = panicked + as.es.signal <- as + }() + // Close the body. If checkpoint is already readBodyClosing, it means // the panic likely emanated from calling Close() and there's no // point doing it again.
@@ -507,18 +514,10 @@ func (as *attemptState) recoverPanic() { if resp != nil { body := resp.Body if body != nil { - func() { - defer func() { _ = recover() }() - _ = body.Close() - }() + _ = body.Close() } } } - - // Communicate the panic. - as.panicVal = r - as.checkpoint = panicked - as.es.signal <- as } func (as *attemptState) maybeRedundant(err error) error { diff --git a/client_test.go b/client_test.go index 7a965b1..dac3196 100644 --- a/client_test.go +++ b/client_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/gogama/httpx/racing" + "github.com/gogama/httpx/request" "github.com/gogama/httpx/retry" "github.com/gogama/httpx/timeout" @@ -40,7 +42,7 @@ func TestClient(t *testing.T) { t.Run("plan cancel", testClientPlanCancel) t.Run("plan replace", testClientPlanChange) t.Run("close idle connections", testClientCloseIdleConnections) - //t.Run("racing", testClientRacing) FIXME + t.Run("racing", testClientRacing) } func TestURLErrorOp(t *testing.T) { @@ -749,6 +751,7 @@ func testClientGoroutinePanicCore(t *testing.T) { Panic(panicVal). Once() mockReadCloser.On("Close"). + Return(nil). Once() return mockReadCloser }, @@ -1012,33 +1015,25 @@ func testClientCloseIdleConnections(t *testing.T) { }) } -func TestClientRacing(t *testing.T) { - // What are some test cases we can do? - // - // 1. Schedule an extra attempt. Return success, take a long time. - // Ensure long time taker cancelled as redundant AND that all - // the handlers are called as expected. - // - // 2. Ensure plan cancelled in the middle of the wave loop doesn't - // keep creating new attempts. - // - // 3. Ensure plan cancelled while multiple concurrent requests running - // is correctly detected. - // - // 4. Panic in a handler while a bunch of parallel request attempts - // are running and ensure the cleanup code runs. +func testClientRacing(t *testing.T) { + t.Run("never start", testClientRacingNeverStart) + t.Run("retry", testClientRacingRetry) + t.Run("cancel redundant", testClientRacingCancelRedundant) + t.Run("plan cancel in wave", testRacingPlanCancelDuringWaveLoop) + t.Run("panic", testClientRacingPanic) } -func TestClientRacingNeverStart(t *testing.T) { +func testClientRacingNeverStart(t *testing.T) { // This test schedules one concurrent attempt but never starts it. doer := newMockHTTPDoer(t) retryPolicy := newMockRetryPolicy(t) racingPolicy := newMockRacingPolicy(t) cl := Client{ - HTTPDoer: doer, - RetryPolicy: retryPolicy, - RacingPolicy: racingPolicy, - Handlers: &HandlerGroup{}, + HTTPDoer: doer, + TimeoutPolicy: timeout.Infinite, + RetryPolicy: retryPolicy, + RacingPolicy: racingPolicy, + Handlers: &HandlerGroup{}, } trace := cl.addTraceHandlers() waiter := make(chan time.Time) @@ -1082,7 +1077,7 @@ func TestClientRacingNeverStart(t *testing.T) { }, trace.calls) } -func TestClientRacingRetry(t *testing.T) { +func testClientRacingRetry(t *testing.T) { // This test schedules two additional attempts to race the first one, // and all three requests "fail" (result in a positive retry decision). 
// On the next wave, no new concurrent attempts are scheduled and the @@ -1091,10 +1086,11 @@ func TestClientRacingRetry(t *testing.T) { retryPolicy := newMockRetryPolicy(t) racingPolicy := newMockRacingPolicy(t) cl := Client{ - HTTPDoer: doer, - RetryPolicy: retryPolicy, - RacingPolicy: racingPolicy, - Handlers: &HandlerGroup{}, + HTTPDoer: doer, + TimeoutPolicy: timeout.Infinite, + RetryPolicy: retryPolicy, + RacingPolicy: racingPolicy, + Handlers: &HandlerGroup{}, } trace := cl.addTraceHandlers() bridge1, bridge2 := make(chan time.Time), make(chan time.Time) @@ -1105,24 +1101,24 @@ func TestClientRacingRetry(t *testing.T) { // Chain the first three attempts (all are racing) so that the first one // waits for the second one to start, and the second one waits for the // third one to start. - callOrder := callOrder{} + callOrder := callOrderMatcher{} doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(0) })). WaitUntil(bridge1). Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry-0"))}, nil). - Times(1) + Once() doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(1) })). WaitUntil(bridge2). Run(func(_ mock.Arguments) { bridge1 <- time.Now() }). Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry-1"))}, nil). - Times(1) + Once() doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(2) })). Run(func(_ mock.Arguments) { bridge2 <- time.Now() }). Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry=2"))}, nil). - Times(1) + Once() // The last attempt occurs in wave 3, by itself (no racing). doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(3) })). Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry=3"))}, nil). - Times(1) + Once() retryPolicy.On("Decide", mock.MatchedBy(func(e *request.Execution) bool { return e.Wave == 0 && e.Attempt <= 2 })).Return(true).Times(3) @@ -1168,6 +1164,301 @@ func TestClientRacingRetry(t *testing.T) { }, trace.calls[n-5:n]) } +func testClientRacingCancelRedundant(t *testing.T) { + // This test schedules and starts a second request racing the first + // one. The second attempt completes before the first one. The test + // ensures the first attempt is cancelled as redundant. + testCases := []struct { + name string + setupFirstDoCall func(t *testing.T, doCall *mock.Call) + afterAttemptMatcher func(e *request.Execution) bool + }{ + { + name: "cancel Do", + setupFirstDoCall: func(t *testing.T, doCall *mock.Call) { + doCall. + Run(func(args mock.Arguments) { + req := args.Get(0).(*http.Request) + ctx := req.Context() + <-ctx.Done() + }). + Return(nil, context.Canceled). + Once() + }, + afterAttemptMatcher: func(e *request.Execution) bool { + return e.Response == nil && e.Body == nil + }, + }, + { + name: "cancel read body", + setupFirstDoCall: func(t *testing.T, doCall *mock.Call) { + var ctx context.Context + mockBody := newMockReadCloser(t) + mockBody.On("Read", mock.Anything). + Run(func(_ mock.Arguments) { + <-ctx.Done() + }). + Return(0, context.Canceled). + Once() + mockBody.On("Close"). + Return(nil). + Once() + doCall. + Run(func(args mock.Arguments) { + req := args.Get(0).(*http.Request) + ctx = req.Context() + }). + Return(&http.Response{StatusCode: 200, Body: mockBody}, nil). 
+ Once() + }, + afterAttemptMatcher: func(e *request.Execution) bool { + return e.StatusCode() == 200 && e.Response != nil && len(e.Body) == 0 + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + doer := newMockHTTPDoer(t) + retryPolicy := newMockRetryPolicy(t) + racingPolicy := newMockRacingPolicy(t) + cl := Client{ + HTTPDoer: doer, + TimeoutPolicy: timeout.Infinite, + RetryPolicy: retryPolicy, + RacingPolicy: racingPolicy, + Handlers: &HandlerGroup{}, + } + callOrder := callOrderMatcher{} + firstDoCall := doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(0) })) + testCase.setupFirstDoCall(t, firstDoCall) + doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(1) })). + Return(&http.Response{StatusCode: 418, Body: ioutil.NopCloser(strings.NewReader("I'm a teapot"))}, nil). + Once() + retryPolicy.On("Decide", mock.AnythingOfType("*request.Execution")). + Return(false). + Once() + racingPolicy.On("Schedule", mock.MatchedBy(func(e *request.Execution) bool { + return e.Attempt == 0 && e.Wave == 0 && e.Racing == 1 + })).Return(time.Microsecond).Once() + racingPolicy.On("Schedule", mock.MatchedBy(func(e *request.Execution) bool { + return e.Attempt == 1 && e.Wave == 0 && e.Racing == 2 + })).Return(time.Duration(0)).Once() + racingPolicy.On("Start", mock.MatchedBy(func(e *request.Execution) bool { + return e.Attempt == 0 && e.Wave == 0 && e.Racing == 1 + })).Return(true).Once() + cl.Handlers.mock(AfterAttempt).On("Handle", AfterAttempt, mock.MatchedBy(func(e *request.Execution) bool { + return e.StatusCode() == 418 && bytes.Equal(e.Body, []byte("I'm a teapot")) && e.Err == nil && e.Wave == 0 + })).Once() + cl.Handlers.mock(AfterAttempt).On("Handle", AfterAttempt, mock.MatchedBy(func(e *request.Execution) bool { + return errors.Unwrap(e.Err) == racing.Redundant && e.Wave == 0 && testCase.afterAttemptMatcher(e) + })).Once() + + e, err := cl.Get("test") + + doer.AssertExpectations(t) + retryPolicy.AssertExpectations(t) + racingPolicy.AssertExpectations(t) + cl.Handlers.assertExpectations(t) + assert.NoError(t, err) + require.NotNil(t, e) + assert.Equal(t, 418, e.StatusCode()) + assert.Equal(t, []byte("I'm a teapot"), e.Body) + }) + } +} + +func testRacingPlanCancelDuringWaveLoop(t *testing.T) { + // This test cancels the plan context while the wave loop is in the + // process of racing attempts in the wave. It verifies that the wave + // ends and that the client doesn't continue generating new attempts. + // ----------------------------------------------------------------- + N := 5 // Number of attempts to start before cancelling plan context. + planCtx, cancelPlanCtx := context.WithCancel(context.Background()) + defer cancelPlanCtx() + doer := newMockHTTPDoer(t) + retryPolicy := newMockRetryPolicy(t) + racingPolicy := newMockRacingPolicy(t) + cl := Client{ + HTTPDoer: doer, + TimeoutPolicy: timeout.Infinite, + RetryPolicy: retryPolicy, + RacingPolicy: racingPolicy, + Handlers: &HandlerGroup{}, + } + cancelRequest := make(chan time.Time, N) + defer close(cancelRequest) + cl.Handlers.mock(BeforeAttempt). + On("Handle", BeforeAttempt, mock.MatchedBy(func(e *request.Execution) bool { + return e.Wave == 0 + })). 
+ Run(func(args mock.Arguments) { + e := args.Get(1).(*request.Execution) + if e.Attempt == N-1 { + cancelPlanCtx() + now := time.Now() + for i := 0; i < N; i++ { + cancelRequest <- now + } + } else if e.Attempt >= N { + cancelRequest <- time.Now() + } + }) + doer.On("Do", mock.AnythingOfType("*http.Request")). + WaitUntil(cancelRequest). + Return(nil, context.Canceled) + racingPolicy.On("Schedule", mock.AnythingOfType("*request.Execution")). + Return(100 * time.Microsecond) + racingPolicy.On("Start", mock.AnythingOfType("*request.Execution")). + Return(true) + + p, err := request.NewPlanWithContext(planCtx, "DELETE", "test", nil) + require.NoError(t, err) + e, err := cl.Do(p) + + doer.AssertExpectations(t) + racingPolicy.AssertExpectations(t) + cl.Handlers.assertExpectations(t) + assert.Error(t, err) + var urlError *url.Error + require.ErrorAs(t, err, &urlError) + assert.Same(t, context.Canceled, urlError.Unwrap()) + require.NotNil(t, e) + assert.Same(t, err, e.Err) + assert.Equal(t, 0, e.Wave) + assert.GreaterOrEqual(t, e.Attempt, 0) + assert.LessOrEqual(t, e.Attempt, N+5) +} + +func testClientRacingPanic(t *testing.T) { + // This test starts multiple concurrent racing attempts and then + // triggers panics at various points, both on the robust client Do request's + // main goroutine and within the attempt send/read goroutines. + + N := 10 // Number of attempts to start before panicking + + // Function used to set up the HTTPDoer to block until N is reached, then + // return a response. + setupHTTPDoerBlock := func(_ *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) { + doer.On("Do", mock.AnythingOfType("*http.Request")). + WaitUntil(ch). + Return(&http.Response{StatusCode: 218, Body: ioutil.NopCloser(strings.NewReader("This is fine."))}, nil). + Times(N) + } + + // Function used to set up the HTTPDoer to block until N is reached, then + // panic. + setupHTTPDoerSendPanic := func(_ *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) { + doer.On("Do", mock.AnythingOfType("*http.Request")). + WaitUntil(ch). + Panic("mock HTTP doer panic!"). + Times(N) + } + + // Function used to set up the HTTPDoer to block until N is reached, then + // return a body reader that panics. + setupHTTPDoerReadPanic := func(t *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) { + mockBody := newMockReadCloser(t) + mockBody.On("Read", mock.Anything). + Panic("mock body panic!"). + Times(N) + mockBody.On("Close"). + Return(nil) + doer.On("Do", mock.AnythingOfType("*http.Request")). + WaitUntil(ch). + Return(&http.Response{StatusCode: 218, Body: mockBody}, nil). + Times(N) + } + + // Function used to configure an event handler to panic. + setupEventHandlerPanic := func(handlers *HandlerGroup, event Event) { + handlers.mock(event). + On("Handle", event, mock.AnythingOfType("*request.Execution")). + Panic("event handler panic - " + event.Name() + "!").
+ Once() + } + + testCases := []struct { + name string + setupHTTPDoer func(t *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) + setupEventHandler func(handlers *HandlerGroup, event Event) + event Event + panicVal string + }{ + { + name: "BeforeReadBody handler", + setupHTTPDoer: setupHTTPDoerBlock, + setupEventHandler: setupEventHandlerPanic, + event: BeforeReadBody, + panicVal: "event handler panic - BeforeReadBody!", + }, + { + name: "AfterAttempt handler", + setupHTTPDoer: setupHTTPDoerBlock, + setupEventHandler: setupEventHandlerPanic, + event: AfterAttempt, + panicVal: "event handler panic - AfterAttempt!", + }, + { + name: "send", + setupHTTPDoer: setupHTTPDoerSendPanic, + panicVal: "mock HTTP doer panic!", + }, + { + name: "read body", + setupHTTPDoer: setupHTTPDoerReadPanic, + panicVal: "mock body panic!", + }, + } + + schedule := make([]time.Duration, N) + for i := 0; i < N; i++ { + schedule[i] = time.Duration(i) * 50 * time.Microsecond + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + doer := newMockHTTPDoer(t) + retryPolicy := newMockRetryPolicy(t) + cl := Client{ + HTTPDoer: doer, + TimeoutPolicy: timeout.Infinite, + RetryPolicy: retryPolicy, + RacingPolicy: racing.NewPolicy(racing.NewStaticScheduler(schedule...), racing.AlwaysStart), + Handlers: &HandlerGroup{}, + } + showtime := make(chan time.Time, N) + // defer close(showtime) + testCase.setupHTTPDoer(t, doer, showtime) + cl.Handlers.mock(BeforeAttempt). + On("Handle", BeforeAttempt, mock.MatchedBy(func(e *request.Execution) bool { + return e.Wave == 0 + })). + Run(func(args mock.Arguments) { + e := args.Get(1).(*request.Execution) + if e.Attempt == N-1 { + now := time.Now() + for i := 0; i < N; i++ { + showtime <- now + } + } else if e.Attempt >= N { + showtime <- time.Now() + } + }) + if testCase.setupEventHandler != nil { + testCase.setupEventHandler(cl.Handlers, testCase.event) + } + + assert.PanicsWithValue(t, testCase.panicVal, func() { + _, _ = cl.Get("test") + }) + + doer.AssertExpectations(t) + cl.Handlers.assertExpectations(t) + }) + } +} + type mockHTTPDoer struct { mock.Mock } @@ -1336,10 +1627,10 @@ func (m *mockRacingPolicy) Start(e *request.Execution) bool { return args.Bool(0) } -type callOrder struct { +type callOrderMatcher struct { counter int32 } -func (com *callOrder) Match(x int32) bool { +func (com *callOrderMatcher) Match(x int32) bool { return atomic.CompareAndSwapInt32(&com.counter, x, x+1) }
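Note on edge case 3 above: the hunk at client.go:499 moves the "Communicate the panic" block into a defer that runs before the response body is closed. Below is a minimal, self-contained sketch of that ordering, not the httpx implementation; the names (attempt, panickyBody, signal) are illustrative only.

package main

import (
	"fmt"
	"io"
)

// panickyBody simulates a response body whose Close method itself panics.
type panickyBody struct{}

func (panickyBody) Read([]byte) (int, error) { return 0, io.EOF }
func (panickyBody) Close() error             { panic("close panic") }

// attempt is a stand-in for the per-attempt state; field and channel names are
// hypothetical.
type attempt struct {
	panicVal interface{}
	body     io.ReadCloser
	signal   chan *attempt
}

// recoverPanic mirrors the ordering adopted by this commit: the recovered
// panic is reported from a deferred function, so a secondary panic raised by
// body.Close() cannot prevent the signal from being sent.
func (a *attempt) recoverPanic() {
	r := recover()
	if r == nil {
		return
	}
	defer func() {
		a.panicVal = r
		a.signal <- a
	}()
	if a.body != nil {
		_ = a.body.Close() // may panic again; the defer above still runs
	}
}

func main() {
	a := &attempt{body: panickyBody{}, signal: make(chan *attempt, 1)}
	go func() {
		defer func() { _ = recover() }() // absorb the secondary close panic
		defer a.recoverPanic()
		panic("primary panic")
	}()
	fmt.Println((<-a.signal).panicVal) // prints: primary panic
}

Because the hand-off happens in a defer, the waiting goroutine still learns about the original panic even when closing the body panics a second time; in this sketch an extra recover in the worker goroutine absorbs that secondary panic so the example exits cleanly.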