diff --git a/client.go b/client.go
index 7ac53b6..215c957 100644
--- a/client.go
+++ b/client.go
@@ -207,12 +207,11 @@ func (es *execState) wave() bool {
 
 	es.handleCheckpoint(attempt)
 	es.installAttempt(0, nil, nil, nil, nil)
-	d := es.racingPolicy.Schedule(es.exec)
-	es.setTimer(d)
+	es.setTimer(es.racingPolicy.Schedule(es.exec))
 
-	// Flag drain indicates whether to finish all attempts in the wave. It is
-	// set true as soon as any one attempt in the wave finishes, whether the
-	// attempt is retryable or not.
+	// Flag drain indicates whether to close out the wave, finishing in-flight
+	// attempts but not starting any new ones. It is set true as soon as any one
+	// attempt in the wave finishes, whether the attempt is retryable or not.
 	//
 	// Flag halt indicates whether to stop the whole execution. It is set true
 	// as soon as a non-retryable attempt is detected.
@@ -244,8 +243,7 @@ func (es *execState) wave() bool {
 				es.exec.Racing++
 				es.handleCheckpoint(attempt)
 				es.installAttempt(attempt.index, nil, nil, nil, nil)
-				d = es.racingPolicy.Schedule(es.exec)
-				es.setTimer(d)
+				es.setTimer(es.racingPolicy.Schedule(es.exec))
 			}
 		}
 	}
@@ -293,7 +291,7 @@ func (es *execState) handleCheckpoint(attempt *attemptState) (drain bool, halt b
 		attempt.body = es.exec.Body
 		attempt.checkpoint = done
 		drain = true
-		halt = es.planCancelled() || !es.retryPolicy.Decide(es.exec)
+		halt = attempt.redundant || es.planCancelled() || !es.retryPolicy.Decide(es.exec)
 		return
 	case panicked:
 		es.exec.Racing--
@@ -390,6 +388,8 @@ func (es *execState) cleanupWave() {
 			fallthrough
 		case readBody:
 			es.exec.Racing--
+		case panicked:
+			es.exec.Racing--
 		default:
 			panic("httpx: bad attempt checkpoint")
 		}
@@ -499,6 +499,13 @@ func (as *attemptState) recoverPanic() {
 		return
 	}
 
+	// Communicate the panic.
+	defer func() {
+		as.panicVal = r
+		as.checkpoint = panicked
+		as.es.signal <- as
+	}()
+
 	// Close the body. If checkpoint is already readBodyClosing, it means
 	// the panic likely emanated from calling Close() and there's no
 	// point doing it again.
@@ -507,18 +514,10 @@ func (as *attemptState) recoverPanic() {
 		if resp != nil {
 			body := resp.Body
 			if body != nil {
-				func() {
-					defer func() { _ = recover() }()
-					_ = body.Close()
-				}()
+				_ = body.Close()
 			}
 		}
 	}
-
-	// Communicate the panic.
-	as.panicVal = r
-	as.checkpoint = panicked
-	as.es.signal <- as
 }
 
 func (as *attemptState) maybeRedundant(err error) error {
diff --git a/client_test.go b/client_test.go
index 7a965b1..dac3196 100644
--- a/client_test.go
+++ b/client_test.go
@@ -19,6 +19,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/gogama/httpx/racing"
+
 	"github.com/gogama/httpx/request"
 	"github.com/gogama/httpx/retry"
 	"github.com/gogama/httpx/timeout"
@@ -40,7 +42,7 @@ func TestClient(t *testing.T) {
 	t.Run("plan cancel", testClientPlanCancel)
 	t.Run("plan replace", testClientPlanChange)
 	t.Run("close idle connections", testClientCloseIdleConnections)
-	//t.Run("racing", testClientRacing) FIXME
+	t.Run("racing", testClientRacing)
 }
 
 func TestURLErrorOp(t *testing.T) {
@@ -749,6 +751,7 @@ func testClientGoroutinePanicCore(t *testing.T) {
 				Panic(panicVal).
 				Once()
 			mockReadCloser.On("Close").
+				Return(nil).
 				Once()
 			return mockReadCloser
 		},
@@ -1012,33 +1015,25 @@ func testClientCloseIdleConnections(t *testing.T) {
 	})
 }
 
-func TestClientRacing(t *testing.T) {
-	// What are some test cases we can do?
-	//
-	// 1. Schedule an extra attempt. Return success, take a long time.
-	//    Ensure long time taker cancelled as redundant AND that all
-	//    the handlers are called as expected.
-	//
-	// 2. Ensure plan cancelled in the middle of the wave loop doesn't
-	//    keep creating new attempts.
-	//
-	// 3. Ensure plan cancelled while multiple concurrent requests running
-	//    is correctly detected.
-	//
-	// 4. Panic in a handler while a bunch of parallel request attempts
-	//    are running and ensure the cleanup code runs.
+func testClientRacing(t *testing.T) {
+	t.Run("never start", testClientRacingNeverStart)
+	t.Run("retry", testClientRacingRetry)
+	t.Run("cancel redundant", testClientRacingCancelRedundant)
+	t.Run("plan cancel in wave", testRacingPlanCancelDuringWaveLoop)
+	t.Run("panic", testClientRacingPanic)
 }
 
-func TestClientRacingNeverStart(t *testing.T) {
+func testClientRacingNeverStart(t *testing.T) {
 	// This test schedules one concurrent attempt but never starts it.
 	doer := newMockHTTPDoer(t)
 	retryPolicy := newMockRetryPolicy(t)
 	racingPolicy := newMockRacingPolicy(t)
 	cl := Client{
-		HTTPDoer:     doer,
-		RetryPolicy:  retryPolicy,
-		RacingPolicy: racingPolicy,
-		Handlers:     &HandlerGroup{},
+		HTTPDoer:      doer,
+		TimeoutPolicy: timeout.Infinite,
+		RetryPolicy:   retryPolicy,
+		RacingPolicy:  racingPolicy,
+		Handlers:      &HandlerGroup{},
 	}
 	trace := cl.addTraceHandlers()
 	waiter := make(chan time.Time)
@@ -1082,7 +1077,7 @@
 	}, trace.calls)
 }
 
-func TestClientRacingRetry(t *testing.T) {
+func testClientRacingRetry(t *testing.T) {
 	// This test schedules two additional attempts to race the first one,
 	// and all three requests "fail" (result in a positive retry decision).
 	// On the next wave, no new concurrent attempts are scheduled and the
@@ -1091,10 +1086,11 @@
 	doer := newMockHTTPDoer(t)
 	retryPolicy := newMockRetryPolicy(t)
 	racingPolicy := newMockRacingPolicy(t)
 	cl := Client{
-		HTTPDoer:     doer,
-		RetryPolicy:  retryPolicy,
-		RacingPolicy: racingPolicy,
-		Handlers:     &HandlerGroup{},
+		HTTPDoer:      doer,
+		TimeoutPolicy: timeout.Infinite,
+		RetryPolicy:   retryPolicy,
+		RacingPolicy:  racingPolicy,
+		Handlers:      &HandlerGroup{},
 	}
 	trace := cl.addTraceHandlers()
 	bridge1, bridge2 := make(chan time.Time), make(chan time.Time)
@@ -1105,24 +1101,24 @@
 	// Chain the first three attempts (all are racing) so that the first one
 	// waits for the second one to start, and the second one waits for the
 	// third one to start.
-	callOrder := callOrder{}
+	callOrder := callOrderMatcher{}
 	doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(0) })).
 		WaitUntil(bridge1).
 		Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry-0"))}, nil).
-		Times(1)
+		Once()
 	doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(1) })).
 		WaitUntil(bridge2).
 		Run(func(_ mock.Arguments) { bridge1 <- time.Now() }).
 		Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry-1"))}, nil).
-		Times(1)
+		Once()
 	doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(2) })).
 		Run(func(_ mock.Arguments) { bridge2 <- time.Now() }).
 		Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry=2"))}, nil).
-		Times(1)
+		Once()
 	// The last attempt occurs in wave 3, by itself (no racing).
 	doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(3) })).
 		Return(&http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader("racing-retry=3"))}, nil).
-		Times(1)
+		Once()
 	retryPolicy.On("Decide", mock.MatchedBy(func(e *request.Execution) bool {
 		return e.Wave == 0 && e.Attempt <= 2
 	})).Return(true).Times(3)
@@ -1168,6 +1164,301 @@
 	}, trace.calls[n-5:n])
 }
 
+func testClientRacingCancelRedundant(t *testing.T) {
+	// This test schedules and starts a second request racing the first
+	// one. The second attempt completes before the first one. The test
+	// ensures the first attempt is cancelled as redundant.
+	testCases := []struct {
+		name                string
+		setupFirstDoCall    func(t *testing.T, doCall *mock.Call)
+		afterAttemptMatcher func(e *request.Execution) bool
+	}{
+		{
+			name: "cancel Do",
+			setupFirstDoCall: func(t *testing.T, doCall *mock.Call) {
+				doCall.
+					Run(func(args mock.Arguments) {
+						req := args.Get(0).(*http.Request)
+						ctx := req.Context()
+						<-ctx.Done()
+					}).
+					Return(nil, context.Canceled).
+					Once()
+			},
+			afterAttemptMatcher: func(e *request.Execution) bool {
+				return e.Response == nil && e.Body == nil
+			},
+		},
+		{
+			name: "cancel read body",
+			setupFirstDoCall: func(t *testing.T, doCall *mock.Call) {
+				var ctx context.Context
+				mockBody := newMockReadCloser(t)
+				mockBody.On("Read", mock.Anything).
+					Run(func(_ mock.Arguments) {
+						<-ctx.Done()
+					}).
+					Return(0, context.Canceled).
+					Once()
+				mockBody.On("Close").
+					Return(nil).
+					Once()
+				doCall.
+					Run(func(args mock.Arguments) {
+						req := args.Get(0).(*http.Request)
+						ctx = req.Context()
+					}).
+					Return(&http.Response{StatusCode: 200, Body: mockBody}, nil).
+					Once()
+			},
+			afterAttemptMatcher: func(e *request.Execution) bool {
+				return e.StatusCode() == 200 && e.Response != nil && len(e.Body) == 0
+			},
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			doer := newMockHTTPDoer(t)
+			retryPolicy := newMockRetryPolicy(t)
+			racingPolicy := newMockRacingPolicy(t)
+			cl := Client{
+				HTTPDoer:      doer,
+				TimeoutPolicy: timeout.Infinite,
+				RetryPolicy:   retryPolicy,
+				RacingPolicy:  racingPolicy,
+				Handlers:      &HandlerGroup{},
+			}
+			callOrder := callOrderMatcher{}
+			firstDoCall := doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(0) }))
+			testCase.setupFirstDoCall(t, firstDoCall)
+			doer.On("Do", mock.MatchedBy(func(_ *http.Request) bool { return callOrder.Match(1) })).
+				Return(&http.Response{StatusCode: 418, Body: ioutil.NopCloser(strings.NewReader("I'm a teapot"))}, nil).
+				Once()
+			retryPolicy.On("Decide", mock.AnythingOfType("*request.Execution")).
+				Return(false).
+				Once()
+			racingPolicy.On("Schedule", mock.MatchedBy(func(e *request.Execution) bool {
+				return e.Attempt == 0 && e.Wave == 0 && e.Racing == 1
+			})).Return(time.Microsecond).Once()
+			racingPolicy.On("Schedule", mock.MatchedBy(func(e *request.Execution) bool {
+				return e.Attempt == 1 && e.Wave == 0 && e.Racing == 2
+			})).Return(time.Duration(0)).Once()
+			racingPolicy.On("Start", mock.MatchedBy(func(e *request.Execution) bool {
+				return e.Attempt == 0 && e.Wave == 0 && e.Racing == 1
+			})).Return(true).Once()
+			cl.Handlers.mock(AfterAttempt).On("Handle", AfterAttempt, mock.MatchedBy(func(e *request.Execution) bool {
+				return e.StatusCode() == 418 && bytes.Equal(e.Body, []byte("I'm a teapot")) && e.Err == nil && e.Wave == 0
+			})).Once()
+			cl.Handlers.mock(AfterAttempt).On("Handle", AfterAttempt, mock.MatchedBy(func(e *request.Execution) bool {
+				return errors.Unwrap(e.Err) == racing.Redundant && e.Wave == 0 && testCase.afterAttemptMatcher(e)
+			})).Once()
+
+			e, err := cl.Get("test")
+
+			doer.AssertExpectations(t)
+			retryPolicy.AssertExpectations(t)
+			racingPolicy.AssertExpectations(t)
+			cl.Handlers.assertExpectations(t)
+			assert.NoError(t, err)
+			require.NotNil(t, e)
+			assert.Equal(t, 418, e.StatusCode())
+			assert.Equal(t, []byte("I'm a teapot"), e.Body)
+		})
+	}
+}
+
+func testRacingPlanCancelDuringWaveLoop(t *testing.T) {
+	// This test cancels the plan context while the wave loop is in the
+	// process of racing attempts in the wave. It verifies that the wave
+	// ends and that the client doesn't continue generating new attempts.
+	// -----------------------------------------------------------------
+	N := 5 // Number of attempts to start before cancelling plan context.
+	planCtx, cancelPlanCtx := context.WithCancel(context.Background())
+	defer cancelPlanCtx()
+	doer := newMockHTTPDoer(t)
+	retryPolicy := newMockRetryPolicy(t)
+	racingPolicy := newMockRacingPolicy(t)
+	cl := Client{
+		HTTPDoer:      doer,
+		TimeoutPolicy: timeout.Infinite,
+		RetryPolicy:   retryPolicy,
+		RacingPolicy:  racingPolicy,
+		Handlers:      &HandlerGroup{},
+	}
+	cancelRequest := make(chan time.Time, N)
+	defer close(cancelRequest)
+	cl.Handlers.mock(BeforeAttempt).
+		On("Handle", BeforeAttempt, mock.MatchedBy(func(e *request.Execution) bool {
+			return e.Wave == 0
+		})).
+		Run(func(args mock.Arguments) {
+			e := args.Get(1).(*request.Execution)
+			if e.Attempt == N-1 {
+				cancelPlanCtx()
+				now := time.Now()
+				for i := 0; i < N; i++ {
+					cancelRequest <- now
+				}
+			} else if e.Attempt >= N {
+				cancelRequest <- time.Now()
+			}
+		})
+	doer.On("Do", mock.AnythingOfType("*http.Request")).
+		WaitUntil(cancelRequest).
+		Return(nil, context.Canceled)
+	racingPolicy.On("Schedule", mock.AnythingOfType("*request.Execution")).
+		Return(100 * time.Microsecond)
+	racingPolicy.On("Start", mock.AnythingOfType("*request.Execution")).
+		Return(true)
+
+	p, err := request.NewPlanWithContext(planCtx, "DELETE", "test", nil)
+	require.NoError(t, err)
+	e, err := cl.Do(p)
+
+	doer.AssertExpectations(t)
+	racingPolicy.AssertExpectations(t)
+	cl.Handlers.assertExpectations(t)
+	assert.Error(t, err)
+	var urlError *url.Error
+	require.ErrorAs(t, err, &urlError)
+	assert.Same(t, context.Canceled, urlError.Unwrap())
+	require.NotNil(t, e)
+	assert.Same(t, err, e.Err)
+	assert.Equal(t, 0, e.Wave)
+	assert.GreaterOrEqual(t, e.Attempt, 0)
+	assert.LessOrEqual(t, e.Attempt, N+5)
+}
+
+func testClientRacingPanic(t *testing.T) {
+	// This test starts multiple concurrent racing attempts and then
+	// triggers panics at various points, both on the robust client Do
+	// request's main goroutine and within the attempt send/read goroutines.
+
+	N := 10 // Number of attempts to start before panicking
+
+	// Function used to set up the HTTPDoer to block until N is reached, then
+	// return a response.
+	setupHTTPDoerBlock := func(_ *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) {
+		doer.On("Do", mock.AnythingOfType("*http.Request")).
+			WaitUntil(ch).
+			Return(&http.Response{StatusCode: 218, Body: ioutil.NopCloser(strings.NewReader("This is fine."))}, nil).
+			Times(N)
+	}
+
+	// Function used to set up the HTTPDoer to block until N is reached, then
+	// panic.
+	setupHTTPDoerSendPanic := func(_ *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) {
+		doer.On("Do", mock.AnythingOfType("*http.Request")).
+			WaitUntil(ch).
+			Panic("mock HTTP doer panic!").
+			Times(N)
+	}
+
+	// Function used to set up the HTTPDoer to block until N is reached, then
+	// return a body reader that panics.
+	setupHTTPDoerReadPanic := func(t *testing.T, doer *mockHTTPDoer, ch <-chan time.Time) {
+		mockBody := newMockReadCloser(t)
+		mockBody.On("Read", mock.Anything).
+			Panic("mock body panic!").
+			Times(N)
+		mockBody.On("Close").
+			Return(nil)
+		doer.On("Do", mock.AnythingOfType("*http.Request")).
+			WaitUntil(ch).
+			Return(&http.Response{StatusCode: 218, Body: mockBody}, nil).
+			Times(N)
+	}
+
+	// Function used to configure an event handler to panic.
+	setupEventHandlerPanic := func(handlers *HandlerGroup, event Event) {
+		handlers.mock(event).
+			On("Handle", event, mock.AnythingOfType("*request.Execution")).
+			Panic("event handler panic - " + event.Name() + "!").
+			Once()
+	}
+
+	testCases := []struct {
+		name              string
+		setupHTTPDoer     func(t *testing.T, doer *mockHTTPDoer, ch <-chan time.Time)
+		setupEventHandler func(handlers *HandlerGroup, event Event)
+		event             Event
+		panicVal          string
+	}{
+		{
+			name:              "BeforeReadBody handler",
+			setupHTTPDoer:     setupHTTPDoerBlock,
+			setupEventHandler: setupEventHandlerPanic,
+			event:             BeforeReadBody,
+			panicVal:          "event handler panic - BeforeReadBody!",
+		},
+		{
+			name:              "AfterAttempt handler",
+			setupHTTPDoer:     setupHTTPDoerBlock,
+			setupEventHandler: setupEventHandlerPanic,
+			event:             AfterAttempt,
+			panicVal:          "event handler panic - AfterAttempt!",
+		},
+		{
+			name:          "send",
+			setupHTTPDoer: setupHTTPDoerSendPanic,
+			panicVal:      "mock HTTP doer panic!",
+		},
+		{
+			name:          "read body",
+			setupHTTPDoer: setupHTTPDoerReadPanic,
+			panicVal:      "mock body panic!",
+		},
+	}
+
+	schedule := make([]time.Duration, N)
+	for i := 0; i < N; i++ {
+		schedule[i] = time.Duration(i) * 50 * time.Microsecond
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			doer := newMockHTTPDoer(t)
+			retryPolicy := newMockRetryPolicy(t)
+			cl := Client{
+				HTTPDoer:      doer,
+				TimeoutPolicy: timeout.Infinite,
+				RetryPolicy:   retryPolicy,
+				RacingPolicy:  racing.NewPolicy(racing.NewStaticScheduler(schedule...), racing.AlwaysStart),
+				Handlers:      &HandlerGroup{},
+			}
+			showtime := make(chan time.Time, N)
+			// defer close(showtime)
+			testCase.setupHTTPDoer(t, doer, showtime)
+			cl.Handlers.mock(BeforeAttempt).
+				On("Handle", BeforeAttempt, mock.MatchedBy(func(e *request.Execution) bool {
+					return e.Wave == 0
+				})).
+				Run(func(args mock.Arguments) {
+					e := args.Get(1).(*request.Execution)
+					if e.Attempt == N-1 {
+						now := time.Now()
+						for i := 0; i < N; i++ {
+							showtime <- now
+						}
+					} else if e.Attempt >= N {
+						showtime <- time.Now()
+					}
+				})
+			if testCase.setupEventHandler != nil {
+				testCase.setupEventHandler(cl.Handlers, testCase.event)
+			}
+
+			assert.PanicsWithValue(t, testCase.panicVal, func() {
+				_, _ = cl.Get("test")
+			})
+
+			doer.AssertExpectations(t)
+			cl.Handlers.assertExpectations(t)
+		})
+	}
+}
+
 type mockHTTPDoer struct {
 	mock.Mock
 }
@@ -1336,10 +1627,10 @@ func (m *mockRacingPolicy) Start(e *request.Execution) bool {
 	return args.Bool(0)
 }
 
-type callOrder struct {
+type callOrderMatcher struct {
 	counter int32
 }
 
-func (com *callOrder) Match(x int32) bool {
+func (com *callOrderMatcher) Match(x int32) bool {
 	return atomic.CompareAndSwapInt32(&com.counter, x, x+1)
 }
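
Usage sketch (supplementary note, not part of the patch above): the tests in this change drive racing through mock policies; the sketch below shows how a caller might wire up the same pieces the panic test uses — racing.NewPolicy, racing.NewStaticScheduler, and racing.AlwaysStart — on a Client backed by a plain *http.Client. The URL, the schedule durations, and the assumption that an unset RetryPolicy and Handlers fall back to the client's defaults are illustrative assumptions, not something this diff establishes.

	package main

	import (
		"fmt"
		"net/http"
		"time"

		"github.com/gogama/httpx"
		"github.com/gogama/httpx/racing"
		"github.com/gogama/httpx/timeout"
	)

	func main() {
		// Delays after which additional racing attempts may be started within a
		// wave; racing.AlwaysStart places no further condition on starting them.
		// The durations are illustrative, not taken from the patch.
		scheduler := racing.NewStaticScheduler(200*time.Millisecond, 500*time.Millisecond)
		cl := httpx.Client{
			HTTPDoer:      &http.Client{}, // assumption: *http.Client satisfies the Do-based HTTPDoer interface
			TimeoutPolicy: timeout.Infinite,
			RacingPolicy:  racing.NewPolicy(scheduler, racing.AlwaysStart),
			// RetryPolicy and Handlers are left at their zero values here and are
			// assumed to fall back to the client's defaults.
		}
		e, err := cl.Get("https://example.com/") // hypothetical URL
		if err != nil {
			fmt.Println("request failed:", err)
			return
		}
		// The Execution exposes the winning attempt's status and buffered body,
		// which the tests above assert via StatusCode() and Body.
		fmt.Println(e.StatusCode(), len(e.Body))
	}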