diff --git a/apis/core/v1alpha1/common_types.go b/apis/core/v1alpha1/common_types.go index b4c67337b6..4ab1925305 100644 --- a/apis/core/v1alpha1/common_types.go +++ b/apis/core/v1alpha1/common_types.go @@ -288,15 +288,15 @@ type GroupStatus struct { Replicas int32 `json:"replicas"` // ReadyReplicas is the number of Instances created for this ComponentGroup with a Ready Condition. - ReadyReplicas int32 `json:"readyReplicas,omitempty"` + ReadyReplicas int32 `json:"readyReplicas"` // CurrentReplicas is the number of Instances created by the Group controller from the Group version // indicated by currentRevision. - CurrentReplicas int32 `json:"currentReplicas,omitempty"` + CurrentReplicas int32 `json:"currentReplicas"` // UpdatedReplicas is the number of Instances created by the Group controller from the Group version // indicated by updateRevision. - UpdatedReplicas int32 `json:"updatedReplicas,omitempty"` + UpdatedReplicas int32 `json:"updatedReplicas"` } type UpdateStrategy struct { diff --git a/apis/core/v1alpha1/pd_types.go b/apis/core/v1alpha1/pd_types.go index c4f872898d..5482a6d902 100644 --- a/apis/core/v1alpha1/pd_types.go +++ b/apis/core/v1alpha1/pd_types.go @@ -230,7 +230,7 @@ func (in *PD) CollisionCount() *int32 { } func (in *PD) IsHealthy() bool { - return meta.IsStatusConditionTrue(in.Status.Conditions, PDCondInitialized) && meta.IsStatusConditionTrue(in.Status.Conditions, PDCondHealth) && in.DeletionTimestamp.IsZero() + return meta.IsStatusConditionTrue(in.Status.Conditions, PDCondHealth) && in.DeletionTimestamp.IsZero() } func (in *PD) GetClientPort() int32 { diff --git a/manifests/crd/core.pingcap.com_pdgroups.yaml b/manifests/crd/core.pingcap.com_pdgroups.yaml index 24e712c80f..76bf63253f 100644 --- a/manifests/crd/core.pingcap.com_pdgroups.yaml +++ b/manifests/crd/core.pingcap.com_pdgroups.yaml @@ -399,7 +399,10 @@ spec: It will be same as the `spec.version` only when all instances are upgraded to the desired version. type: string required: + - currentReplicas + - readyReplicas - replicas + - updatedReplicas type: object type: object served: true diff --git a/manifests/crd/core.pingcap.com_tidbgroups.yaml b/manifests/crd/core.pingcap.com_tidbgroups.yaml index 6bae97e0c7..6f30d2808f 100644 --- a/manifests/crd/core.pingcap.com_tidbgroups.yaml +++ b/manifests/crd/core.pingcap.com_tidbgroups.yaml @@ -538,7 +538,10 @@ spec: It will be same as the `spec.version` only when all instances are upgraded to the desired version. type: string required: + - currentReplicas + - readyReplicas - replicas + - updatedReplicas type: object type: object served: true diff --git a/manifests/crd/core.pingcap.com_tiflashgroups.yaml b/manifests/crd/core.pingcap.com_tiflashgroups.yaml index 67bd926d57..afd0add86e 100644 --- a/manifests/crd/core.pingcap.com_tiflashgroups.yaml +++ b/manifests/crd/core.pingcap.com_tiflashgroups.yaml @@ -435,7 +435,10 @@ spec: It will be same as the `spec.version` only when all instances are upgraded to the desired version. type: string required: + - currentReplicas + - readyReplicas - replicas + - updatedReplicas type: object type: object served: true diff --git a/manifests/crd/core.pingcap.com_tikvgroups.yaml b/manifests/crd/core.pingcap.com_tikvgroups.yaml index 35df978c1c..5bb8cb471e 100644 --- a/manifests/crd/core.pingcap.com_tikvgroups.yaml +++ b/manifests/crd/core.pingcap.com_tikvgroups.yaml @@ -400,7 +400,10 @@ spec: It will be same as the `spec.version` only when all instances are upgraded to the desired version. 
type: string required: + - currentReplicas + - readyReplicas - replicas + - updatedReplicas type: object type: object served: true diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 603a63b76c..999b10167c 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -89,6 +89,9 @@ var _ client.WithWatch = &fakeUnderlayClient{} func newFakeUnderlayClient(objs ...client.Object) *fakeUnderlayClient { t := testing.NewObjectTracker(scheme.Scheme, scheme.Codecs.UniversalDecoder()) for _, obj := range objs { + if obj == nil { + continue + } if err := t.Add(obj); err != nil { panic(err) } @@ -432,7 +435,40 @@ func (*SubResourceClient) Create(_ context.Context, _, _ client.Object, _ ...cli return nil } -func (*SubResourceClient) Update(_ context.Context, _ client.Object, _ ...client.SubResourceUpdateOption) error { +func (c *SubResourceClient) Update(_ context.Context, obj client.Object, _ ...client.SubResourceUpdateOption) error { + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return err + } + + namespaced, err := apiutil.IsGVKNamespaced(gvk, c.restMapper) + if err != nil { + return err + } + + mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return err + } + + var action testing.UpdateAction + if namespaced { + action = testing.NewUpdateSubresourceAction(mapping.Resource, c.subResource, obj.GetNamespace(), obj) + } else { + action = testing.NewRootUpdateSubresourceAction(mapping.Resource, c.subResource, obj) + } + newObj, err := c.Invokes(action, nil) + if err != nil { + return err + } + if newObj == nil { + return fmt.Errorf("obj is not handled") + } + + nv := reflect.ValueOf(newObj).Elem() + v := reflect.ValueOf(obj).Elem() + v.Set(nv) + return nil } diff --git a/pkg/controllers/common/cond.go b/pkg/controllers/common/cond.go index e9ad8e8e99..a421d25925 100644 --- a/pkg/controllers/common/cond.go +++ b/pkg/controllers/common/cond.go @@ -39,3 +39,15 @@ func CondClusterIsPaused(ctx ClusterState) task.Condition { return ctx.Cluster().ShouldPauseReconcile() }) } + +func CondPDGroupHasBeenDeleted(ctx PDGroupState) task.Condition { + return task.CondFunc(func() bool { + return ctx.PDGroup() == nil + }) +} + +func CondPDGroupIsDeleting(ctx PDGroupState) task.Condition { + return task.CondFunc(func() bool { + return !ctx.PDGroup().GetDeletionTimestamp().IsZero() + }) +} diff --git a/pkg/controllers/common/interfaces.go b/pkg/controllers/common/interfaces.go index 395367880c..85330f0609 100644 --- a/pkg/controllers/common/interfaces.go +++ b/pkg/controllers/common/interfaces.go @@ -32,12 +32,13 @@ type ObjectList[T any] interface { } type ( - PDInitializer = ResourceInitializer[*v1alpha1.PD] + PDInitializer = ResourceInitializer[v1alpha1.PD] - ClusterInitializer = ResourceInitializer[*v1alpha1.Cluster] + ClusterInitializer = ResourceInitializer[v1alpha1.Cluster] - PodInitializer = ResourceInitializer[*corev1.Pod] - PDSliceInitializer = ResourceSliceInitializer[*v1alpha1.PD] + PodInitializer = ResourceInitializer[corev1.Pod] + PDSliceInitializer = ResourceSliceInitializer[v1alpha1.PD] + PDGroupInitializer = ResourceInitializer[v1alpha1.PDGroup] ) type PDStateInitializer interface { @@ -64,6 +65,14 @@ type PodState interface { Pod() *corev1.Pod } +type PDGroupStateInitializer interface { + PDGroupInitializer() PDGroupInitializer +} + +type PDGroupState interface { + PDGroup() *v1alpha1.PDGroup +} + type PDSliceStateInitializer interface { PDSliceInitializer() PDSliceInitializer } diff --git a/pkg/controllers/common/interfaces_test.go 
b/pkg/controllers/common/interfaces_test.go index cba65f4166..a1fbc46c9e 100644 --- a/pkg/controllers/common/interfaces_test.go +++ b/pkg/controllers/common/interfaces_test.go @@ -30,7 +30,7 @@ func (f *fakeState[T]) Object() *T { return f.obj } -func (f *fakeState[T]) Initializer() ResourceInitializer[*T] { +func (f *fakeState[T]) Initializer() ResourceInitializer[T] { return NewResource(func(obj *T) { f.obj = obj }). WithNamespace(Namespace(f.ns)). WithName(Name(f.name)). @@ -47,7 +47,7 @@ func (f *fakeSliceState[T]) Slice() []*T { return f.objs } -func (f *fakeSliceState[T]) Initializer() ResourceSliceInitializer[*T] { +func (f *fakeSliceState[T]) Initializer() ResourceSliceInitializer[T] { return NewResourceSlice(func(objs []*T) { f.objs = objs }). WithNamespace(Namespace(f.ns)). WithLabels(Labels(f.labels)). diff --git a/pkg/controllers/common/resource.go b/pkg/controllers/common/resource.go index 91fe8f2432..2f0c419e53 100644 --- a/pkg/controllers/common/resource.go +++ b/pkg/controllers/common/resource.go @@ -82,7 +82,7 @@ func (f SetFunc[T]) Set(obj T) { type ResourceInitializer[T any] interface { GetOptions - Setter[T] + Setter[*T] } type Resource[T any] interface { @@ -91,19 +91,19 @@ type Resource[T any] interface { Initializer() ResourceInitializer[T] } -func NewResource[T any](setter SetFunc[T]) Resource[T] { +func NewResource[T any](setter SetFunc[*T]) Resource[T] { return &resource[T]{ setter: setter, } } type resource[T any] struct { - setter Setter[T] + setter Setter[*T] ns NamespaceOption name NameOption } -func (r *resource[T]) Set(obj T) { +func (r *resource[T]) Set(obj *T) { r.setter.Set(obj) } @@ -131,7 +131,7 @@ func (r *resource[T]) Initializer() ResourceInitializer[T] { type ResourceSliceInitializer[T any] interface { ListOptions - Setter[[]T] + Setter[[]*T] } type ResourceSlice[T any] interface { @@ -140,7 +140,7 @@ type ResourceSlice[T any] interface { Initializer() ResourceSliceInitializer[T] } -func NewResourceSlice[T any](setter SetFunc[[]T]) ResourceSlice[T] { +func NewResourceSlice[T any](setter SetFunc[[]*T]) ResourceSlice[T] { return &resourceSlice[T]{ setter: setter, } @@ -149,7 +149,7 @@ func NewResourceSlice[T any](setter SetFunc[[]T]) ResourceSlice[T] { type resourceSlice[T any] struct { ns NamespaceOption labels LabelsOption - setter Setter[[]T] + setter Setter[[]*T] } func (r *resourceSlice[T]) Namespace() string { @@ -160,7 +160,7 @@ func (r *resourceSlice[T]) Labels() map[string]string { return r.labels.Labels() } -func (r *resourceSlice[T]) Set(objs []T) { +func (r *resourceSlice[T]) Set(objs []*T) { r.setter.Set(objs) } diff --git a/pkg/controllers/common/resource_test.go b/pkg/controllers/common/resource_test.go index b829da9bb9..d85d527c4b 100644 --- a/pkg/controllers/common/resource_test.go +++ b/pkg/controllers/common/resource_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "k8s.io/utils/ptr" ) func TestResource(t *testing.T) { @@ -56,14 +57,14 @@ func TestResource(t *testing.T) { tt.Parallel() var obj int - r := NewResource(func(t int) { - obj = t + r := NewResource(func(t *int) { + obj = *t }). WithNamespace(c.ns). WithName(c.name). 
Initializer() - r.Set(c.obj) + r.Set(&c.obj) assert.Equal(tt, c.expectedNs, r.Namespace(), c.desc) assert.Equal(tt, c.expectedName, r.Name(), c.desc) @@ -77,32 +78,32 @@ func TestResourceSlice(t *testing.T) { desc string ns NamespaceOption labels LabelsOption - objs []int + objs []*int expectedNs string expectedLabels map[string]string - expectedObjs []int + expectedObjs []*int }{ { desc: "normal", ns: Namespace("aaa"), labels: Labels(map[string]string{"xxx": "yyy"}), - objs: []int{42}, + objs: []*int{ptr.To(42)}, expectedNs: "aaa", expectedLabels: map[string]string{ "xxx": "yyy", }, - expectedObjs: []int{42}, + expectedObjs: []*int{ptr.To(42)}, }, { desc: "use func", ns: NameFunc(func() string { return "aaa" }), labels: LabelsFunc(func() map[string]string { return map[string]string{"xxx": "yyy"} }), - objs: []int{42}, + objs: []*int{ptr.To(42)}, expectedNs: "aaa", expectedLabels: map[string]string{ "xxx": "yyy", }, - expectedObjs: []int{42}, + expectedObjs: []*int{ptr.To(42)}, }, } @@ -111,8 +112,8 @@ func TestResourceSlice(t *testing.T) { t.Run(c.desc, func(tt *testing.T) { tt.Parallel() - var objs []int - r := NewResourceSlice(func(t []int) { + var objs []*int + r := NewResourceSlice(func(t []*int) { objs = t }). WithNamespace(c.ns). diff --git a/pkg/controllers/common/task.go b/pkg/controllers/common/task.go index c973cfe93c..8859ce409d 100644 --- a/pkg/controllers/common/task.go +++ b/pkg/controllers/common/task.go @@ -31,7 +31,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -func taskContextResource[T any, PT Object[T]](name string, w ResourceInitializer[PT], c client.Client, shouldExist bool) task.Task { +func taskContextResource[T any, PT Object[T]](name string, w ResourceInitializer[T], c client.Client, shouldExist bool) task.Task { return task.NameTaskFunc("Context"+name, func(ctx context.Context) task.Result { var obj PT = new(T) key := types.NamespacedName{ @@ -56,7 +56,7 @@ func taskContextResource[T any, PT Object[T]](name string, w ResourceInitializer func taskContextResourceSlice[T any, L any, PT Object[T], PL ObjectList[L]]( name string, - w ResourceSliceInitializer[PT], + w ResourceSliceInitializer[T], c client.Client, ) task.Task { return task.NameTaskFunc("Context"+name, func(ctx context.Context) task.Result { @@ -68,7 +68,7 @@ func taskContextResourceSlice[T any, L any, PT Object[T], PL ObjectList[L]]( return task.Fail().With("cannot list objs: %v", err) } - objs := make([]PT, 0, meta.LenList(l)) + objs := make([]*T, 0, meta.LenList(l)) if err := meta.EachListItem(l, func(item kuberuntime.Object) error { obj, ok := item.(PT) if !ok { @@ -82,8 +82,9 @@ func taskContextResourceSlice[T any, L any, PT Object[T], PL ObjectList[L]]( return task.Fail().With("cannot extract list objs: %v", err) } - slices.SortFunc(objs, func(a, b PT) int { - return cmp.Compare(a.GetName(), b.GetName()) + slices.SortFunc(objs, func(a, b *T) int { + var pa, pb PT = a, b + return cmp.Compare(pa.GetName(), pb.GetName()) }) w.Set(objs) @@ -107,6 +108,11 @@ func TaskContextPod(state PodStateInitializer, c client.Client) task.Task { return taskContextResource("Pod", w, c, false) } +func TaskContextPDGroup(state PDGroupStateInitializer, c client.Client) task.Task { + w := state.PDGroupInitializer() + return taskContextResource("PDGroup", w, c, false) +} + func TaskContextPDSlice(state PDSliceStateInitializer, c client.Client) task.Task { w := state.PDSliceInitializer() return taskContextResourceSlice[v1alpha1.PD, v1alpha1.PDList]("PDSlice", w, c) diff --git 
a/pkg/controllers/pd/builder.go b/pkg/controllers/pd/builder.go index 080b078f9b..0e9e983eb6 100644 --- a/pkg/controllers/pd/builder.go +++ b/pkg/controllers/pd/builder.go @@ -24,33 +24,32 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task runner := task.NewTaskRunner(reporter, // get pd common.TaskContextPD(state, r.Client), - // if it's deleted just return + // if it's gone just return task.IfBreak(common.CondPDHasBeenDeleted(state)), + // get cluster + common.TaskContextCluster(state, r.Client), + // if it's paused just return + task.IfBreak(common.CondClusterIsPaused(state)), + // get info from pd tasks.TaskContextInfoFromPD(state, r.PDClientManager), task.IfBreak(common.CondPDIsDeleting(state), tasks.TaskFinalizerDel(state, r.Client), ), + tasks.TaskFinalizerAdd(state, r.Client), - // get cluster and check whether it's paused - common.TaskContextCluster(state, r.Client), - task.IfBreak( - common.CondClusterIsPaused(state), - ), - - // get pod and check whether the cluster is suspending + // get pod common.TaskContextPod(state, r.Client), + task.IfBreak( common.CondClusterIsSuspending(state), - tasks.TaskFinalizerAdd(state, r.Client), common.TaskSuspendPod(state, r.Client), // TODO: extract as a common task tasks.TaskStatusSuspend(state, r.Client), ), common.TaskContextPDSlice(state, r.Client), - tasks.TaskFinalizerAdd(state, r.Client), tasks.TaskConfigMap(state, r.Logger, r.Client), tasks.TaskPVC(state, r.Logger, r.Client, r.VolumeModifier), tasks.TaskPod(state, r.Logger, r.Client), diff --git a/pkg/controllers/pd/controller.go b/pkg/controllers/pd/controller.go index e92957236a..356a6d7a21 100644 --- a/pkg/controllers/pd/controller.go +++ b/pkg/controllers/pd/controller.go @@ -75,7 +75,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu }() rtx := &tasks.ReconcileContext{ - Key: req.NamespacedName, + State: tasks.NewState(req.NamespacedName), } runner := r.NewRunner(rtx, reporter) diff --git a/pkg/controllers/pd/tasks/ctx.go b/pkg/controllers/pd/tasks/ctx.go index 5a99c0004b..29b36070e8 100644 --- a/pkg/controllers/pd/tasks/ctx.go +++ b/pkg/controllers/pd/tasks/ctx.go @@ -18,7 +18,6 @@ import ( "context" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" @@ -27,7 +26,6 @@ import ( type ReconcileContext struct { // TODO: replace all fields in ReconcileContext by State State - Key types.NamespacedName PDClient pdm.PDClient // this means whether pd is available diff --git a/pkg/controllers/pdgroup/builder.go b/pkg/controllers/pdgroup/builder.go new file mode 100644 index 0000000000..ae0099c8cf --- /dev/null +++ b/pkg/controllers/pdgroup/builder.go @@ -0,0 +1,55 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
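The new pdgroup builder below replaces the v1 struct-based tasks with function-style tasks composed through task/v3 combinators. A minimal sketch of that composition pattern, assuming only helpers that appear elsewhere in this diff (task.NameTaskFunc, task.CondFunc, task.IfBreak, task.NewTaskRunner); the function name, condition, and task bodies are hypothetical placeholders:

func newRunnerSketch(reporter task.TaskReporter, gone func() bool) task.TaskRunner {
	// a task is now just a named closure returning a task.Result
	fetch := task.NameTaskFunc("Fetch", func(ctx context.Context) task.Result {
		// load objects into the shared state here
		return task.Complete().With("state is initialized")
	})
	return task.NewTaskRunner(reporter,
		fetch,
		// IfBreak short-circuits the remaining tasks when the condition holds
		task.IfBreak(task.CondFunc(gone)),
	)
}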
+ +package pdgroup + +import ( + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/controllers/pdgroup/tasks" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.TaskReporter) task.TaskRunner { + runner := task.NewTaskRunner(reporter, + // get pdgroup + common.TaskContextPDGroup(state, r.Client), + // if it's gone just return + task.IfBreak(common.CondPDGroupHasBeenDeleted(state)), + + // get cluster + common.TaskContextCluster(state, r.Client), + // if it's paused just return + task.IfBreak(common.CondClusterIsPaused(state)), + + // get all pds + common.TaskContextPDSlice(state, r.Client), + + task.IfBreak(common.CondPDGroupIsDeleting(state), + tasks.TaskFinalizerDel(state, r.Client, r.PDClientManager), + ), + tasks.TaskFinalizerAdd(state, r.Client), + + task.IfBreak( + common.CondClusterIsSuspending(state), + tasks.TaskStatusSuspend(state, r.Client), + ), + tasks.TaskContextPDClient(state, r.PDClientManager), + tasks.TaskBoot(state, r.Client), + tasks.TaskService(state, r.Client), + tasks.TaskUpdater(state, r.Client), + tasks.TaskStatus(state, r.Client), + ) + + return runner +} diff --git a/pkg/controllers/pdgroup/controller.go b/pkg/controllers/pdgroup/controller.go index 1297fe02d4..ed40226ddc 100644 --- a/pkg/controllers/pdgroup/controller.go +++ b/pkg/controllers/pdgroup/controller.go @@ -36,7 +36,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/controllers/pdgroup/tasks" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/k8s" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) type Reconciler struct { @@ -63,7 +63,8 @@ func Setup(mgr manager.Manager, c client.Client, pdcm pdm.PDClientManager) error func (r *Reconciler) ClusterEventHandler() handler.TypedEventHandler[client.Object, reconcile.Request] { return handler.TypedFuncs[client.Object, reconcile.Request]{ UpdateFunc: func(ctx context.Context, event event.TypedUpdateEvent[client.Object], - queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { + queue workqueue.TypedRateLimitingInterface[reconcile.Request], + ) { cluster := event.ObjectNew.(*v1alpha1.Cluster) var list v1alpha1.PDGroupList @@ -101,20 +102,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu }() rtx := &tasks.ReconcileContext{ - // some fields will be set in the context task - Context: ctx, - Key: req.NamespacedName, + State: tasks.NewState(req.NamespacedName), } - runner := task.NewTaskRunner[tasks.ReconcileContext](reporter) - runner.AddTasks( - tasks.NewTaskContext(logger, r.Client, r.PDClientManager), - tasks.NewTaskFinalizer(logger, r.Client, r.PDClientManager), - tasks.NewTaskBoot(logger, r.Client), - tasks.NewTaskService(logger, r.Client), - tasks.NewTaskUpdater(logger, r.Client), - tasks.NewTaskStatus(logger, r.Client), - ) + runner := r.NewRunner(rtx, reporter) - return runner.Run(rtx) + return runner.Run(ctx) } diff --git a/pkg/controllers/pdgroup/tasks/boot.go b/pkg/controllers/pdgroup/tasks/boot.go index 131b82e0cd..1f529946aa 100644 --- a/pkg/controllers/pdgroup/tasks/boot.go +++ b/pkg/controllers/pdgroup/tasks/boot.go @@ -5,7 +5,6 @@ // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 -// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // 
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,42 +14,25 @@ package tasks import ( - "github.com/go-logr/logr" + "context" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type TaskBoot struct { - Logger logr.Logger - Client client.Client -} - -func NewTaskBoot(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskBoot{ - Logger: logger, - Client: c, - } -} - -func (*TaskBoot) Name() string { - return "Boot" -} - -func (t *TaskBoot) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - if rtx.IsAvailable && !rtx.PDGroup.Spec.Bootstrapped { - rtx.PDGroup.Spec.Bootstrapped = true - if err := t.Client.Update(ctx, rtx.PDGroup); err != nil { - return task.Fail().With("pd cluster is available but not marked as bootstrapped: %w", err) +func TaskBoot(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("Boot", func(ctx context.Context) task.Result { + pdg := state.PDGroup() + if !state.IsBootstrapped && !pdg.Spec.Bootstrapped { + return task.Wait().With("skip the task and wait until the pd svc is available") + } + if !pdg.Spec.Bootstrapped { + pdg.Spec.Bootstrapped = true + if err := c.Update(ctx, pdg); err != nil { + return task.Fail().With("pd cluster is available but not marked as bootstrapped: %w", err) + } } - } - - if !rtx.PDGroup.Spec.Bootstrapped { - // TODO: use task.Retry? - return task.Fail().Continue().With("pd cluster is not bootstrapped") - } - return task.Complete().With("pd cluster is bootstrapped") + return task.Complete().With("pd cluster is bootstrapped") + }) } diff --git a/pkg/controllers/pdgroup/tasks/boot_test.go b/pkg/controllers/pdgroup/tasks/boot_test.go new file mode 100644 index 0000000000..8e55a23b52 --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/boot_test.go @@ -0,0 +1,138 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
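The new unit tests in this PR all share the same harness shape; a condensed sketch, assuming only helpers visible in this diff (client.NewFakeClient, WithError, task.RunTask) — runBootTaskSketch and injectErr are hypothetical names:

func runBootTaskSketch(state *ReconcileContext, injectErr bool) task.Result {
	// seed a fake client with the object under test
	fc := client.NewFakeClient(state.PDGroup())
	if injectErr {
		// "*", "*" matches every verb and resource, forcing API calls to fail
		fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err")))
	}
	// run the task directly and let the caller assert on the returned status
	res, _ := task.RunTask(context.Background(), TaskBoot(state, fc))
	return res
}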
+ +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestTaskBoot(t *testing.T) { + cases := []struct { + desc string + state *ReconcileContext + unexpectedErr bool + + expectedBootstrapped bool + expectedStatus task.Status + }{ + { + desc: "pd svc is not available, pdg is not bootstrapped", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Bootstrapped = false + return obj + }), + }, + IsBootstrapped: false, + }, + expectedBootstrapped: false, + expectedStatus: task.SWait, + }, + { + desc: "pd svc is not available, pdg is bootstrapped", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Bootstrapped = true + return obj + }), + }, + IsBootstrapped: false, + }, + expectedBootstrapped: true, + expectedStatus: task.SComplete, + }, + { + desc: "pd svc is available, pdg is not bootstrapped", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Bootstrapped = false + return obj + }), + }, + IsBootstrapped: true, + }, + expectedBootstrapped: true, + expectedStatus: task.SComplete, + }, + { + desc: "pd svc is available, pdg is not bootstrapped, but update failed", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Bootstrapped = false + return obj + }), + }, + IsBootstrapped: true, + }, + unexpectedErr: true, + expectedStatus: task.SFail, + }, + { + desc: "pd svc is available, pdg is bootstrapped", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Bootstrapped = true + return obj + }), + }, + IsBootstrapped: true, + }, + expectedBootstrapped: true, + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.state.PDGroup()) + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + ctx := context.Background() + res, done := task.RunTask(ctx, TaskBoot(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + pdg := &v1alpha1.PDGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, pdg), c.desc) + assert.Equal(tt, c.expectedBootstrapped, pdg.Spec.Bootstrapped, c.desc) + }) + } +} diff --git a/pkg/controllers/pdgroup/tasks/ctx.go b/pkg/controllers/pdgroup/tasks/ctx.go index 6731e05625..f13c59e09b 100644 --- a/pkg/controllers/pdgroup/tasks/ctx.go +++ b/pkg/controllers/pdgroup/tasks/ctx.go @@ -19,40 +19,23 @@ import ( "context" "slices" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "github.com/pingcap/tidb-operator/apis/core/v1alpha1" - "github.com/pingcap/tidb-operator/pkg/action" - "github.com/pingcap/tidb-operator/pkg/client" - 
"github.com/pingcap/tidb-operator/pkg/pdapi/v1" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) type ReconcileContext struct { - context.Context + State - Key types.NamespacedName + Members []Member - PDClient pdapi.PDClient - Members []Member + // mark pdgroup is bootstrapped if cache of pd is synced + IsBootstrapped bool - // check whether pd is available - IsAvailable bool - - Suspended bool - - PDGroup *v1alpha1.PDGroup - Peers []*v1alpha1.PD - Cluster *v1alpha1.Cluster - UpgradeChecker action.UpgradeChecker - - // Status fields - v1alpha1.CommonStatus + UpdateRevision string + CurrentRevision string + CollisionCount int32 } // TODO: move to pdapi @@ -61,116 +44,41 @@ type Member struct { Name string } -func (ctx *ReconcileContext) Self() *ReconcileContext { - return ctx -} - -type TaskContext struct { - Logger logr.Logger - Client client.Client - PDClientManager pdm.PDClientManager -} - -func NewTaskContext(logger logr.Logger, c client.Client, pdcm pdm.PDClientManager) task.Task[ReconcileContext] { - return &TaskContext{ - Logger: logger, - Client: c, - PDClientManager: pdcm, - } -} - -func (*TaskContext) Name() string { - return "Context" -} - -//nolint:gocyclo // refactor if possible -func (t *TaskContext) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - var pdg v1alpha1.PDGroup - if err := t.Client.Get(ctx, rtx.Key, &pdg); err != nil { - if !errors.IsNotFound(err) { - return task.Fail().With("can't get pd group: %w", err) +func TaskContextPDClient(state *ReconcileContext, m pdm.PDClientManager) task.Task { + return task.NameTaskFunc("ContextPDClient", func(_ context.Context) task.Result { + if len(state.PDSlice()) > 0 { + // TODO: register pd client after it is ready + if err := m.Register(state.PDGroup()); err != nil { + return task.Fail().With("cannot register pd client: %v", err) + } } - - return task.Complete().Break().With("pd group has been deleted") - } - rtx.PDGroup = &pdg - - var cluster v1alpha1.Cluster - if err := t.Client.Get(ctx, client.ObjectKey{ - Name: pdg.Spec.Cluster.Name, - Namespace: pdg.Namespace, - }, &cluster); err != nil { - return task.Fail().With("cannot find cluster %s: %w", pdg.Spec.Cluster.Name, err) - } - rtx.Cluster = &cluster - - if cluster.ShouldPauseReconcile() { - return task.Complete().Break().With("cluster reconciliation is paused") - } - - var pdList v1alpha1.PDList - if err := t.Client.List(ctx, &pdList, client.InNamespace(pdg.Namespace), client.MatchingLabels{ - v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, - v1alpha1.LabelKeyCluster: cluster.Name, - v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, - v1alpha1.LabelKeyGroup: pdg.Name, - }); err != nil { - return task.Fail().With("cannot list pd peers: %w", err) - } - - rtx.Peers = make([]*v1alpha1.PD, len(pdList.Items)) - rtx.Suspended = len(pdList.Items) > 0 - for i := range pdList.Items { - rtx.Peers[i] = &pdList.Items[i] - if !meta.IsStatusConditionTrue(rtx.Peers[i].Status.Conditions, v1alpha1.PDGroupCondSuspended) { - // PD Group is not suspended if any of its members is not suspended - rtx.Suspended = false + ck := state.Cluster() + pc, ok := m.Get(pdm.PrimaryKey(ck.Namespace, ck.Name)) + if !ok { + return task.Complete().With("context without pd client is completed, pd cannot be visited") } - } - slices.SortFunc(rtx.Peers, func(a, b *v1alpha1.PD) int { - return cmp.Compare(a.Name, b.Name) - }) - if 
rtx.PDGroup.GetDeletionTimestamp().IsZero() && len(rtx.Peers) > 0 { - // TODO: register pd client after it is ready - if err := t.PDClientManager.Register(rtx.PDGroup); err != nil { - return task.Fail().With("cannot register pd client: %v", err) + if !pc.HasSynced() { + return task.Complete().With("context without pd client is completed, cache of pd info is not synced") } - } - - if rtx.Suspended { - return task.Complete().With("context without member info is completed, pd is suspended") - } - c, ok := t.PDClientManager.Get(pdm.PrimaryKey(pdg.Namespace, pdg.Spec.Cluster.Name)) - if !ok { - return task.Complete().With("context without pd client is completed, pd cannot be visited") - } - rtx.PDClient = c.Underlay() + state.IsBootstrapped = true - if !c.HasSynced() { - return task.Complete().With("context without pd client is completed, cache of pd info is not synced") - } - - rtx.IsAvailable = true - - ms, err := c.Members().List(labels.Everything()) - if err != nil { - return task.Fail().With("cannot list members: %w", err) - } + ms, err := pc.Members().List(labels.Everything()) + if err != nil { + return task.Fail().With("cannot list members: %w", err) + } - for _, m := range ms { - rtx.Members = append(rtx.Members, Member{ - Name: m.Name, - ID: m.ID, + for _, m := range ms { + state.Members = append(state.Members, Member{ + Name: m.Name, + ID: m.ID, + }) + } + slices.SortFunc(state.Members, func(a, b Member) int { + return cmp.Compare(a.Name, b.Name) }) - } - slices.SortFunc(rtx.Members, func(a, b Member) int { - return cmp.Compare(a.Name, b.Name) - }) - rtx.UpgradeChecker = action.NewUpgradeChecker(t.Client, rtx.Cluster, t.Logger) - return task.Complete().With("context is fully completed") + return task.Complete().With("context is fully completed") + }) } diff --git a/pkg/controllers/pdgroup/tasks/finalizer.go b/pkg/controllers/pdgroup/tasks/finalizer.go index d1ea14ee33..c2ea612198 100644 --- a/pkg/controllers/pdgroup/tasks/finalizer.go +++ b/pkg/controllers/pdgroup/tasks/finalizer.go @@ -15,80 +15,73 @@ package tasks import ( - "github.com/go-logr/logr" + "context" + + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" utilerr "k8s.io/apimachinery/pkg/util/errors" "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/runtime" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/k8s" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type TaskFinalizer struct { - Client client.Client - Logger logr.Logger - PDClientManager pdm.PDClientManager -} - -func NewTaskFinalizer(logger logr.Logger, c client.Client, pdcm pdm.PDClientManager) task.Task[ReconcileContext] { - return &TaskFinalizer{ - Client: c, - Logger: logger, - PDClientManager: pdcm, - } -} - -func (*TaskFinalizer) Name() string { - return "Finalizer" -} - -func (t *TaskFinalizer) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() - - if rtx.PDGroup.GetDeletionTimestamp().IsZero() { - if err := k8s.EnsureFinalizer(ctx, t.Client, rtx.PDGroup); err != nil { - return task.Fail().With("failed to ensure finalizer has been added: %w", err) - } - return task.Complete().With("finalizer is synced") - } - - errList := []error{} - for _, peer := range rtx.Peers { - if err := t.Client.Delete(ctx, peer); err != nil { - if errors.IsNotFound(err) { - continue +func TaskFinalizerDel(state State, c client.Client, m 
pdm.PDClientManager) task.Task { + return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result { + var errList []error + for _, peer := range state.PDSlice() { + if peer.GetDeletionTimestamp().IsZero() { + if err := c.Delete(ctx, peer); err != nil { + if errors.IsNotFound(err) { + continue + } + + errList = append(errList, err) + continue + } + } - errList = append(errList, err) - continue + // PD controller cannot clean up finalizer after quorum is lost + // Forcibly clean up all finalizers of pd instances + if err := k8s.RemoveFinalizer(ctx, c, peer); err != nil { + errList = append(errList, err) + } } - // PD controller cannot clean up finalizer after quorum is lost - // Forcely clean up all finalizers of pd instances - if err := k8s.RemoveFinalizer(ctx, t.Client, peer); err != nil { - errList = append(errList, err) + if len(errList) != 0 { + return task.Fail().With("failed to delete all pd instances: %v", utilerr.NewAggregate(errList)) } - } - if len(errList) != 0 { - return task.Fail().With("failed to delete all pd instances: %v", utilerr.NewAggregate(errList)) - } + if len(state.PDSlice()) != 0 { + return task.Wait().With("wait for all pd instances to be removed, %v still exist", len(state.PDSlice())) + } - if len(rtx.Peers) != 0 { - return task.Fail().With("wait for all pd instances being removed, %v still exists", rtx.Peers) - } + wait, err := k8s.DeleteGroupSubresource(ctx, c, runtime.FromPDGroup(state.PDGroup()), &corev1.ServiceList{}) + if err != nil { + return task.Fail().With("cannot delete subresources: %w", err) + } + if wait { + return task.Wait().With("wait until all subresources are deleted") + } - if err := k8s.EnsureGroupSubResourceDeleted(ctx, t.Client, - rtx.PDGroup.Namespace, rtx.PDGroup.Name); err != nil { - return task.Fail().With("cannot delete subresources: %w", err) - } + if err := k8s.RemoveFinalizer(ctx, c, state.PDGroup()); err != nil { + return task.Fail().With("failed to ensure finalizer has been removed: %w", err) + } - if err := k8s.RemoveFinalizer(ctx, t.Client, rtx.PDGroup); err != nil { - return task.Fail().With("failed to ensure finalizer has been removed: %w", err) - } + m.Deregister(pdm.PrimaryKey(state.Cluster().Namespace, state.Cluster().Name)) - t.PDClientManager.Deregister(pdm.PrimaryKey(rtx.PDGroup.Namespace, rtx.PDGroup.Spec.Cluster.Name)) + return task.Complete().With("finalizer has been removed") + }) +} - return task.Complete().With("finalizer has been removed") +func TaskFinalizerAdd(state common.PDGroupState, c client.Client) task.Task { + return task.NameTaskFunc("FinalizerAdd", func(ctx context.Context) task.Result { + if err := k8s.EnsureFinalizer(ctx, c, state.PDGroup()); err != nil { + return task.Fail().With("failed to ensure finalizer has been added: %v", err) + } + return task.Complete().With("finalizer is added") + }) } diff --git a/pkg/controllers/pdgroup/tasks/finalizer_test.go b/pkg/controllers/pdgroup/tasks/finalizer_test.go new file mode 100644 index 0000000000..ecbcbc7f45 --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/finalizer_test.go @@ -0,0 +1,368 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/pdapi/v1" + "github.com/pingcap/tidb-operator/pkg/timanager" + pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +const ( + defaultTestClusterName = "cluster" +) + +func TestTaskFinalizerAdd(t *testing.T) { + cases := []struct { + desc string + state common.PDGroupState + unexpectedErr bool + + expectedStatus task.Status + expectedObj *v1alpha1.PDGroup + }{ + { + desc: "no finalizer", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + { + desc: "no finalizer and cannot call api", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + unexpectedErr: true, + expectedStatus: task.SFail, + }, + { + desc: "has another finalizer", + state: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Finalizers = append(obj.Finalizers, "xxxx") + return obj + }), + }, + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Finalizers = append(obj.Finalizers, "xxxx", v1alpha1.Finalizer) + return obj + }), + }, + { + desc: "already has the finalizer", + state: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + { + desc: "already has the finalizer and cannot call api", + state: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + unexpectedErr: true, + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.state.PDGroup()) + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + ctx := context.Background() + res, done := task.RunTask(ctx, TaskFinalizerAdd(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + pdg := &v1alpha1.PDGroup{} + require.NoError(tt, 
fc.Get(ctx, client.ObjectKey{Name: "aaa"}, pdg), c.desc) + assert.Equal(tt, c.expectedObj, pdg, c.desc) + }) + } +} + +func TestTaskFinalizerDel(t *testing.T) { + now := metav1.Now() + cases := []struct { + desc string + state State + subresources []client.Object + unexpectedErr bool + + expectedStatus task.Status + isDeregistered bool + expectedObj *v1alpha1.PDGroup + }{ + { + desc: "no pd and no sub resources and no finalizer", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + }, + isDeregistered: true, + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + return obj + }), + }, + { + desc: "no pd and no sub resources", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + }, + + isDeregistered: true, + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = []string{} + return obj + }), + }, + { + desc: "no pd and no sub resources but call api failed", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "no pd but has sub resources", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + }, + subresources: []client.Object{ + fake.FakeObj("aaa", + fake.Label[corev1.Service](v1alpha1.LabelKeyManagedBy, v1alpha1.LabelValManagedByOperator), + fake.Label[corev1.Service](v1alpha1.LabelKeyCluster, defaultTestClusterName), + fake.Label[corev1.Service](v1alpha1.LabelKeyComponent, v1alpha1.LabelValComponentPD), + fake.Label[corev1.Service](v1alpha1.LabelKeyGroup, "aaa"), + ), + }, + + expectedStatus: task.SWait, + expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + { + desc: "no pd but has sub resources and call api failed", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: 
fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + }, + subresources: []client.Object{ + fake.FakeObj("aaa", + fake.Label[corev1.Service](v1alpha1.LabelKeyManagedBy, v1alpha1.LabelValManagedByOperator), + fake.Label[corev1.Service](v1alpha1.LabelKeyCluster, defaultTestClusterName), + fake.Label[corev1.Service](v1alpha1.LabelKeyComponent, v1alpha1.LabelValComponentPD), + fake.Label[corev1.Service](v1alpha1.LabelKeyGroup, "aaa"), + ), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "has pd with finalizer", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + }, + + expectedStatus: task.SWait, + expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + { + desc: "has pd with finalizer but call api failed", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "has deleting pd with finalizer but call api failed", + state: &state{ + pdg: fake.FakeObj("aaa", fake.DeleteTimestamp[v1alpha1.PDGroup](&now), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = defaultTestClusterName + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster](defaultTestClusterName), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", fake.DeleteNow[v1alpha1.PD](), func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Finalizers = append(obj.Finalizers, v1alpha1.Finalizer) + return obj + }), + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + objs := []client.Object{ + c.state.PDGroup(), c.state.Cluster(), + } + + objs = append(objs, c.subresources...) + + fc := client.NewFakeClient(objs...) 
+ if c.unexpectedErr { + // cannot remove finalizer + fc.WithError("update", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + // cannot delete sub resources + fc.WithError("delete", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := NewFakePDClientManager() + m.Start(ctx) + require.NoError(tt, m.Register(c.state.PDGroup()), c.desc) + + res, done := task.RunTask(ctx, TaskFinalizerDel(c.state, fc, m)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + ck := c.state.Cluster() + _, ok := m.Get(pdm.PrimaryKey(ck.Namespace, ck.Name)) + assert.Equal(tt, c.isDeregistered, !ok, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + pdg := &v1alpha1.PDGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, pdg), c.desc) + assert.Equal(tt, c.expectedObj, pdg, c.desc) + }) + } +} + +func NewFakePDClientManager() pdm.PDClientManager { + return timanager.NewManagerBuilder[*v1alpha1.PDGroup, pdapi.PDClient, pdm.PDClient](). + WithNewUnderlayClientFunc(func(*v1alpha1.PDGroup) (pdapi.PDClient, error) { + return nil, nil + }). + WithNewClientFunc(func(string, pdapi.PDClient, timanager.SharedInformerFactory[pdapi.PDClient]) pdm.PDClient { + return nil + }). + WithCacheKeysFunc(pdm.CacheKeys). + Build() +} diff --git a/pkg/controllers/pdgroup/tasks/state.go b/pkg/controllers/pdgroup/tasks/state.go new file mode 100644 index 0000000000..cb07887f34 --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/state.go @@ -0,0 +1,89 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/controllers/common" +) + +type state struct { + key types.NamespacedName + + cluster *v1alpha1.Cluster + pdg *v1alpha1.PDGroup + pds []*v1alpha1.PD +} + +type State interface { + common.PDGroupStateInitializer + common.ClusterStateInitializer + common.PDSliceStateInitializer + + common.PDGroupState + common.ClusterState + common.PDSliceState +} + +func NewState(key types.NamespacedName) State { + s := &state{ + key: key, + } + return s +} + +func (s *state) PDGroup() *v1alpha1.PDGroup { + return s.pdg +} + +func (s *state) Cluster() *v1alpha1.Cluster { + return s.cluster +} + +func (s *state) PDSlice() []*v1alpha1.PD { + return s.pds +} + +func (s *state) PDGroupInitializer() common.PDGroupInitializer { + return common.NewResource(func(pdg *v1alpha1.PDGroup) { s.pdg = pdg }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithName(common.Name(s.key.Name)). + Initializer() +} + +func (s *state) ClusterInitializer() common.ClusterInitializer { + return common.NewResource(func(cluster *v1alpha1.Cluster) { s.cluster = cluster }). + WithNamespace(common.Namespace(s.key.Namespace)). 
+ WithName(common.NameFunc(func() string { + return s.pdg.Spec.Cluster.Name + })). + Initializer() +} + +func (s *state) PDSliceInitializer() common.PDSliceInitializer { + return common.NewResourceSlice(func(pds []*v1alpha1.PD) { s.pds = pds }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithLabels(common.LabelsFunc(func() map[string]string { + return map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, + v1alpha1.LabelKeyCluster: s.cluster.Name, + v1alpha1.LabelKeyGroup: s.pdg.Name, + } + })). + Initializer() +} diff --git a/pkg/controllers/pdgroup/tasks/state_test.go b/pkg/controllers/pdgroup/tasks/state_test.go new file mode 100644 index 0000000000..7e29255691 --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/state_test.go @@ -0,0 +1,105 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestState(t *testing.T) { + cases := []struct { + desc string + key types.NamespacedName + objs []client.Object + + expected State + }{ + { + desc: "normal", + key: types.NamespacedName{ + Name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = "aaa" + return obj + }), + fake.FakeObj[v1alpha1.Cluster]("aaa"), + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Labels = map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, + v1alpha1.LabelKeyCluster: "aaa", + v1alpha1.LabelKeyGroup: "aaa", + } + return obj + }), + }, + + expected: &state{ + key: types.NamespacedName{ + Name: "aaa", + }, + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Cluster.Name = "aaa" + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("aaa"), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Labels = map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, + v1alpha1.LabelKeyCluster: "aaa", + v1alpha1.LabelKeyGroup: "aaa", + } + return obj + }), + }, + }, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) 
+ + s := NewState(c.key) + + ctx := context.Background() + res, done := task.RunTask(ctx, task.Block( + common.TaskContextPDGroup(s, fc), + common.TaskContextCluster(s, fc), + common.TaskContextPDSlice(s, fc), + )) + assert.Equal(tt, task.SComplete, res.Status(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.expected, s, c.desc) + }) + } +} diff --git a/pkg/controllers/pdgroup/tasks/status.go b/pkg/controllers/pdgroup/tasks/status.go index b22a40bf88..2f94842b4b 100644 --- a/pkg/controllers/pdgroup/tasks/status.go +++ b/pkg/controllers/pdgroup/tasks/status.go @@ -15,82 +15,138 @@ package tasks import ( - "github.com/go-logr/logr" + "context" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type TaskStatus struct { - Client client.Client - Logger logr.Logger -} +func TaskStatusSuspend(state State, c client.Client) task.Task { + return task.NameTaskFunc("StatusSuspend", func(ctx context.Context) task.Result { + suspendStatus := metav1.ConditionFalse + suspendMessage := "pd group is suspending" -func NewTaskStatus(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskStatus{ - Client: c, - Logger: logger, - } -} + suspended := true + pdg := state.PDGroup() + pds := state.PDSlice() + for _, pd := range pds { + if !meta.IsStatusConditionTrue(pd.Status.Conditions, v1alpha1.PDCondSuspended) { + suspended = false + } + } + if suspended { + suspendStatus = metav1.ConditionTrue + suspendMessage = "pd group is suspended" + } + + needUpdate := meta.SetStatusCondition(&pdg.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: suspendStatus, + ObservedGeneration: pdg.Generation, + Reason: v1alpha1.PDGroupSuspendReason, + Message: suspendMessage, + }) + needUpdate = SetIfChanged(&pdg.Status.ObservedGeneration, pdg.Generation) || needUpdate -func (*TaskStatus) Name() string { - return "Status" + if needUpdate { + if err := c.Status().Update(ctx, state.PDGroup()); err != nil { + return task.Fail().With("cannot update status: %v", err) + } + } + + return task.Complete().With("status of suspend pd is updated") + }) } -func (t *TaskStatus) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() +func TaskStatus(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("Status", func(ctx context.Context) task.Result { + pdg := state.PDGroup() - suspendStatus := metav1.ConditionFalse - suspendMessage := "pd group is not suspended" - if rtx.Suspended { - suspendStatus = metav1.ConditionTrue - suspendMessage = "pd group is suspended" - } else if rtx.Cluster.ShouldSuspendCompute() { - suspendMessage = "pd group is suspending" - } - conditionChanged := meta.SetStatusCondition(&rtx.PDGroup.Status.Conditions, metav1.Condition{ - Type: v1alpha1.PDGroupCondSuspended, - Status: suspendStatus, - ObservedGeneration: rtx.PDGroup.Generation, - Reason: v1alpha1.PDGroupSuspendReason, - Message: suspendMessage, + needUpdate := meta.SetStatusCondition(&pdg.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: pdg.Generation, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + + replicas, readyReplicas, updateReplicas, currentReplicas := 
calcReplicas(state.PDSlice(), state.CurrentRevision, state.UpdateRevision) + + // all instances are updated + if updateReplicas == replicas { + // update current revision + state.CurrentRevision = state.UpdateRevision + // update status of pdg version + // TODO(liubo02): version of a group is hard to understand + // We need to change it to a more meaningful field + needUpdate = SetIfChanged(&pdg.Status.Version, pdg.Spec.Version) || needUpdate + } + + needUpdate = SetIfChanged(&pdg.Status.ObservedGeneration, pdg.Generation) || needUpdate + needUpdate = SetIfChanged(&pdg.Status.Replicas, replicas) || needUpdate + needUpdate = SetIfChanged(&pdg.Status.ReadyReplicas, readyReplicas) || needUpdate + needUpdate = SetIfChanged(&pdg.Status.UpdatedReplicas, updateReplicas) || needUpdate + needUpdate = SetIfChanged(&pdg.Status.CurrentReplicas, currentReplicas) || needUpdate + needUpdate = SetIfChanged(&pdg.Status.UpdateRevision, state.UpdateRevision) || needUpdate + needUpdate = SetIfChanged(&pdg.Status.CurrentRevision, state.CurrentRevision) || needUpdate + needUpdate = NewAndSetIfChanged(&pdg.Status.CollisionCount, state.CollisionCount) || needUpdate + + if needUpdate { + if err := c.Status().Update(ctx, pdg); err != nil { + return task.Fail().With("cannot update status: %w", err) + } + } + + if !state.IsBootstrapped { + return task.Wait().With("pd group may not be available, wait") + } + + return task.Complete().With("status is synced") }) +} - // Update the current revision if all instances are synced. - if int(rtx.PDGroup.GetDesiredReplicas()) == len(rtx.Peers) && v1alpha1.AllInstancesSynced(rtx.Peers, rtx.UpdateRevision) { - conditionChanged = true - rtx.CurrentRevision = rtx.UpdateRevision - rtx.PDGroup.Status.Version = rtx.PDGroup.Spec.Version - } - var readyReplicas int32 - for _, peer := range rtx.Peers { +// calcReplicas returns the total, ready, updated, and current replica counts computed from the given PDs. +func calcReplicas(pds []*v1alpha1.PD, currentRevision, updateRevision string) ( + replicas, + readyReplicas, + updateReplicas, + currentReplicas int32, +) { + for _, peer := range pds { + replicas++ if peer.IsHealthy() { readyReplicas++ } + if peer.CurrentRevision() == currentRevision { + currentReplicas++ + } + if peer.CurrentRevision() == updateRevision { + updateReplicas++ + } } - if conditionChanged || rtx.PDGroup.Status.ReadyReplicas != readyReplicas || - rtx.PDGroup.Status.Replicas != int32(len(rtx.Peers)) || //nolint:gosec // expected type conversion - !v1alpha1.IsReconciled(rtx.PDGroup) || - v1alpha1.StatusChanged(rtx.PDGroup, rtx.CommonStatus) { - rtx.PDGroup.Status.ReadyReplicas = readyReplicas - rtx.PDGroup.Status.Replicas = int32(len(rtx.Peers)) //nolint:gosec // expected type conversion - rtx.PDGroup.Status.ObservedGeneration = rtx.PDGroup.Generation - rtx.PDGroup.Status.CurrentRevision = rtx.CurrentRevision - rtx.PDGroup.Status.UpdateRevision = rtx.UpdateRevision - rtx.PDGroup.Status.CollisionCount = rtx.CollisionCount - - if err := t.Client.Status().Update(ctx, rtx.PDGroup); err != nil { - return task.Fail().With("cannot update status: %w", err) + return } + +// NewAndSetIfChanged allocates *dst when it is nil and then delegates to SetIfChanged; a nil *dst is considered equal to a zero-valued src. +func NewAndSetIfChanged[T comparable](dst **T, src T) bool { + if *dst == nil { + zero := new(T) + if *zero == src { + return false } + *dst = zero } + return SetIfChanged(*dst, src) } - if !rtx.IsAvailable && !rtx.Suspended { - return task.Fail().With("pd group may not be available, requeue to retry") +// SetIfChanged sets *dst to src and reports whether the value actually changed. +func SetIfChanged[T comparable](dst *T, src T) bool { + if *dst != src { + *dst = src + return true } - return task.Complete().With("status is synced") + return false } diff --git
a/pkg/controllers/pdgroup/tasks/status_test.go b/pkg/controllers/pdgroup/tasks/status_test.go new file mode 100644 index 0000000000..dc9a9125ae --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/status_test.go @@ -0,0 +1,475 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestTaskStatusSuspend(t *testing.T) { + cases := []struct { + desc string + state State + unexpectedErr bool + + expectedStatus task.Status + expectedObj *v1alpha1.PDGroup + }{ + { + desc: "no pds", + state: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.ObservedGeneration = 3 + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionTrue, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is suspended", + }) + return obj + }), + }, + { + desc: "all pds are suspended", + state: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondSuspended, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.ObservedGeneration = 3 + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionTrue, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is suspended", + }) + return obj + }), + }, + { + desc: "one pd is not suspended", + state: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondSuspended, + Status: metav1.ConditionFalse, + }) + return obj + }), + }, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.ObservedGeneration = 3 + obj.Status.Conditions = append(obj.Status.Conditions, 
metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is suspending", + }) + return obj + }), + }, + { + desc: "all pds are suspended but cannot call api", + state: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondSuspended, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "all pds are suspended and pdg is up to date and cannot call api", + state: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.ObservedGeneration = 3 + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionTrue, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is suspended", + }) + return obj + }), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondSuspended, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.state.PDGroup()) + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + ctx := context.Background() + res, done := task.RunTask(ctx, TaskStatusSuspend(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + pdg := &v1alpha1.PDGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, pdg), c.desc) + for i := range pdg.Status.Conditions { + cond := &pdg.Status.Conditions[i] + cond.LastTransitionTime = metav1.Time{} + } + assert.Equal(tt, c.expectedObj, pdg, c.desc) + }) + } +} + +func TestTaskStatus(t *testing.T) { + cases := []struct { + desc string + state *ReconcileContext + unexpectedErr bool + + expectedStatus task.Status + expectedObj *v1alpha1.PDGroup + }{ + { + desc: "no pds", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + CollisionCount: 3, + IsBootstrapped: true, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 0 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 0 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = ptr.To[int32](3) + return obj + }), + }, + { + desc: "no pds and not 
bootstrapped", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + }, + + expectedStatus: task.SWait, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 0 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 0 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "all pds are outdated and healthy", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.CurrentRevision = oldRevision + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondHealth, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + IsBootstrapped: true, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 1 + obj.Status.UpdatedReplicas = 0 + obj.Status.CurrentReplicas = 1 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = oldRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "all pds are updated and healthy", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.CurrentRevision = newRevision + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondHealth, + Status: metav1.ConditionTrue, + }) + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + IsBootstrapped: true, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 1 + obj.Status.UpdatedReplicas = 1 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "all pds are updated but not healthy", + state: &ReconcileContext{ + State: &state{ + 
pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.CurrentRevision = newRevision + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + IsBootstrapped: true, + }, + + expectedStatus: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 1 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + }, + { + desc: "status changed but cannot call api", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3)), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.CurrentRevision = newRevision + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + IsBootstrapped: true, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "status is not changed and cannot call api", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", fake.SetGeneration[v1alpha1.PDGroup](3), func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDGroupCondSuspended, + Status: metav1.ConditionFalse, + ObservedGeneration: 3, + Reason: v1alpha1.PDGroupSuspendReason, + Message: "pd group is not suspended", + }) + obj.Status.ObservedGeneration = 3 + obj.Status.Replicas = 1 + obj.Status.ReadyReplicas = 0 + obj.Status.UpdatedReplicas = 1 + obj.Status.CurrentReplicas = 0 + obj.Status.UpdateRevision = newRevision + obj.Status.CurrentRevision = newRevision + obj.Status.CollisionCount = nil + return obj + }), + pds: []*v1alpha1.PD{ + fake.FakeObj("aaa", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.CurrentRevision = newRevision + return obj + }), + }, + }, + UpdateRevision: newRevision, + CurrentRevision: oldRevision, + IsBootstrapped: true, + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.state.PDGroup()) + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + ctx := context.Background() + res, done := task.RunTask(ctx, TaskStatus(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + // no need to check update result + if c.unexpectedErr { + return + } + + pdg := &v1alpha1.PDGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKey{Name: "aaa"}, pdg), c.desc) + for i := range pdg.Status.Conditions { + cond := &pdg.Status.Conditions[i] + cond.LastTransitionTime = metav1.Time{} + } + assert.Equal(tt, c.expectedObj, pdg, c.desc) + }) + } +} diff --git a/pkg/controllers/pdgroup/tasks/svc.go b/pkg/controllers/pdgroup/tasks/svc.go index 20f29640c3..9b9313daab 100644 --- 
a/pkg/controllers/pdgroup/tasks/svc.go +++ b/pkg/controllers/pdgroup/tasks/svc.go @@ -15,54 +15,35 @@ package tasks import ( + "context" "fmt" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/controllers/common" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type TaskService struct { - Logger logr.Logger - Client client.Client -} - -func NewTaskService(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskService{ - Logger: logger, - Client: c, - } -} - -func (*TaskService) Name() string { - return "Service" -} +func TaskService(state common.PDGroupState, c client.Client) task.Task { + return task.NameTaskFunc("Service", func(ctx context.Context) task.Result { + pdg := state.PDGroup() -func (t *TaskService) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() + headless := newHeadlessService(pdg) + if err := c.Apply(ctx, headless); err != nil { + return task.Fail().With(fmt.Sprintf("can't create headless service of pd: %v", err)) + } - if rtx.Cluster.ShouldSuspendCompute() { - return task.Complete().With("skip service for suspension") - } - - pdg := rtx.PDGroup - - headless := newHeadlessService(pdg) - if err := t.Client.Apply(ctx, headless); err != nil { - return task.Fail().With(fmt.Sprintf("can't create headless service of pd: %v", err)) - } - - svc := newInternalService(pdg) - if err := t.Client.Apply(ctx, svc); err != nil { - return task.Fail().With(fmt.Sprintf("can't create internal service of pd: %v", err)) - } + svc := newInternalService(pdg) + if err := c.Apply(ctx, svc); err != nil { + return task.Fail().With(fmt.Sprintf("can't create internal service of pd: %v", err)) + } - return task.Complete().With("services of pd have been applied") + return task.Complete().With("services of pd have been applied") + }) } func newHeadlessService(pdg *v1alpha1.PDGroup) *corev1.Service { diff --git a/pkg/controllers/pdgroup/tasks/svc_test.go b/pkg/controllers/pdgroup/tasks/svc_test.go new file mode 100644 index 0000000000..f97fd3ce0a --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/svc_test.go @@ -0,0 +1,135 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
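+// The cases below drive TaskService against a fake client, injecting patch errors to cover both failed applies and no-op applies of already existing services.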
+ +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestTaskService(t *testing.T) { + cases := []struct { + desc string + state State + objs []client.Object + unexpectedErr bool + + expectedStatus task.Status + }{ + { + desc: "no svc exists", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + + expectedStatus: task.SComplete, + }, + { + desc: "headless svc already exists", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + objs: []client.Object{ + fake.FakeObj[corev1.Service]("aaa-pd-peer"), + }, + + expectedStatus: task.SComplete, + }, + { + desc: "internal svc already exists", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + objs: []client.Object{ + fake.FakeObj[corev1.Service]("aaa-pd"), + }, + + expectedStatus: task.SComplete, + }, + { + desc: "apply headless svc with unexpected err", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "apply internal svc with unexpected err", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + objs: []client.Object{ + newHeadlessService(fake.FakeObj[v1alpha1.PDGroup]("aaa")), + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + }, + { + desc: "all svcs are updated with unexpected err", + state: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + }, + objs: []client.Object{ + newHeadlessService(fake.FakeObj[v1alpha1.PDGroup]("aaa")), + newInternalService(fake.FakeObj[v1alpha1.PDGroup]("aaa")), + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + ctx := context.Background() + fc := client.NewFakeClient(c.state.PDGroup()) + for _, obj := range c.objs { + require.NoError(tt, fc.Apply(ctx, obj), c.desc) + } + + if c.unexpectedErr { + // cannot update svc + fc.WithError("patch", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + res, done := task.RunTask(ctx, TaskService(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + if !c.unexpectedErr { + svcs := corev1.ServiceList{} + require.NoError(tt, fc.List(ctx, &svcs), c.desc) + assert.Len(tt, svcs.Items, 2, c.desc) + } + }) + } +} diff --git a/pkg/controllers/pdgroup/tasks/updater.go b/pkg/controllers/pdgroup/tasks/updater.go index 68fc7eab9c..e7483f6f26 100644 --- a/pkg/controllers/pdgroup/tasks/updater.go +++ b/pkg/controllers/pdgroup/tasks/updater.go @@ -15,14 +15,19 @@ package tasks import ( + "context" "fmt" "strconv" + "time" - "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + + "github.com/go-logr/logr" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/action" "github.com/pingcap/tidb-operator/pkg/client" "github.com/pingcap/tidb-operator/pkg/runtime" "github.com/pingcap/tidb-operator/pkg/updater" @@ -30,135 +35,119 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/k8s/revision" maputil
"github.com/pingcap/tidb-operator/pkg/utils/map" "github.com/pingcap/tidb-operator/pkg/utils/random" - "github.com/pingcap/tidb-operator/pkg/utils/task" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" "github.com/pingcap/tidb-operator/third_party/kubernetes/pkg/controller/history" ) +const ( + defaultUpdateWaitTime = time.Second * 30 +) + // TaskUpdater is a task to scale or update PD when spec of PDGroup is changed. -type TaskUpdater struct { - Logger logr.Logger - Client client.Client - CRCli history.Interface -} +func TaskUpdater(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("Updater", func(ctx context.Context) task.Result { + logger := logr.FromContextOrDiscard(ctx) + historyCli := history.NewClient(c) + pdg := state.PDGroup() + + selector := labels.SelectorFromSet(labels.Set{ + // TODO(liubo02): add label of managed by operator ? + v1alpha1.LabelKeyCluster: pdg.Spec.Cluster.Name, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, + v1alpha1.LabelKeyGroup: pdg.Name, + }) -func NewTaskUpdater(logger logr.Logger, c client.Client) task.Task[ReconcileContext] { - return &TaskUpdater{ - Logger: logger, - Client: c, - CRCli: history.NewClient(c), - } -} + revisions, err := historyCli.ListControllerRevisions(pdg, selector) + if err != nil { + return task.Fail().With("cannot list controller revisions: %w", err) + } + history.SortControllerRevisions(revisions) + + // Get the current(old) and update(new) ControllerRevisions + currentRevision, updateRevision, collisionCount, err := func() (*appsv1.ControllerRevision, *appsv1.ControllerRevision, int32, error) { + // always ignore bootstrapped field in spec + bootstrapped := pdg.Spec.Bootstrapped + pdg.Spec.Bootstrapped = false + defer func() { + pdg.Spec.Bootstrapped = bootstrapped + }() + return revision.GetCurrentAndUpdate(pdg, revisions, historyCli, pdg) + }() + if err != nil { + return task.Fail().With("cannot get revisions: %w", err) + } + state.CurrentRevision = currentRevision.Name + state.UpdateRevision = updateRevision.Name + state.CollisionCount = collisionCount + + // TODO(liubo02): add a controller to do it + if err = revision.TruncateHistory(historyCli, state.PDSlice(), revisions, + currentRevision, updateRevision, state.Cluster().Spec.RevisionHistoryLimit); err != nil { + logger.Error(err, "failed to truncate history") + } -func (*TaskUpdater) Name() string { - return "Updater" -} + checker := action.NewUpgradeChecker(c, state.Cluster(), logger) -func (t *TaskUpdater) Sync(ctx task.Context[ReconcileContext]) task.Result { - rtx := ctx.Self() + if needVersionUpgrade(pdg) && !checker.CanUpgrade(ctx, pdg) { + // TODO(liubo02): change to Wait + return task.Retry(defaultUpdateWaitTime).With("wait until preconditions of upgrading is met") + } - // TODO: move to task v2 - if !rtx.PDGroup.GetDeletionTimestamp().IsZero() { - return task.Complete().With("pd group has been deleted") - } + desired := 1 + if pdg.Spec.Replicas != nil { + desired = int(*pdg.Spec.Replicas) + } - if rtx.Cluster.ShouldSuspendCompute() { - return task.Complete().With("skip updating PDGroup for suspension") - } + var topos []v1alpha1.ScheduleTopology + for _, p := range pdg.Spec.SchedulePolicies { + switch p.Type { + case v1alpha1.SchedulePolicyTypeEvenlySpread: + topos = p.EvenlySpread.Topologies + default: + // do nothing + } + } - // List all controller revisions for the PDGroup - selector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: map[string]string{ - v1alpha1.LabelKeyCluster: 
rtx.Cluster.Name, - v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, - v1alpha1.LabelKeyGroup: rtx.PDGroup.Name, - }, - }) - revisions, err := t.CRCli.ListControllerRevisions(rtx.PDGroup, selector) - if err != nil { - return task.Fail().With("cannot list controller revisions: %w", err) - } - history.SortControllerRevisions(revisions) - - // Get the current(old) and update(new) ControllerRevisions - currentRevision, updateRevision, collisionCount, err := func() (*appsv1.ControllerRevision, *appsv1.ControllerRevision, int32, error) { - // always ignore bootstrapped field in spec - bootstrapped := rtx.PDGroup.Spec.Bootstrapped - rtx.PDGroup.Spec.Bootstrapped = false - defer func() { - rtx.PDGroup.Spec.Bootstrapped = bootstrapped - }() - return revision.GetCurrentAndUpdate(rtx.PDGroup, revisions, t.CRCli, rtx.PDGroup) - }() - if err != nil { - return task.Fail().With("cannot get revisions: %w", err) - } - rtx.CurrentRevision = currentRevision.Name - rtx.UpdateRevision = updateRevision.Name - rtx.CollisionCount = &collisionCount - - if err = revision.TruncateHistory(t.CRCli, rtx.Peers, revisions, - currentRevision, updateRevision, rtx.Cluster.Spec.RevisionHistoryLimit); err != nil { - t.Logger.Error(err, "failed to truncate history") - } - - if needVersionUpgrade(rtx.PDGroup) && !rtx.UpgradeChecker.CanUpgrade(ctx, rtx.PDGroup) { - return task.Fail().Continue().With("preconditions of upgrading the pd group %s/%s are not met", rtx.PDGroup.Namespace, rtx.PDGroup.Name) - } - - desired := 1 - if rtx.PDGroup.Spec.Replicas != nil { - desired = int(*rtx.PDGroup.Spec.Replicas) - } - - var topos []v1alpha1.ScheduleTopology - for _, p := range rtx.PDGroup.Spec.SchedulePolicies { - switch p.Type { - case v1alpha1.SchedulePolicyTypeEvenlySpread: - topos = p.EvenlySpread.Topologies - default: - // do nothing + topoPolicy, err := policy.NewTopologyPolicy[*runtime.PD](topos) + if err != nil { + return task.Fail().With("invalid topo policy, it should be validated: %w", err) } - } - - topoPolicy, err := policy.NewTopologyPolicy[*runtime.PD](topos) - if err != nil { - return task.Fail().With("invalid topo policy, it should be validated: %w", err) - } - - for _, pd := range rtx.Peers { - topoPolicy.Add(runtime.FromPD(pd)) - } - - wait, err := updater.New[*runtime.PD](). - WithInstances(runtime.FromPDSlice(rtx.Peers)...). - WithDesired(desired). - WithClient(t.Client). - WithMaxSurge(0). - WithMaxUnavailable(1). - WithRevision(rtx.UpdateRevision). - WithNewFactory(PDNewer(rtx.PDGroup, rtx.UpdateRevision)). - WithAddHooks(topoPolicy). - WithUpdateHooks( - policy.KeepName[*runtime.PD](), - policy.KeepTopology[*runtime.PD](), - ). - WithDelHooks(topoPolicy). - WithScaleInPreferPolicy( - NotLeaderPolicy(), - topoPolicy, - ). - WithUpdatePreferPolicy( - NotLeaderPolicy(), - ). - Build(). - Do(ctx) - if err != nil { - return task.Fail().With("cannot update instances: %w", err) - } - if wait { - return task.Complete().With("wait for all instances ready") - } - return task.Complete().With("all instances are synced") + + for _, pd := range state.PDSlice() { + topoPolicy.Add(runtime.FromPD(pd)) + } + + wait, err := updater.New[*runtime.PD](). + WithInstances(runtime.FromPDSlice(state.PDSlice())...). + WithDesired(desired). + WithClient(c). + WithMaxSurge(0). + WithMaxUnavailable(1). + WithRevision(state.UpdateRevision). + WithNewFactory(PDNewer(pdg, state.UpdateRevision)). + WithAddHooks(topoPolicy). + WithUpdateHooks( + policy.KeepName[*runtime.PD](), + policy.KeepTopology[*runtime.PD](), + ). 
+ WithDelHooks(topoPolicy). + WithScaleInPreferPolicy( + NotLeaderPolicy(), + topoPolicy, + ). + WithUpdatePreferPolicy( + NotLeaderPolicy(), + ). + Build(). + Do(ctx) + if err != nil { + return task.Fail().With("cannot update instances: %w", err) + } + if wait { + return task.Wait().With("wait for all instances ready") + } + return task.Complete().With("all instances are synced") + }) } func needVersionUpgrade(pdg *v1alpha1.PDGroup) bool { diff --git a/pkg/controllers/pdgroup/tasks/updater_test.go b/pkg/controllers/pdgroup/tasks/updater_test.go new file mode 100644 index 0000000000..4cd9ba7017 --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/updater_test.go @@ -0,0 +1,321 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/runtime" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +const ( + oldRevision = "aaa-c9f48df69" + newRevision = "aaa-6cd5c46fb8" +) + +func TestTaskUpdater(t *testing.T) { + cases := []struct { + desc string + state *ReconcileContext + unexpectedErr bool + + expectedStatus task.Status + expectedUpdateRevision string + expectedCurrentRevision string + expectedPDNum int + }{ + { + desc: "no pds with 1 replica", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: oldRevision, + expectedCurrentRevision: oldRevision, + expectedPDNum: 1, + }, + { + desc: "ignore bootstrapped field", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Bootstrapped = true + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: oldRevision, + expectedCurrentRevision: oldRevision, + expectedPDNum: 1, + }, + { + desc: "version upgrade check", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + // use a wrong version to trigger version check + // TODO(liubo02): this doesn't actually happen;
maybe remove the whole check + obj.Spec.Version = "xxx" + obj.Status.Version = "yyy" + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SRetry, + expectedUpdateRevision: "aaa-6d4b685647", + expectedCurrentRevision: "aaa-6d4b685647", + }, + { + desc: "1 updated pd with 1 replica", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj[v1alpha1.PDGroup]("aaa"), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + pds: []*v1alpha1.PD{ + fakeAvailablePD("aaa-xxx", fake.FakeObj[v1alpha1.PDGroup]("aaa"), oldRevision), + }, + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: oldRevision, + expectedCurrentRevision: oldRevision, + expectedPDNum: 1, + }, + { + desc: "no pds with 2 replicas", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedPDNum: 2, + }, + { + desc: "no pds with 2 replicas and call api failed", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + }, + { + desc: "1 outdated pd with 2 replicas", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + pds: []*v1alpha1.PD{ + fakeAvailablePD("aaa-xxx", fake.FakeObj[v1alpha1.PDGroup]("aaa"), oldRevision), + }, + }, + }, + + expectedStatus: task.SWait, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedPDNum: 2, + }, + { + desc: "1 outdated pd with 2 replicas but cannot call api, will fail", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + pds: []*v1alpha1.PD{ + fakeAvailablePD("aaa-xxx", fake.FakeObj[v1alpha1.PDGroup]("aaa"), oldRevision), + }, + }, + }, + unexpectedErr: true, + + expectedStatus: task.SFail, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + }, + { + desc: "2 updated pds with 2 replicas", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas = ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + pds: []*v1alpha1.PD{ + fakeAvailablePD("aaa-xxx", fake.FakeObj[v1alpha1.PDGroup]("aaa"), newRevision), + fakeAvailablePD("aaa-yyy", fake.FakeObj[v1alpha1.PDGroup]("aaa"), newRevision), + }, + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedPDNum: 2, + }, + { + desc: "2 updated pds with 2 replicas and cannot call api, can complete", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas =
ptr.To[int32](2) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + pds: []*v1alpha1.PD{ + fakeAvailablePD("aaa-xxx", fake.FakeObj[v1alpha1.PDGroup]("aaa"), newRevision), + fakeAvailablePD("aaa-yyy", fake.FakeObj[v1alpha1.PDGroup]("aaa"), newRevision), + }, + }, + }, + unexpectedErr: true, + + expectedStatus: task.SComplete, + expectedUpdateRevision: newRevision, + expectedCurrentRevision: newRevision, + expectedPDNum: 2, + }, + { + // NOTE: it does not really check whether the policy works + // That should be tested in the /pkg/updater and /pkg/updater/policy packages + desc: "topology evenly spread", + state: &ReconcileContext{ + State: &state{ + pdg: fake.FakeObj("aaa", func(obj *v1alpha1.PDGroup) *v1alpha1.PDGroup { + obj.Spec.Replicas = ptr.To[int32](3) + obj.Spec.SchedulePolicies = append(obj.Spec.SchedulePolicies, v1alpha1.SchedulePolicy{ + Type: v1alpha1.SchedulePolicyTypeEvenlySpread, + EvenlySpread: &v1alpha1.SchedulePolicyEvenlySpread{ + Topologies: []v1alpha1.ScheduleTopology{ + { + Topology: v1alpha1.Topology{ + "zone": "us-west-1a", + }, + }, + { + Topology: v1alpha1.Topology{ + "zone": "us-west-1b", + }, + }, + { + Topology: v1alpha1.Topology{ + "zone": "us-west-1c", + }, + }, + }, + }, + }) + return obj + }), + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + }, + }, + + expectedStatus: task.SComplete, + expectedUpdateRevision: "aaa-5cd4fb5cb4", + expectedCurrentRevision: "aaa-5cd4fb5cb4", + expectedPDNum: 3, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + ctx := context.Background() + fc := client.NewFakeClient(c.state.PDGroup(), c.state.Cluster()) + for _, obj := range c.state.PDSlice() { + require.NoError(tt, fc.Apply(ctx, obj), c.desc) + } + + if c.unexpectedErr { + // cannot create or update pd instance + fc.WithError("patch", "pds", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + res, done := task.RunTask(ctx, TaskUpdater(c.state, fc)) + assert.Equal(tt, c.expectedStatus, res.Status(), c.desc) + assert.False(tt, done, c.desc) + + assert.Equal(tt, c.expectedUpdateRevision, c.state.UpdateRevision, c.desc) + assert.Equal(tt, c.expectedCurrentRevision, c.state.CurrentRevision, c.desc) + if !c.unexpectedErr { + pds := v1alpha1.PDList{} + require.NoError(tt, fc.List(ctx, &pds), c.desc) + assert.Len(tt, pds.Items, c.expectedPDNum, c.desc) + } + }) + } +} + +func fakeAvailablePD(name string, pdg *v1alpha1.PDGroup, rev string) *v1alpha1.PD { + return fake.FakeObj(name, func(obj *v1alpha1.PD) *v1alpha1.PD { + pd := runtime.ToPD(PDNewer(pdg, rev).New()) + pd.Name = "" + pd.Status.Conditions = append(pd.Status.Conditions, metav1.Condition{ + Type: v1alpha1.PDCondHealth, + Status: metav1.ConditionTrue, + }) + pd.Status.CurrentRevision = rev + pd.DeepCopyInto(obj) + return obj + }) +} diff --git a/pkg/controllers/pdgroup/tasks/util.go b/pkg/controllers/pdgroup/tasks/util.go index 6d946f7f3c..4c08c1fe00 100644 --- a/pkg/controllers/pdgroup/tasks/util.go +++ b/pkg/controllers/pdgroup/tasks/util.go @@ -29,7 +29,7 @@ func HeadlessServiceName(groupName string) string { func NotLeaderPolicy() updater.PreferPolicy[*runtime.PD] { return updater.PreferPolicyFunc[*runtime.PD](func(pds []*runtime.PD) []*runtime.PD { - notLeader := []*runtime.PD{} + var notLeader []*runtime.PD for _, pd := range pds { if !pd.Status.IsLeader { notLeader = append(notLeader, pd) diff --git a/pkg/controllers/pdgroup/tasks/util_test.go b/pkg/controllers/pdgroup/tasks/util_test.go new file mode 100644 index
0000000000..ddf6aa2b5a --- /dev/null +++ b/pkg/controllers/pdgroup/tasks/util_test.go @@ -0,0 +1,99 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/runtime" + "github.com/pingcap/tidb-operator/pkg/utils/fake" +) + +func TestHeadlessServiceName(t *testing.T) { + cases := []struct { + desc string + groupName string + expectedName string + }{ + { + desc: "normal", + groupName: "xxx", + expectedName: "xxx-pd-peer", + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + assert.Equal(tt, c.expectedName, HeadlessServiceName(c.groupName), c.desc) + }) + } +} + +func TestNotLeaderPolicy(t *testing.T) { + cases := []struct { + desc string + pds []*v1alpha1.PD + expected []*v1alpha1.PD + }{ + { + desc: "none", + }, + { + desc: "no leader", + pds: []*v1alpha1.PD{ + fake.FakeObj[v1alpha1.PD]("aaa"), + fake.FakeObj[v1alpha1.PD]("bbb"), + fake.FakeObj[v1alpha1.PD]("ccc"), + }, + expected: []*v1alpha1.PD{ + fake.FakeObj[v1alpha1.PD]("aaa"), + fake.FakeObj[v1alpha1.PD]("bbb"), + fake.FakeObj[v1alpha1.PD]("ccc"), + }, + }, + { + desc: "filter leader", + pds: []*v1alpha1.PD{ + fake.FakeObj[v1alpha1.PD]("aaa"), + fake.FakeObj("bbb", func(obj *v1alpha1.PD) *v1alpha1.PD { + obj.Status.IsLeader = true + return obj + }), + fake.FakeObj[v1alpha1.PD]("ccc"), + }, + expected: []*v1alpha1.PD{ + fake.FakeObj[v1alpha1.PD]("aaa"), + fake.FakeObj[v1alpha1.PD]("ccc"), + }, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + policy := NotLeaderPolicy() + prefer := policy.Prefer(runtime.FromPDSlice(c.pds)) + assert.Equal(tt, c.expected, runtime.ToPDSlice(prefer), c.desc) + }) + } +} diff --git a/pkg/updater/builder.go b/pkg/updater/builder.go index e366e012b9..1244a3be18 100644 --- a/pkg/updater/builder.go +++ b/pkg/updater/builder.go @@ -56,6 +56,7 @@ type builder[PT runtime.Instance] struct { func (b *builder[PT]) Build() Executor { update, outdated := split(b.instances, b.rev) + updatePolicies := b.updatePreferPolicies updatePolicies = append(updatePolicies, PreferUnavailable[PT]()) actor := &actor[PT]{ diff --git a/pkg/utils/k8s/deletion.go b/pkg/utils/k8s/deletion.go index 32d3631857..503bc90b94 100644 --- a/pkg/utils/k8s/deletion.go +++ b/pkg/utils/k8s/deletion.go @@ -127,13 +127,47 @@ func DeleteInstanceSubresource[T runtime.Instance]( objs client.ObjectList, opts ...client.DeleteOption, ) (wait bool, _ error) { - if err := c.List(ctx, objs, client.InNamespace(instance.GetNamespace()), client.MatchingLabels{ + wait, err := deleteSubresource(ctx, c, instance.GetNamespace(), objs, map[string]string{ v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, v1alpha1.LabelKeyInstance: instance.GetName(), v1alpha1.LabelKeyCluster: instance.Cluster(), v1alpha1.LabelKeyComponent: 
instance.Component(), - }); err != nil { - return false, fmt.Errorf("failed to list %T for instance %s/%s: %w", objs, instance.GetNamespace(), instance.GetName(), err) + }, opts...) + if err != nil { + return false, fmt.Errorf("cannot delete sub resource for instance %s/%s: %w", instance.GetNamespace(), instance.GetName(), err) + } + return wait, nil +} + +func DeleteGroupSubresource[T runtime.Group]( + ctx context.Context, + c client.Client, + group T, + objs client.ObjectList, + opts ...client.DeleteOption, +) (wait bool, _ error) { + wait, err := deleteSubresource(ctx, c, group.GetNamespace(), objs, map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyCluster: group.Cluster(), + v1alpha1.LabelKeyComponent: group.Component(), + v1alpha1.LabelKeyGroup: group.GetName(), + }, opts...) + if err != nil { + return false, fmt.Errorf("cannot delete sub resource for group %s/%s: %w", group.GetNamespace(), group.GetName(), err) + } + return wait, nil +} + +func deleteSubresource( + ctx context.Context, + c client.Client, + ns string, + objs client.ObjectList, + labels map[string]string, + opts ...client.DeleteOption, +) (wait bool, _ error) { + if err := c.List(ctx, objs, client.InNamespace(ns), client.MatchingLabels(labels)); err != nil { + return false, fmt.Errorf("failed to list %T in %s: %w", objs, ns, err) } if meta.LenList(objs) == 0 { @@ -142,12 +176,12 @@ func DeleteInstanceSubresource[T runtime.Instance]( items, err := meta.ExtractList(objs) if err != nil { - return false, fmt.Errorf("failed to extract %T for instance %s/%s: %w", objs, instance.GetNamespace(), instance.GetName(), err) + return false, fmt.Errorf("failed to extract %T: %w", objs, err) } for _, item := range items { obj, ok := item.(client.Object) if !ok { - return false, fmt.Errorf("unexpected %T for instance %s/%s", item, instance.GetNamespace(), instance.GetName()) + return false, fmt.Errorf("unexpected %T", item) } if !obj.GetDeletionTimestamp().IsZero() { wait = true @@ -155,11 +189,9 @@ func DeleteInstanceSubresource[T runtime.Instance]( } if err := c.Delete(ctx, obj, opts...); err != nil { if !errors.IsNotFound(err) { - return false, fmt.Errorf("failed to delete sub resource %s/%s of instance %s/%s: %w", + return false, fmt.Errorf("failed to delete sub resource %s/%s: %w", obj.GetNamespace(), obj.GetName(), - instance.GetNamespace(), - instance.GetName(), err, ) } diff --git a/tests/e2e/pd/pd.go b/tests/e2e/pd/pd.go index f4b562776b..abaac8a436 100644 --- a/tests/e2e/pd/pd.go +++ b/tests/e2e/pd/pd.go @@ -120,7 +120,7 @@ var _ = ginkgo.Describe("PD", label.PD, func() { }() changeTime := time.Now() - ginkgo.By("Change replica of the PDGroup") + ginkgo.By("Change config of the PDGroup") f.Must(f.Client.Patch(ctx, pdg, patch)) f.Must(waiter.WaitForPodsRecreated(ctx, f.Client, runtime.FromPDGroup(pdg), changeTime, waiter.LongTaskTimeout)) f.WaitForPDGroupReady(ctx, pdg)