diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go
index bb80e038c21..107c5b7f9a9 100755
--- a/client/resource_group/controller/controller.go
+++ b/client/resource_group/controller/controller.go
@@ -292,7 +292,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 					log.Warn("watch resource group meta failed", zap.Error(err))
 					timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 					failpoint.Inject("watchStreamError", func() {
-						watchRetryTimer.Reset(20 * time.Millisecond)
+						timerutil.SafeResetTimer(watchRetryTimer, 20*time.Millisecond)
 					})
 				}
 			}
@@ -336,7 +336,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 					watchMetaChannel = nil
 					timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 					failpoint.Inject("watchStreamError", func() {
-						watchRetryTimer.Reset(20 * time.Millisecond)
+						timerutil.SafeResetTimer(watchRetryTimer, 20*time.Millisecond)
 					})
 					continue
 				}
@@ -372,7 +372,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 					watchConfigChannel = nil
 					timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 					failpoint.Inject("watchStreamError", func() {
-						watchRetryTimer.Reset(20 * time.Millisecond)
+						timerutil.SafeResetTimer(watchRetryTimer, 20*time.Millisecond)
 					})
 					continue
 				}
@@ -525,7 +525,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
 			ClientUniqueId: c.clientUniqueID,
 		}
 		if c.ruConfig.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil {
-			c.run.responseDeadline.Reset(c.ruConfig.DegradedModeWaitDuration)
+			timerutil.SafeResetTimer(c.run.responseDeadline, c.ruConfig.DegradedModeWaitDuration)
 			c.responseDeadlineCh = c.run.responseDeadline.C
 		}
 		go func() {
diff --git a/client/retry/backoff.go b/client/retry/backoff.go
index e2ca9ab3972..39751734ff1 100644
--- a/client/retry/backoff.go
+++ b/client/retry/backoff.go
@@ -34,9 +34,11 @@ func (bo *BackOffer) Exec(
 	fn func() error,
 ) error {
 	if err := fn(); err != nil {
+		timer := time.NewTimer(bo.nextInterval())
+		defer timer.Stop()
 		select {
 		case <-ctx.Done():
-		case <-time.After(bo.nextInterval()):
+		case <-timer.C:
 			failpoint.Inject("backOffExecute", func() {
 				testBackOffExecuteFlag = true
 			})
diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go
index 68ce35899fc..8a15dc17ff4 100644
--- a/pkg/syncer/client.go
+++ b/pkg/syncer/client.go
@@ -27,6 +27,7 @@ import (
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/timerutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
@@ -142,13 +143,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 				}
 			}
 			log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
-			if !timer.Stop() {
-				select {
-				case <-timer.C: // try to drain from the channel
-				default:
-				}
-			}
-			timer.Reset(retryInterval)
+			timerutil.SafeResetTimer(timer, retryInterval)
 			select {
 			case <-ctx.Done():
 				log.Info("stop synchronizing with leader due to context canceled")
@@ -166,13 +161,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 			if err = stream.CloseSend(); err != nil {
 				log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
 			}
-			if !timer.Stop() {
-				select {
-				case <-timer.C: // try to drain from the channel
-				default:
-				}
-			}
-			timer.Reset(retryInterval)
+			timerutil.SafeResetTimer(timer, retryInterval)
 			select {
 			case <-ctx.Done():
 				log.Info("stop synchronizing with leader due to context canceled")
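
Note: the patch consistently swaps bare timer.Reset calls and time.After for the timerutil.SafeResetTimer helper. The following is a minimal sketch of what that helper is assumed to look like, reconstructed from the stop-and-drain pattern this patch removes from pkg/syncer/client.go; the actual implementation in github.com/tikv/pd/pkg/utils/timerutil may differ.

// Assumed sketch of timerutil.SafeResetTimer, inferred from the stop-and-drain
// pattern removed above; the real helper in pkg/utils/timerutil may differ.
package timerutil

import "time"

// SafeResetTimer stops the timer and drains any pending expiration from its
// channel before resetting, so a stale tick cannot fire right after Reset.
func SafeResetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C: // try to drain from the channel
		default:
		}
	}
	t.Reset(d)
}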