From 751f81c0b1d1a694a52e4f34373e51782838f8ce Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Wed, 11 Dec 2024 20:53:29 -0500 Subject: [PATCH 1/5] expose chaos testing Signed-off-by: Wenqi Mou --- br/pkg/streamhelper/advancer.go | 13 ++----- br/pkg/streamhelper/advancer_test.go | 2 +- br/pkg/streamhelper/config/advancer_conf.go | 28 +++++++++----- br/pkg/streamhelper/daemon/owner_daemon.go | 18 +++++++-- .../streamhelper/daemon/owner_daemon_test.go | 38 ++++++++++++++++++- br/pkg/task/stream.go | 2 +- pkg/domain/domain.go | 2 +- 7 files changed, 76 insertions(+), 27 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 8e8263f63fed0..d6a0d265d0e4a 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool { return c.task != nil } -// HasSubscriber returns whether the advancer is associated with a subscriber. -func (c *CheckpointAdvancer) HasSubscribion() bool { +// HasSubscriptions returns whether the advancer is associated with a subscriber. 
+func (c *CheckpointAdvancer) HasSubscriptions() bool { c.subscriberMu.Lock() defer c.subscriberMu.Unlock() @@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint { } } -func NewCheckpointWithSpan(s spans.Valued) *checkpoint { +func newCheckpointWithSpan(s spans.Valued) *checkpoint { return &checkpoint{ StartKey: s.Key.StartKey, EndKey: s.Key.EndKey, @@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) { f(c.checkpoints) } -// only used for test -func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) { - c.checkpoints = cps -} - func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string { region, err := locateKeyOfRegion(ctx, c.env, startKey) if err != nil { @@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error } func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool { - cp := NewCheckpointWithSpan(s) + cp := newCheckpointWithSpan(s) if cp.TS < c.lastCheckpoint.TS { log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS)) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index e4d83c682f789..892873defae5f 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -475,7 +475,7 @@ func TestRemoveTaskAndFlush(t *testing.T) { }, 10*time.Second, 100*time.Millisecond) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop")) require.Eventually(t, func() bool { - return !adv.HasSubscribion() + return !adv.HasSubscriptions() }, 10*time.Second, 100*time.Millisecond) } diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go index 24b76741388fb..00c08c1fc041a 100644 --- a/br/pkg/streamhelper/config/advancer_conf.go +++ b/br/pkg/streamhelper/config/advancer_conf.go @@ -11,18 +11,19 
@@ import ( const ( flagBackoffTime = "backoff-time" flagTickInterval = "tick-interval" - flagFullScanDiffTick = "full-scan-tick" - flagAdvancingByCache = "advancing-by-cache" flagTryAdvanceThreshold = "try-advance-threshold" flagCheckPointLagLimit = "check-point-lag-limit" - DefaultConsistencyCheckTick = 5 - DefaultTryAdvanceThreshold = 4 * time.Minute - DefaultCheckPointLagLimit = 48 * time.Hour - DefaultBackOffTime = 5 * time.Second - DefaultTickInterval = 12 * time.Second - DefaultFullScanTick = 4 - DefaultAdvanceByCache = true + // used for chaos testing + flagOwnerRetireInterval = "advance-owner-resign-interval" + + DefaultTryAdvanceThreshold = 4 * time.Minute + DefaultCheckPointLagLimit = 48 * time.Hour + DefaultBackOffTime = 5 * time.Second + DefaultTickInterval = 12 * time.Second + + // used for chaos testing, default to disable + DefaultAdvancerOwnerRetireInterval = 0 ) var ( @@ -38,6 +39,11 @@ type Config struct { TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"` // The maximum lag could be tolerated for the checkpoint lag. 
CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"` + + // Following configs are used in chaos testings, better not to enable in prod + // + // used to periodically retire advancer owner for chaos testing + AdvancerOwnerRetireInterval time.Duration `toml:"advancer-owner-retire-interval" json:"advancer-owner-retire-interval"` } func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { @@ -49,6 +55,10 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { "If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.") f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit, "The maximum lag could be tolerated for the checkpoint lag.") + + // used for chaos testing + f.Duration(flagOwnerRetireInterval, DefaultAdvancerOwnerRetireInterval, + "The interval that the owner will retire itself") } func Default() Config { diff --git a/br/pkg/streamhelper/daemon/owner_daemon.go b/br/pkg/streamhelper/daemon/owner_daemon.go index 5956b643c971d..84f17c210d6e3 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon.go +++ b/br/pkg/streamhelper/daemon/owner_daemon.go @@ -24,14 +24,20 @@ type OwnerDaemon struct { // When not `nil`, implies the daemon is running. cancel context.CancelFunc + + // leader retire internal, used for chaos testing, suggest not to enable in prod + // default to 0 to disable + retireInterval time.Duration + ownerStartTime time.Time } // New creates a new owner daemon. 
-func New(daemon Interface, manager owner.Manager, tickInterval time.Duration) *OwnerDaemon { +func New(daemon Interface, manager owner.Manager, tickInterval time.Duration, retireInternal time.Duration) *OwnerDaemon { return &OwnerDaemon{ - daemon: daemon, - manager: manager, - tickInterval: tickInterval, + daemon: daemon, + manager: manager, + tickInterval: tickInterval, + retireInterval: retireInternal, } } @@ -56,12 +62,16 @@ func (od *OwnerDaemon) ownerTick(ctx context.Context) { log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name())) // Note: maybe save the context so we can cancel the tick when we are not owner? od.daemon.OnBecomeOwner(cx) + od.ownerStartTime = time.Now() } // Tick anyway. if err := od.daemon.OnTick(ctx); err != nil { log.Warn("failed on tick", logutil.ShortError(err)) } + if od.retireInterval != 0 && time.Now().Sub(od.ownerStartTime) > od.retireInterval { + od.manager.RetireOwner() + } } // Begin starts the daemon. 
diff --git a/br/pkg/streamhelper/daemon/owner_daemon_test.go b/br/pkg/streamhelper/daemon/owner_daemon_test.go index e7693a4c1d124..c7759d0f3c3bb 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon_test.go +++ b/br/pkg/streamhelper/daemon/owner_daemon_test.go @@ -137,7 +137,7 @@ func TestDaemon(t *testing.T) { req := require.New(t) app := newTestApp(t) ow := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key") - d := daemon.New(app, ow, 100*time.Millisecond) + d := daemon.New(app, ow, 100*time.Millisecond, 0) app.AssertService(req, false) f, err := d.Begin(ctx) @@ -149,10 +149,44 @@ func TestDaemon(t *testing.T) { ow.RetireOwner() req.False(ow.IsOwner()) app.AssertNotRunning(1 * time.Second) - ow.CampaignOwner() req.Eventually(func() bool { return ow.IsOwner() }, 1*time.Second, 100*time.Millisecond) app.AssertStart(1 * time.Second) app.AssertTick(1 * time.Second) + + // make sure chaos did not kick in so never retires + req.Neverf(func() bool { + return !ow.IsOwner() + }, 5*time.Second, 100*time.Millisecond, "should never retire") +} + +func TestDaemonWithChaos(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + req := require.New(t) + app := newTestApp(t) + ow := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key") + d := daemon.New(app, ow, 100*time.Millisecond, 2*time.Second) + + app.AssertService(req, false) + f, err := d.Begin(ctx) + req.NoError(err) + app.AssertService(req, true) + go f() + + // wait for it to become owner + req.Eventually(func() bool { + return ow.IsOwner() + }, 1*time.Second, 100*time.Millisecond) + + // wait for chaos test to kick in to auto retire + req.Eventually(func() bool { + return !ow.IsOwner() + }, 3*time.Second, 500*time.Millisecond) + + // sanity check it will try to become leader in background again + req.Eventually(func() bool { + return ow.IsOwner() + }, 2*time.Second, 500*time.Millisecond) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 
07990cc382d34..dc4a96089865d 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -936,7 +936,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre defer func() { ownerMgr.Close() }() - advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration) + advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration, cfg.AdvancerCfg.AdvancerOwnerRetireInterval) loop, err := advancerd.Begin(ctx) if err != nil { return err diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 3de1a7d839411..1bc5567e8a3c2 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1557,7 +1557,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { } adv := streamhelper.NewCheckpointAdvancer(env) do.brOwnerMgr = streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient) - do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration) + do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration, adv.Config().AdvancerOwnerRetireInterval) loop, err := do.logBackupAdvancer.Begin(ctx) if err != nil { return err From 252cbc1bf911f32424c47bdbf064ddd5ae6bffca Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Thu, 12 Dec 2024 15:47:00 -0500 Subject: [PATCH 2/5] add config to stream advancer command Signed-off-by: Wenqi Mou --- br/pkg/streamhelper/config/advancer_conf.go | 25 +++++++---- br/pkg/streamhelper/daemon/owner_daemon.go | 26 ++++++------ .../streamhelper/daemon/owner_daemon_test.go | 37 +---------------- br/pkg/task/stream.go | 41 ++++++++++++++++++- pkg/config/config.go | 8 ---- pkg/domain/domain.go | 6 +-- 6 files changed, 72 insertions(+), 71 deletions(-) diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go index 00c08c1fc041a..5638e5d638603 100644 --- a/br/pkg/streamhelper/config/advancer_conf.go +++ b/br/pkg/streamhelper/config/advancer_conf.go @@ -15,15 +15,17 @@ const ( 
flagCheckPointLagLimit = "check-point-lag-limit" // used for chaos testing - flagOwnerRetireInterval = "advance-owner-resign-interval" + flagOwnershipCycleInterval = "ownership-cycle-interval" +) +const ( DefaultTryAdvanceThreshold = 4 * time.Minute DefaultCheckPointLagLimit = 48 * time.Hour DefaultBackOffTime = 5 * time.Second DefaultTickInterval = 12 * time.Second // used for chaos testing, default to disable - DefaultAdvancerOwnerRetireInterval = 0 + DefaultOwnershipCycleInterval = 0 ) var ( @@ -42,8 +44,8 @@ type Config struct { // Following configs are used in chaos testings, better not to enable in prod // - // used to periodically retire advancer owner for chaos testing - AdvancerOwnerRetireInterval time.Duration `toml:"advancer-owner-retire-interval" json:"advancer-owner-retire-interval"` + // used to periodically become/retire the advancer owner + OwnershipCycleInterval time.Duration `toml:"ownership-cycle-interval" json:"ownership-cycle-interval"` } func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { @@ -57,16 +59,17 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { "The maximum lag could be tolerated for the checkpoint lag.") // used for chaos testing - f.Duration(flagOwnerRetireInterval, DefaultAdvancerOwnerRetireInterval, + f.Duration(flagOwnershipCycleInterval, DefaultOwnershipCycleInterval, "The interval that the owner will retire itself") } func Default() Config { return Config{ - BackoffTime: DefaultBackOffTime, - TickDuration: DefaultTickInterval, - TryAdvanceThreshold: DefaultTryAdvanceThreshold, - CheckPointLagLimit: DefaultCheckPointLagLimit, + BackoffTime: DefaultBackOffTime, + TickDuration: DefaultTickInterval, + TryAdvanceThreshold: DefaultTryAdvanceThreshold, + CheckPointLagLimit: DefaultCheckPointLagLimit, + OwnershipCycleInterval: DefaultOwnershipCycleInterval, } } @@ -88,6 +91,10 @@ func (conf *Config) GetFromFlags(f *pflag.FlagSet) error { if err != nil { return err } + conf.OwnershipCycleInterval, err = 
f.GetDuration(flagOwnershipCycleInterval) + if err != nil { + return err + } return nil } diff --git a/br/pkg/streamhelper/daemon/owner_daemon.go b/br/pkg/streamhelper/daemon/owner_daemon.go index 84f17c210d6e3..0884004c4abc4 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon.go +++ b/br/pkg/streamhelper/daemon/owner_daemon.go @@ -24,20 +24,14 @@ type OwnerDaemon struct { // When not `nil`, implies the daemon is running. cancel context.CancelFunc - - // leader retire internal, used for chaos testing, suggest not to enable in prod - // default to 0 to disable - retireInterval time.Duration - ownerStartTime time.Time } // New creates a new owner daemon. -func New(daemon Interface, manager owner.Manager, tickInterval time.Duration, retireInternal time.Duration) *OwnerDaemon { +func New(daemon Interface, manager owner.Manager, tickInterval time.Duration) *OwnerDaemon { return &OwnerDaemon{ - daemon: daemon, - manager: manager, - tickInterval: tickInterval, - retireInterval: retireInternal, + daemon: daemon, + manager: manager, + tickInterval: tickInterval, } } @@ -62,16 +56,12 @@ func (od *OwnerDaemon) ownerTick(ctx context.Context) { log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name())) // Note: maybe save the context so we can cancel the tick when we are not owner? od.daemon.OnBecomeOwner(cx) - od.ownerStartTime = time.Now() } // Tick anyway. if err := od.daemon.OnTick(ctx); err != nil { log.Warn("failed on tick", logutil.ShortError(err)) } - if od.retireInterval != 0 && time.Now().Sub(od.ownerStartTime) > od.retireInterval { - od.manager.RetireOwner() - } } // Begin starts the daemon. 
@@ -112,3 +102,11 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) { } return loop, nil } + +func (od *OwnerDaemon) ForceToBeOwner(ctx context.Context) error { + return od.manager.ForceToBeOwner(ctx) +} + +func (od *OwnerDaemon) RetireIfOwner() { + od.manager.RetireOwner() +} diff --git a/br/pkg/streamhelper/daemon/owner_daemon_test.go b/br/pkg/streamhelper/daemon/owner_daemon_test.go index c7759d0f3c3bb..2ca9b85c78fa2 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon_test.go +++ b/br/pkg/streamhelper/daemon/owner_daemon_test.go @@ -137,7 +137,7 @@ func TestDaemon(t *testing.T) { req := require.New(t) app := newTestApp(t) ow := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key") - d := daemon.New(app, ow, 100*time.Millisecond, 0) + d := daemon.New(app, ow, 100*time.Millisecond) app.AssertService(req, false) f, err := d.Begin(ctx) @@ -154,39 +154,4 @@ func TestDaemon(t *testing.T) { }, 1*time.Second, 100*time.Millisecond) app.AssertStart(1 * time.Second) app.AssertTick(1 * time.Second) - - // make sure chaos did not kick in so never retires - req.Neverf(func() bool { - return !ow.IsOwner() - }, 5*time.Second, 100*time.Millisecond, "should never retire") -} - -func TestDaemonWithChaos(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - req := require.New(t) - app := newTestApp(t) - ow := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key") - d := daemon.New(app, ow, 100*time.Millisecond, 2*time.Second) - - app.AssertService(req, false) - f, err := d.Begin(ctx) - req.NoError(err) - app.AssertService(req, true) - go f() - - // wait for it to become owner - req.Eventually(func() bool { - return ow.IsOwner() - }, 1*time.Second, 100*time.Millisecond) - - // wait for chaos test to kick in to auto retire - req.Eventually(func() bool { - return !ow.IsOwner() - }, 3*time.Second, 500*time.Millisecond) - - // sanity check it will try to become leader in background again - 
req.Eventually(func() bool { - return ow.IsOwner() - }, 2*time.Second, 500*time.Millisecond) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index dc4a96089865d..44b5c272adb7d 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -919,6 +919,8 @@ func RunStreamResume( func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error { ctx, cancel := context.WithCancel(c) defer cancel() + log.Info("starting", zap.String("cmd", cmdName)) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, false, conn.StreamVersionChecker) if err != nil { @@ -936,15 +938,52 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre defer func() { ownerMgr.Close() }() - advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration, cfg.AdvancerCfg.AdvancerOwnerRetireInterval) + advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration) loop, err := advancerd.Begin(ctx) if err != nil { return err } + if cfg.AdvancerCfg.OwnershipCycleInterval > 0 { + err = advancerd.ForceToBeOwner(ctx) + if err != nil { + return err + } + log.Info("this advancerd forced to be the owner") + go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval) + } loop() return nil } +// runOwnershipCycle handles the periodic cycling of ownership for the advancer +func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration) { + ticker := time.NewTicker(cycleDuration) + defer ticker.Stop() + + isOwner := false + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !isOwner { + // try to become owner + if err := advancerd.ForceToBeOwner(ctx); err != nil { + log.Error("failed to force ownership", zap.Error(err)) + continue + } + log.Info("advancer forced to be the owner") + isOwner = true + } else { + // retire from being owner + advancerd.RetireIfOwner() + log.Info("advancer retired from being 
owner") + isOwner = false + } + } + } +} + func checkConfigForStatus(pd []string) error { if len(pd) == 0 { return errors.Annotatef(berrors.ErrInvalidArgument, diff --git a/pkg/config/config.go b/pkg/config/config.go index 3df1ac5cefc36..ac3bf7927b631 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -33,7 +33,6 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" zaplog "github.com/pingcap/log" - logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/tiflashcompute" @@ -458,13 +457,6 @@ func (b *AtomicBool) UnmarshalText(text []byte) error { return nil } -// LogBackup is the config for log backup service. -// For now, it includes the embed advancer. -type LogBackup struct { - Advancer logbackupconf.Config `toml:"advancer" json:"advancer"` - Enabled bool `toml:"enabled" json:"enabled"` -} - // Log is the log section of config. type Log struct { // Log level. 
diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 1bc5567e8a3c2..3aa47f1d38e2d 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1512,7 +1512,7 @@ func (do *Domain) Start(startMode ddl.StartMode) error { }, "closestReplicaReadCheckLoop") } - err = do.initLogBackup(do.ctx, pdCli) + err = do.initLogBackupAdvancer(do.ctx, pdCli) if err != nil { return err } @@ -1540,7 +1540,7 @@ func (do *Domain) SetOnClose(onClose func()) { do.onClose = onClose } -func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { +func (do *Domain) initLogBackupAdvancer(ctx context.Context, pdClient pd.Client) error { cfg := config.GetGlobalConfig() if pdClient == nil || do.etcdClient == nil { log.Warn("pd / etcd client not provided, won't begin Advancer.") @@ -1557,7 +1557,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { } adv := streamhelper.NewCheckpointAdvancer(env) do.brOwnerMgr = streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient) - do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration, adv.Config().AdvancerOwnerRetireInterval) + do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration) loop, err := do.logBackupAdvancer.Begin(ctx) if err != nil { return err From 546eb1e05862642f2b3a41b08f97a571207088b7 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Thu, 12 Dec 2024 16:00:50 -0500 Subject: [PATCH 3/5] fix bazel Signed-off-by: Wenqi Mou --- pkg/config/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/config/BUILD.bazel b/pkg/config/BUILD.bazel index c4387914a9036..f3bf88df15e7a 100644 --- a/pkg/config/BUILD.bazel +++ b/pkg/config/BUILD.bazel @@ -11,7 +11,6 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/config", visibility = ["//visibility:public"], deps = [ - "//br/pkg/streamhelper/config", "//pkg/parser/terror", "//pkg/util/logutil", "//pkg/util/tiflashcompute", From a4e8efc698d201837f69695c88dc1fe0dd46cae2 Mon Sep 17 
00:00:00 2001 From: Wenqi Mou Date: Mon, 30 Dec 2024 22:48:42 -0500 Subject: [PATCH 4/5] mark hidden and better comments Signed-off-by: Wenqi Mou --- br/pkg/streamhelper/config/advancer_conf.go | 3 +++ br/pkg/task/stream.go | 13 ++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go index 5638e5d638603..a2bfd9ac52b78 100644 --- a/br/pkg/streamhelper/config/advancer_conf.go +++ b/br/pkg/streamhelper/config/advancer_conf.go @@ -61,6 +61,9 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { // used for chaos testing f.Duration(flagOwnershipCycleInterval, DefaultOwnershipCycleInterval, "The interval that the owner will retire itself") + + // mark hidden + _ = f.MarkHidden(flagOwnershipCycleInterval) } func Default() Config { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 44b5c272adb7d..cc190fccd5e36 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -948,19 +948,18 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre if err != nil { return err } - log.Info("this advancerd forced to be the owner") - go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval) + log.Info("command line advancer forced to be the owner") + go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval, true) } loop() return nil } // runOwnershipCycle handles the periodic cycling of ownership for the advancer -func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration) { +func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration, isOwner bool) { ticker := time.NewTicker(cycleDuration) defer ticker.Stop() - isOwner := false for { select { case <-ctx.Done(): @@ -969,15 +968,15 @@ func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycle if !isOwner { // try to become 
owner if err := advancerd.ForceToBeOwner(ctx); err != nil { - log.Error("failed to force ownership", zap.Error(err)) + log.Error("command line advancer failed to force ownership", zap.Error(err)) continue } - log.Info("advancer forced to be the owner") + log.Info("command line advancer forced to be the owner") isOwner = true } else { // retire from being owner advancerd.RetireIfOwner() - log.Info("advancer retired from being owner") + log.Info("command line advancer retired from being owner") isOwner = false } } From 8e34b5e1e0d74d58f3d72699e456e8d1c4854d0b Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Mon, 30 Dec 2024 22:53:58 -0500 Subject: [PATCH 5/5] revert domain naming change Signed-off-by: Wenqi Mou --- pkg/domain/domain.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 3aa47f1d38e2d..3de1a7d839411 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1512,7 +1512,7 @@ func (do *Domain) Start(startMode ddl.StartMode) error { }, "closestReplicaReadCheckLoop") } - err = do.initLogBackupAdvancer(do.ctx, pdCli) + err = do.initLogBackup(do.ctx, pdCli) if err != nil { return err } @@ -1540,7 +1540,7 @@ func (do *Domain) SetOnClose(onClose func()) { do.onClose = onClose } -func (do *Domain) initLogBackupAdvancer(ctx context.Context, pdClient pd.Client) error { +func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { cfg := config.GetGlobalConfig() if pdClient == nil || do.etcdClient == nil { log.Warn("pd / etcd client not provided, won't begin Advancer.")