From 9474fd4afdf4166b2fac2861a0c1e6488d4f7b80 Mon Sep 17 00:00:00 2001 From: liubo02 Date: Tue, 24 Dec 2024 16:50:15 +0800 Subject: [PATCH 1/4] feat(common): add ut for common tasks and refactor task utils Signed-off-by: liubo02 --- pkg/client/alias.go | 29 +- pkg/client/fake.go | 70 ++-- pkg/controllers/common/cond.go | 16 +- pkg/controllers/common/cond_test.go | 148 +++++++++ pkg/controllers/common/interfaces.go | 59 ++++ pkg/controllers/common/interfaces_test.go | 89 +++++ pkg/controllers/common/resource.go | 165 ++++++++++ pkg/controllers/common/resource_test.go | 115 +++++++ pkg/controllers/common/task.go | 159 ++++----- pkg/controllers/common/task_test.go | 383 ++++++++++++++++++++++ pkg/controllers/pd/builder.go | 40 +-- pkg/controllers/pd/controller.go | 6 +- pkg/controllers/pd/tasks/cm.go | 16 +- pkg/controllers/pd/tasks/ctx.go | 105 +----- pkg/controllers/pd/tasks/finalizer.go | 26 +- pkg/controllers/pd/tasks/pod.go | 32 +- pkg/controllers/pd/tasks/pvc.go | 8 +- pkg/controllers/pd/tasks/state.go | 97 ++++++ pkg/controllers/pd/tasks/status.go | 65 ++-- pkg/utils/fake/fake.go | 10 +- pkg/utils/task/v3/runner.go | 7 +- pkg/utils/task/v3/runner_test.go | 22 +- pkg/utils/task/v3/task.go | 34 +- pkg/utils/task/v3/task_test.go | 51 +-- 24 files changed, 1377 insertions(+), 375 deletions(-) create mode 100644 pkg/controllers/common/cond_test.go create mode 100644 pkg/controllers/common/interfaces.go create mode 100644 pkg/controllers/common/interfaces_test.go create mode 100644 pkg/controllers/common/resource.go create mode 100644 pkg/controllers/common/resource_test.go create mode 100644 pkg/controllers/common/task_test.go create mode 100644 pkg/controllers/pd/tasks/state.go diff --git a/pkg/client/alias.go b/pkg/client/alias.go index 08b4a1853d..9754410579 100644 --- a/pkg/client/alias.go +++ b/pkg/client/alias.go @@ -18,17 +18,24 @@ import "sigs.k8s.io/controller-runtime/pkg/client" // Add alias of client.XXX to avoid import // two client pkgs -type Object = client.Object -type ObjectList = client.ObjectList -type ObjectKey = client.ObjectKey - -type Options = client.Options -type DeleteOption = client.DeleteOption - -type MatchingLabels = client.MatchingLabels -type MatchingFields = client.MatchingFields -type InNamespace = client.InNamespace -type ListOptions = client.ListOptions +type ( + Object = client.Object + ObjectList = client.ObjectList + ObjectKey = client.ObjectKey +) + +type ( + Options = client.Options + DeleteOption = client.DeleteOption + ListOption = client.ListOption +) + +type ( + MatchingLabels = client.MatchingLabels + MatchingFields = client.MatchingFields + InNamespace = client.InNamespace + ListOptions = client.ListOptions +) type PropagationPolicy = client.PropagationPolicy diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 900319fc31..306ac06f6f 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -49,15 +49,34 @@ func (*fakeParser) Type(schema.GroupVersionKind) *typed.ParseableType { return &typed.DeducedParseableType } -func NewFakeClient(objs ...client.Object) Client { - c := newFakeUnderlayClient(objs...) +type FakeClient interface { + Client + WithError(verb, resource string, err error) +} + +func NewFakeClient(objs ...client.Object) FakeClient { + fc := newFakeUnderlayClient(objs...) - return &applier{ - WithWatch: c, - parser: &fakeParser{}, + return &fakeClient{ + Client: &applier{ + WithWatch: fc, + parser: &fakeParser{}, + }, + fc: fc, } } +func (fc *fakeClient) WithError(verb, resource string, err error) { + fc.fc.PrependReactor(verb, resource, func(action testing.Action) (bool, runtime.Object, error) { + return true, nil, err + }) +} + +type fakeClient struct { + Client + fc *fakeUnderlayClient +} + type fakeUnderlayClient struct { testing.Fake tracker testing.ObjectTracker @@ -67,7 +86,7 @@ type fakeUnderlayClient struct { var _ client.WithWatch = &fakeUnderlayClient{} -func newFakeUnderlayClient(objs ...client.Object) client.WithWatch { +func newFakeUnderlayClient(objs ...client.Object) *fakeUnderlayClient { t := testing.NewObjectTracker(scheme.Scheme, scheme.Codecs.UniversalDecoder()) for _, obj := range objs { if err := t.Add(obj); err != nil { @@ -536,37 +555,34 @@ func (c *fakeUnderlayClient) PatchReactionFunc(action *testing.PatchActionImpl) switch action.GetPatchType() { case types.JSONPatchType: - patch, err2 := jsonpatch.DecodePatch(action.GetPatch()) - if err2 != nil { - return true, nil, err2 + patch, err := jsonpatch.DecodePatch(action.GetPatch()) + if err != nil { + return true, nil, err } - modified, err2 := patch.Apply(old) - if err2 != nil { - return true, nil, err2 + modified, err := patch.Apply(old) + if err != nil { + return true, nil, err } - //nolint:gocritic // use := shadow err - if err2 = json.Unmarshal(modified, obj); err2 != nil { - return true, nil, err2 + if err := json.Unmarshal(modified, obj); err != nil { + return true, nil, err } case types.MergePatchType: - modified, err2 := jsonpatch.MergePatch(old, action.GetPatch()) - if err2 != nil { - return true, nil, err2 + modified, err := jsonpatch.MergePatch(old, action.GetPatch()) + if err != nil { + return true, nil, err } - //nolint:gocritic // use := shadow err - if err2 = json.Unmarshal(modified, obj); err2 != nil { - return true, nil, err2 + if err := json.Unmarshal(modified, obj); err != nil { + return true, nil, err } case types.StrategicMergePatchType: - mergedByte, err2 := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj) - if err2 != nil { - return true, nil, err2 + mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj) + if err != nil { + return true, nil, err } - //nolint:gocritic // use := shadow err - if err2 = json.Unmarshal(mergedByte, obj); err2 != nil { - return true, nil, err2 + if err := json.Unmarshal(mergedByte, obj); err != nil { + return true, nil, err } case types.ApplyPatchType: patchObj := &unstructured.Unstructured{Object: map[string]any{}} diff --git a/pkg/controllers/common/cond.go b/pkg/controllers/common/cond.go index f397fa26e4..e9ad8e8e99 100644 --- a/pkg/controllers/common/cond.go +++ b/pkg/controllers/common/cond.go @@ -16,26 +16,26 @@ package common import "github.com/pingcap/tidb-operator/pkg/utils/task/v3" -func CondPDHasBeenDeleted(ctx PDGetter) task.Condition { +func CondPDHasBeenDeleted(ctx PDState) task.Condition { return task.CondFunc(func() bool { - return ctx.GetPD() == nil + return ctx.PD() == nil }) } -func CondPDIsDeleting(ctx PDGetter) task.Condition { +func CondPDIsDeleting(ctx PDState) task.Condition { return task.CondFunc(func() bool { - return !ctx.GetPD().GetDeletionTimestamp().IsZero() + return !ctx.PD().GetDeletionTimestamp().IsZero() }) } -func CondClusterIsSuspending(ctx ClusterGetter) task.Condition { +func CondClusterIsSuspending(ctx ClusterState) task.Condition { return task.CondFunc(func() bool { - return ctx.GetCluster().ShouldSuspendCompute() + return ctx.Cluster().ShouldSuspendCompute() }) } -func CondClusterIsPaused(ctx ClusterGetter) task.Condition { +func CondClusterIsPaused(ctx ClusterState) task.Condition { return task.CondFunc(func() bool { - return ctx.GetCluster().ShouldPauseReconcile() + return ctx.Cluster().ShouldPauseReconcile() }) } diff --git a/pkg/controllers/common/cond_test.go b/pkg/controllers/common/cond_test.go new file mode 100644 index 0000000000..2cb771c31a --- /dev/null +++ b/pkg/controllers/common/cond_test.go @@ -0,0 +1,148 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/utils/fake" +) + +func TestCondPDHasBeenDeleted(t *testing.T) { + cases := []struct { + desc string + state *fakeState[v1alpha1.PD] + expectedCond bool + }{ + { + desc: "cond is false", + state: &fakeState[v1alpha1.PD]{ + obj: fake.FakeObj[v1alpha1.PD]("test"), + }, + }, + { + desc: "cond is true", + state: &fakeState[v1alpha1.PD]{}, + expectedCond: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + s := &fakePDState{s: c.state} + cond := CondPDHasBeenDeleted(s) + assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc) + }) + } +} + +func TestCondPDIsDeleting(t *testing.T) { + cases := []struct { + desc string + state *fakeState[v1alpha1.PD] + expectedCond bool + }{ + { + desc: "cond is false", + state: &fakeState[v1alpha1.PD]{ + obj: fake.FakeObj[v1alpha1.PD]("test"), + }, + }, + { + desc: "cond is true", + state: &fakeState[v1alpha1.PD]{ + obj: fake.FakeObj("test", fake.DeleteNow[v1alpha1.PD]()), + }, + expectedCond: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + s := &fakePDState{s: c.state} + cond := CondPDIsDeleting(s) + assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc) + }) + } +} + +func TestCondClusterIsSuspending(t *testing.T) { + cases := []struct { + desc string + state *fakeState[v1alpha1.Cluster] + expectedCond bool + }{ + { + desc: "cond is false", + state: &fakeState[v1alpha1.Cluster]{ + obj: fake.FakeObj[v1alpha1.Cluster]("test"), + }, + }, + { + desc: "cond is true", + state: &fakeState[v1alpha1.Cluster]{ + obj: fake.FakeObj("test", func(obj *v1alpha1.Cluster) *v1alpha1.Cluster { + obj.Spec.SuspendAction = &v1alpha1.SuspendAction{ + SuspendCompute: true, + } + return obj + }), + }, + expectedCond: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + s := &fakeClusterState{s: c.state} + cond := CondClusterIsSuspending(s) + assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc) + }) + } +} + +func TestCondClusterIsPaused(t *testing.T) { + cases := []struct { + desc string + state *fakeState[v1alpha1.Cluster] + expectedCond bool + }{ + { + desc: "cond is false", + state: &fakeState[v1alpha1.Cluster]{ + obj: fake.FakeObj[v1alpha1.Cluster]("test"), + }, + }, + { + desc: "cond is true", + state: &fakeState[v1alpha1.Cluster]{ + obj: fake.FakeObj("test", func(obj *v1alpha1.Cluster) *v1alpha1.Cluster { + obj.Spec.Paused = true + return obj + }), + }, + expectedCond: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + s := &fakeClusterState{s: c.state} + cond := CondClusterIsPaused(s) + assert.Equal(tt, c.expectedCond, cond.Satisfy(), c.desc) + }) + } +} diff --git a/pkg/controllers/common/interfaces.go b/pkg/controllers/common/interfaces.go new file mode 100644 index 0000000000..c63e283252 --- /dev/null +++ b/pkg/controllers/common/interfaces.go @@ -0,0 +1,59 @@ +package common + +import ( + corev1 "k8s.io/api/core/v1" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" +) + +type Object[T any] interface { + client.Object + *T +} + +type ObjectList[T any] interface { + client.ObjectList + *T +} + +type ( + PDInitializer = ResourceInitializer[*v1alpha1.PD] + + ClusterInitializer = ResourceInitializer[*v1alpha1.Cluster] + + PodInitializer = ResourceInitializer[*corev1.Pod] + PDSliceInitializer = ResourceSliceInitializer[*v1alpha1.PD] +) + +type PDStateInitializer interface { + PDInitializer() PDInitializer +} + +type PDState interface { + PD() *v1alpha1.PD +} + +type ClusterStateInitializer interface { + ClusterInitializer() ClusterInitializer +} + +type ClusterState interface { + Cluster() *v1alpha1.Cluster +} + +type PodStateInitializer interface { + PodInitializer() PodInitializer +} + +type PodState interface { + Pod() *corev1.Pod +} + +type PDSliceStateInitializer interface { + PDSliceInitializer() PDSliceInitializer +} + +type PDSliceState interface { + PDSlice() []*v1alpha1.PD +} diff --git a/pkg/controllers/common/interfaces_test.go b/pkg/controllers/common/interfaces_test.go new file mode 100644 index 0000000000..f865e585f4 --- /dev/null +++ b/pkg/controllers/common/interfaces_test.go @@ -0,0 +1,89 @@ +package common + +import ( + corev1 "k8s.io/api/core/v1" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" +) + +type fakeState[T any] struct { + ns string + name string + obj *T +} + +func (f *fakeState[T]) Object() *T { + return f.obj +} + +func (f *fakeState[T]) Initializer() ResourceInitializer[*T] { + return NewResource(func(obj *T) { f.obj = obj }). + WithNamespace(Namespace(f.ns)). + WithName(Name(f.name)). + Initializer() +} + +type fakeSliceState[T any] struct { + ns string + labels map[string]string + objs []*T +} + +func (f *fakeSliceState[T]) Slice() []*T { + return f.objs +} + +func (f *fakeSliceState[T]) Initializer() ResourceSliceInitializer[*T] { + return NewResourceSlice(func(objs []*T) { f.objs = objs }). + WithNamespace(Namespace(f.ns)). + WithLabels(Labels(f.labels)). + Initializer() +} + +type fakePDState struct { + s *fakeState[v1alpha1.PD] +} + +func (f *fakePDState) PD() *v1alpha1.PD { + return f.s.Object() +} + +func (f *fakePDState) PDInitializer() PDInitializer { + return f.s.Initializer() +} + +type fakeClusterState struct { + s *fakeState[v1alpha1.Cluster] +} + +func (f *fakeClusterState) Cluster() *v1alpha1.Cluster { + return f.s.Object() +} + +func (f *fakeClusterState) ClusterInitializer() ClusterInitializer { + return f.s.Initializer() +} + +type fakePodState struct { + s *fakeState[corev1.Pod] +} + +func (f *fakePodState) Pod() *corev1.Pod { + return f.s.Object() +} + +func (f *fakePodState) PodInitializer() PodInitializer { + return f.s.Initializer() +} + +type fakePDSliceState struct { + s *fakeSliceState[v1alpha1.PD] +} + +func (f *fakePDSliceState) PDSlice() []*v1alpha1.PD { + return f.s.Slice() +} + +func (f *fakePDSliceState) PDSliceInitializer() PDSliceInitializer { + return f.s.Initializer() +} diff --git a/pkg/controllers/common/resource.go b/pkg/controllers/common/resource.go new file mode 100644 index 0000000000..45178886ed --- /dev/null +++ b/pkg/controllers/common/resource.go @@ -0,0 +1,165 @@ +package common + +type Setter[T any] interface { + Set(T) +} + +type NamespaceOption interface { + Namespace() string +} + +type NameOption interface { + Name() string +} + +type LabelsOption interface { + Labels() map[string]string +} + +type GetOptions interface { + NamespaceOption + NameOption +} + +type ListOptions interface { + NamespaceOption + LabelsOption +} + +type NameFunc func() string + +func (f NameFunc) Namespace() string { + return f() +} + +func (f NameFunc) Name() string { + return f() +} + +type Namespace string + +func (n Namespace) Namespace() string { + return string(n) +} + +type Name string + +func (n Name) Name() string { + return string(n) +} + +type Labels map[string]string + +func (l Labels) Labels() map[string]string { + return l +} + +type LabelsFunc func() map[string]string + +func (f LabelsFunc) Labels() map[string]string { + return f() +} + +type SetFunc[T any] func(T) + +func (f SetFunc[T]) Set(obj T) { + f(obj) +} + +type ResourceInitializer[T any] interface { + GetOptions + Setter[T] +} + +type Resource[T any] interface { + WithNamespace(NamespaceOption) Resource[T] + WithName(NameOption) Resource[T] + Initializer() ResourceInitializer[T] +} + +func NewResource[T any](setter SetFunc[T]) Resource[T] { + return &resource[T]{ + setter: setter, + } +} + +type resource[T any] struct { + setter Setter[T] + ns NamespaceOption + name NameOption +} + +func (r *resource[T]) Set(obj T) { + r.setter.Set(obj) +} + +func (r *resource[T]) WithNamespace(ns NamespaceOption) Resource[T] { + r.ns = ns + return r +} + +func (r *resource[T]) WithName(name NameOption) Resource[T] { + r.name = name + return r +} + +func (r *resource[T]) Namespace() string { + return r.ns.Namespace() +} + +func (r *resource[T]) Name() string { + return r.name.Name() +} + +func (r *resource[T]) Initializer() ResourceInitializer[T] { + return r +} + +type ResourceSliceInitializer[T any] interface { + ListOptions + Setter[[]T] +} + +type ResourceSlice[T any] interface { + WithNamespace(NamespaceOption) ResourceSlice[T] + WithLabels(LabelsOption) ResourceSlice[T] + Initializer() ResourceSliceInitializer[T] +} + +func NewResourceSlice[T any](setter SetFunc[[]T]) ResourceSlice[T] { + return &resourceSlice[T]{ + setter: setter, + } +} + +type resourceSlice[T any] struct { + ns NamespaceOption + labels LabelsOption + setter Setter[[]T] +} + +func (r *resourceSlice[T]) Namespace() string { + return r.ns.Namespace() +} + +func (r *resourceSlice[T]) Labels() map[string]string { + return r.labels.Labels() +} + +func (r *resourceSlice[T]) Set(objs []T) { + r.setter.Set(objs) +} + +func (r *resourceSlice[T]) WithNamespace(ns NamespaceOption) ResourceSlice[T] { + r.ns = ns + return r +} + +func (r *resourceSlice[T]) WithLabels(labels LabelsOption) ResourceSlice[T] { + r.labels = labels + return r +} + +func (r *resourceSlice[T]) Initializer() ResourceSliceInitializer[T] { + return r +} diff --git a/pkg/controllers/common/resource_test.go b/pkg/controllers/common/resource_test.go new file mode 100644 index 0000000000..f104f4c800 --- /dev/null +++ b/pkg/controllers/common/resource_test.go @@ -0,0 +1,115 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResource(t *testing.T) { + cases := []struct { + desc string + ns NamespaceOption + name NameOption + obj int + expectedNs string + expectedName string + expectedObj int + }{ + { + desc: "normal", + ns: Namespace("aaa"), + name: Name("bbb"), + obj: 42, + expectedNs: "aaa", + expectedName: "bbb", + expectedObj: 42, + }, + { + desc: "use name func", + ns: NameFunc(func() string { return "aaa" }), + name: NameFunc(func() string { return "bbb" }), + obj: 42, + expectedNs: "aaa", + expectedName: "bbb", + expectedObj: 42, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + var obj int + r := NewResource(func(t int) { + obj = t + }). + WithNamespace(c.ns). + WithName(c.name). + Initializer() + + r.Set(c.obj) + + assert.Equal(tt, c.expectedNs, r.Namespace(), c.desc) + assert.Equal(tt, c.expectedName, r.Name(), c.desc) + assert.Equal(tt, c.expectedObj, obj, c.desc) + }) + } +} + +func TestResourceSlice(t *testing.T) { + cases := []struct { + desc string + ns NamespaceOption + labels LabelsOption + objs []int + expectedNs string + expectedLabels map[string]string + expectedObjs []int + }{ + { + desc: "normal", + ns: Namespace("aaa"), + labels: Labels(map[string]string{"xxx": "yyy"}), + objs: []int{42}, + expectedNs: "aaa", + expectedLabels: map[string]string{ + "xxx": "yyy", + }, + expectedObjs: []int{42}, + }, + { + desc: "use func", + ns: NameFunc(func() string { return "aaa" }), + labels: LabelsFunc(func() map[string]string { return map[string]string{"xxx": "yyy"} }), + objs: []int{42}, + expectedNs: "aaa", + expectedLabels: map[string]string{ + "xxx": "yyy", + }, + expectedObjs: []int{42}, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + var objs []int + r := NewResourceSlice(func(t []int) { + objs = t + }). + WithNamespace(c.ns). + WithLabels(c.labels). + Initializer() + + r.Set(c.objs) + + assert.Equal(tt, c.expectedNs, r.Namespace(), c.desc) + assert.Equal(tt, c.expectedLabels, r.Labels(), c.desc) + assert.Equal(tt, c.expectedObjs, objs, c.desc) + }) + } +} diff --git a/pkg/controllers/common/task.go b/pkg/controllers/common/task.go index 2b906e0a22..c973cfe93c 100644 --- a/pkg/controllers/common/task.go +++ b/pkg/controllers/common/task.go @@ -17,10 +17,13 @@ package common import ( "cmp" "context" + "fmt" "slices" + "strings" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + kuberuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "github.com/pingcap/tidb-operator/apis/core/v1alpha1" @@ -28,119 +31,90 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -type PDContextSetter interface { - context.Context - PDKey() types.NamespacedName - SetPD(pd *v1alpha1.PD) -} - -type PDGetter interface { - GetPD() *v1alpha1.PD -} - -func TaskContextPD(ctx PDContextSetter, c client.Client) task.Task { - return task.NameTaskFunc("ContextPD", func() task.Result { - var pd v1alpha1.PD - if err := c.Get(ctx, ctx.PDKey(), &pd); err != nil { +func taskContextResource[T any, PT Object[T]](name string, w ResourceInitializer[PT], c client.Client, shouldExist bool) task.Task { + return task.NameTaskFunc("Context"+name, func(ctx context.Context) task.Result { + var obj PT = new(T) + key := types.NamespacedName{ + Namespace: w.Namespace(), + Name: w.Name(), + } + if err := c.Get(ctx, key, obj); err != nil { if !errors.IsNotFound(err) { - return task.Fail().With("can't get pd instance %s: %v", ctx.PDKey(), err) + return task.Fail().With("can't get %s: %v", key, err) } - return task.Complete().With("pd instance has been deleted") - } - ctx.SetPD(&pd) - return task.Complete().With("pd is set") - }) -} - -type ClusterContextSetter interface { - context.Context - ClusterKey() types.NamespacedName - SetCluster(cluster *v1alpha1.Cluster) -} - -type ClusterGetter interface { - GetCluster() *v1alpha1.Cluster -} + if shouldExist { + return task.Fail().With("cannot find %s: %v", key, err) + } -func TaskContextCluster(ctx ClusterContextSetter, c client.Client) task.Task { - return task.NameTaskFunc("ContextCluster", func() task.Result { - var cluster v1alpha1.Cluster - if err := c.Get(ctx, ctx.ClusterKey(), &cluster); err != nil { - return task.Fail().With("cannot find cluster %s: %v", ctx.ClusterKey(), err) + return task.Complete().With("obj %s does not exist", key) } - ctx.SetCluster(&cluster) - return task.Complete().With("cluster is set") + w.Set(obj) + return task.Complete().With("%s is set", strings.ToLower(name)) }) } -type PodContextSetter interface { - context.Context - PodKey() types.NamespacedName - SetPod(pod *corev1.Pod) -} - -type PodGetter interface { - GetPod() *corev1.Pod -} +func taskContextResourceSlice[T any, L any, PT Object[T], PL ObjectList[L]]( + name string, + w ResourceSliceInitializer[PT], + c client.Client, +) task.Task { + return task.NameTaskFunc("Context"+name, func(ctx context.Context) task.Result { + var l PL = new(L) + ns := w.Namespace() + labels := w.Labels() + + if err := c.List(ctx, l, client.InNamespace(ns), client.MatchingLabels(labels)); err != nil { + return task.Fail().With("cannot list objs: %v", err) + } -func TaskContextPod(ctx PodContextSetter, c client.Client) task.Task { - return task.NameTaskFunc("ContextPod", func() task.Result { - var pod corev1.Pod - if err := c.Get(ctx, ctx.PodKey(), &pod); err != nil { - if errors.IsNotFound(err) { - return task.Complete().With("pod is not created") + objs := make([]PT, 0, meta.LenList(l)) + if err := meta.EachListItem(l, func(item kuberuntime.Object) error { + obj, ok := item.(PT) + if !ok { + // unreachable + return fmt.Errorf("cannot convert item") } - return task.Fail().With("failed to get pod %s: %v", ctx.PodKey(), err) + objs = append(objs, obj) + return nil + }); err != nil { + // unreachable + return task.Fail().With("cannot extract list objs: %v", err) } - ctx.SetPod(&pod) + slices.SortFunc(objs, func(a, b PT) int { + return cmp.Compare(a.GetName(), b.GetName()) + }) + + w.Set(objs) - return task.Complete().With("pod is set") + return task.Complete().With("peers is set") }) } -type PDSliceContextSetter interface { - context.Context - ClusterKey() types.NamespacedName - SetPDSlice(pds []*v1alpha1.PD) +func TaskContextPD(state PDStateInitializer, c client.Client) task.Task { + w := state.PDInitializer() + return taskContextResource("PD", w, c, false) } -// TODO: combine with pd slice context in PDGroup controller -func TaskContextPDSlice(ctx PDSliceContextSetter, c client.Client) task.Task { - return task.NameTaskFunc("ContextPDSlice", func() task.Result { - var pdl v1alpha1.PDList - ck := ctx.ClusterKey() - if err := c.List(ctx, &pdl, client.InNamespace(ck.Namespace), client.MatchingLabels{ - v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, - v1alpha1.LabelKeyCluster: ck.Name, - v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, - }); err != nil { - return task.Fail().With("cannot list pd peers: %v", err) - } - - peers := []*v1alpha1.PD{} - for i := range pdl.Items { - peers = append(peers, &pdl.Items[i]) - } - slices.SortFunc(peers, func(a, b *v1alpha1.PD) int { - return cmp.Compare(a.Name, b.Name) - }) - - ctx.SetPDSlice(peers) +func TaskContextCluster(state ClusterStateInitializer, c client.Client) task.Task { + w := state.ClusterInitializer() + return taskContextResource("Cluster", w, c, true) +} - return task.Complete().With("peers is set") - }) +func TaskContextPod(state PodStateInitializer, c client.Client) task.Task { + w := state.PodInitializer() + return taskContextResource("Pod", w, c, false) } -type PodContext interface { - context.Context - PodGetter +func TaskContextPDSlice(state PDSliceStateInitializer, c client.Client) task.Task { + w := state.PDSliceInitializer() + return taskContextResourceSlice[v1alpha1.PD, v1alpha1.PDList]("PDSlice", w, c) } -func TaskSuspendPod(ctx PodContext, c client.Client) task.Task { - return task.NameTaskFunc("SuspendPod", func() task.Result { - pod := ctx.GetPod() +func TaskSuspendPod(state PodState, c client.Client) task.Task { + return task.NameTaskFunc("SuspendPod", func(ctx context.Context) task.Result { + pod := state.Pod() if pod == nil { return task.Complete().With("pod has been deleted") } @@ -148,6 +122,9 @@ func TaskSuspendPod(ctx PodContext, c client.Client) task.Task { return task.Complete().With("pod has been terminating") } if err := c.Delete(ctx, pod); err != nil { + if errors.IsNotFound(err) { + return task.Complete().With("pod is deleted") + } return task.Fail().With("can't delete pod %s/%s: %v", pod.Namespace, pod.Name, err) } diff --git a/pkg/controllers/common/task_test.go b/pkg/controllers/common/task_test.go new file mode 100644 index 0000000000..77b5be7968 --- /dev/null +++ b/pkg/controllers/common/task_test.go @@ -0,0 +1,383 @@ +package common + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client" + "github.com/pingcap/tidb-operator/pkg/utils/fake" + "github.com/pingcap/tidb-operator/pkg/utils/task/v3" +) + +func TestTaskContextPD(t *testing.T) { + cases := []struct { + desc string + state *fakeState[v1alpha1.PD] + objs []client.Object + unexpectedErr bool + + expectedResult task.Status + expectedObj *v1alpha1.PD + }{ + { + desc: "success", + state: &fakeState[v1alpha1.PD]{ + ns: "aaa", + name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa")), + }, + expectedResult: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa")), + }, + { + desc: "not found", + state: &fakeState[v1alpha1.PD]{ + ns: "aaa", + name: "aaa", + }, + expectedResult: task.SComplete, + }, + { + desc: "has unexpected error", + state: &fakeState[v1alpha1.PD]{ + ns: "aaa", + name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa")), + }, + unexpectedErr: true, + expectedResult: task.SFail, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) + + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + s := &fakePDState{s: c.state} + res, done := task.RunTask(context.Background(), TaskContextPD(s, fc)) + assert.Equal(tt, c.expectedResult, res.Status(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.expectedObj, c.state.obj, c.desc) + }) + } +} + +func TestTaskContextCluster(t *testing.T) { + cases := []struct { + desc string + state *fakeState[v1alpha1.Cluster] + objs []client.Object + unexpectedErr bool + + expectedResult task.Status + expectedObj *v1alpha1.Cluster + }{ + { + desc: "success", + state: &fakeState[v1alpha1.Cluster]{ + ns: "aaa", + name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.Cluster]("aaa")), + }, + expectedResult: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.Cluster]("aaa")), + }, + { + desc: "not found", + state: &fakeState[v1alpha1.Cluster]{ + ns: "aaa", + name: "aaa", + }, + expectedResult: task.SFail, + }, + { + desc: "has unexpected error", + state: &fakeState[v1alpha1.Cluster]{ + ns: "aaa", + name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.Cluster]("aaa")), + }, + unexpectedErr: true, + expectedResult: task.SFail, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) + + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + s := &fakeClusterState{s: c.state} + + res, done := task.RunTask(context.Background(), TaskContextCluster(s, fc)) + assert.Equal(tt, c.expectedResult, res.Status(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.expectedObj, c.state.obj, c.desc) + }) + } +} + +func TestTaskContextPod(t *testing.T) { + cases := []struct { + desc string + state *fakeState[corev1.Pod] + objs []client.Object + unexpectedErr bool + + expectedResult task.Status + expectedObj *corev1.Pod + }{ + { + desc: "success", + state: &fakeState[corev1.Pod]{ + ns: "aaa", + name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[corev1.Pod]("aaa")), + }, + expectedResult: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.SetNamespace[corev1.Pod]("aaa")), + }, + { + desc: "not found", + state: &fakeState[corev1.Pod]{ + ns: "aaa", + name: "aaa", + }, + expectedResult: task.SComplete, + }, + { + desc: "has unexpected error", + state: &fakeState[corev1.Pod]{ + ns: "aaa", + name: "aaa", + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[corev1.Pod]("aaa")), + }, + unexpectedErr: true, + expectedResult: task.SFail, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) + + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + s := &fakePodState{s: c.state} + res, done := task.RunTask(context.Background(), TaskContextPod(s, fc)) + assert.Equal(tt, c.expectedResult, res.Status(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.expectedObj, c.state.obj, c.desc) + }) + } +} + +func TestTaskContextPDSlice(t *testing.T) { + cases := []struct { + desc string + state *fakeSliceState[v1alpha1.PD] + objs []client.Object + unexpectedErr bool + + expectedResult task.Status + expectedObjs []*v1alpha1.PD + }{ + { + desc: "success", + state: &fakeSliceState[v1alpha1.PD]{ + ns: "aaa", + labels: map[string]string{ + "xxx": "yyy", + }, + }, + objs: []client.Object{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + fake.FakeObj("bbb", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy"), fake.Label[v1alpha1.PD]("aaa", "bbb")), + // ns is mismatched + fake.FakeObj("ccc", fake.SetNamespace[v1alpha1.PD]("bbb"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + // labels are mismatched + fake.FakeObj("ddd", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "zzz")), + }, + expectedResult: task.SComplete, + expectedObjs: []*v1alpha1.PD{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + fake.FakeObj("bbb", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy"), fake.Label[v1alpha1.PD]("aaa", "bbb")), + }, + }, + { + desc: "need be sorted", + state: &fakeSliceState[v1alpha1.PD]{ + ns: "aaa", + labels: map[string]string{ + "xxx": "yyy", + }, + }, + objs: []client.Object{ + fake.FakeObj("bbb", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy"), fake.Label[v1alpha1.PD]("aaa", "bbb")), + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + // ns is mismatched + fake.FakeObj("ccc", fake.SetNamespace[v1alpha1.PD]("bbb"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + // labels are mismatched + fake.FakeObj("ddd", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "zzz")), + }, + expectedResult: task.SComplete, + expectedObjs: []*v1alpha1.PD{ + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + fake.FakeObj("bbb", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy"), fake.Label[v1alpha1.PD]("aaa", "bbb")), + }, + }, + { + desc: "unexpected err", + state: &fakeSliceState[v1alpha1.PD]{ + ns: "aaa", + labels: map[string]string{ + "xxx": "yyy", + }, + }, + unexpectedErr: true, + objs: []client.Object{ + fake.FakeObj("bbb", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy"), fake.Label[v1alpha1.PD]("aaa", "bbb")), + fake.FakeObj("aaa", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + // ns is mismatched + fake.FakeObj("ccc", fake.SetNamespace[v1alpha1.PD]("bbb"), fake.Label[v1alpha1.PD]("xxx", "yyy")), + // labels are mismatched + fake.FakeObj("ddd", fake.SetNamespace[v1alpha1.PD]("aaa"), fake.Label[v1alpha1.PD]("xxx", "zzz")), + }, + expectedResult: task.SFail, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) + + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + s := &fakePDSliceState{s: c.state} + res, done := task.RunTask(context.Background(), TaskContextPDSlice(s, fc)) + assert.Equal(tt, c.expectedResult, res.Status(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.expectedObjs, c.state.objs, c.desc) + }) + } +} + +func TestTaskSuspendPod(t *testing.T) { + now := metav1.Now() + cases := []struct { + desc string + state *fakeState[corev1.Pod] + objs []client.Object + unexpectedErr bool + + expectedResult task.Status + expectedObj *corev1.Pod + }{ + { + desc: "pod is nil", + state: &fakeState[corev1.Pod]{}, + expectedResult: task.SComplete, + }, + { + desc: "pod is deleting", + state: &fakeState[corev1.Pod]{ + obj: fake.FakeObj("aaa", fake.DeleteTimestamp[corev1.Pod](&now)), + }, + expectedResult: task.SComplete, + expectedObj: fake.FakeObj("aaa", fake.DeleteTimestamp[corev1.Pod](&now)), + }, + { + // means pod has been fully deleted after it is fetched previously + desc: "pod is deleted", + state: &fakeState[corev1.Pod]{ + obj: fake.FakeObj[corev1.Pod]("aaa"), + }, + expectedResult: task.SComplete, + expectedObj: fake.FakeObj[corev1.Pod]("aaa"), + }, + { + desc: "delete pod", + state: &fakeState[corev1.Pod]{ + obj: fake.FakeObj[corev1.Pod]("aaa"), + }, + objs: []client.Object{ + fake.FakeObj[corev1.Pod]("aaa"), + }, + expectedResult: task.SWait, + expectedObj: fake.FakeObj[corev1.Pod]("aaa"), + }, + { + desc: "delete pod with unexpected err", + state: &fakeState[corev1.Pod]{ + obj: fake.FakeObj[corev1.Pod]("aaa"), + }, + objs: []client.Object{ + fake.FakeObj[corev1.Pod]("aaa"), + }, + unexpectedErr: true, + expectedResult: task.SFail, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fc := client.NewFakeClient(c.objs...) + + if c.unexpectedErr { + fc.WithError("*", "*", errors.NewInternalError(fmt.Errorf("fake internal err"))) + } + + s := &fakePodState{s: c.state} + res, done := task.RunTask(context.Background(), TaskSuspendPod(s, fc)) + assert.Equal(tt, c.expectedResult, res.Status(), c.desc) + assert.False(tt, done, c.desc) + if !c.unexpectedErr { + assert.Equal(tt, c.expectedObj, c.state.obj, c.desc) + } + }) + } +} diff --git a/pkg/controllers/pd/builder.go b/pkg/controllers/pd/builder.go index 9f88830f40..080b078f9b 100644 --- a/pkg/controllers/pd/builder.go +++ b/pkg/controllers/pd/builder.go @@ -20,45 +20,45 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -func (r *Reconciler) NewRunner(ctx *tasks.ReconcileContext, reporter task.TaskReporter) task.TaskRunner { +func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.TaskReporter) task.TaskRunner { runner := task.NewTaskRunner(reporter, // get pd - common.TaskContextPD(ctx, r.Client), + common.TaskContextPD(state, r.Client), // if it's deleted just return - task.IfBreak(common.CondPDHasBeenDeleted(ctx)), + task.IfBreak(common.CondPDHasBeenDeleted(state)), // get info from pd - tasks.TaskContextInfoFromPD(ctx, r.PDClientManager), - task.IfBreak(common.CondPDIsDeleting(ctx), - tasks.TaskFinalizerDel(ctx, r.Client), + tasks.TaskContextInfoFromPD(state, r.PDClientManager), + task.IfBreak(common.CondPDIsDeleting(state), + tasks.TaskFinalizerDel(state, r.Client), ), // get cluster and check whether it's paused - common.TaskContextCluster(ctx, r.Client), + common.TaskContextCluster(state, r.Client), task.IfBreak( - common.CondClusterIsPaused(ctx), + common.CondClusterIsPaused(state), ), // get pod and check whether the cluster is suspending - common.TaskContextPod(ctx, r.Client), + common.TaskContextPod(state, r.Client), task.IfBreak( - common.CondClusterIsSuspending(ctx), - tasks.TaskFinalizerAdd(ctx, r.Client), - common.TaskSuspendPod(ctx, r.Client), + common.CondClusterIsSuspending(state), + tasks.TaskFinalizerAdd(state, r.Client), + common.TaskSuspendPod(state, r.Client), // TODO: extract as a common task - tasks.TaskStatusSuspend(ctx, r.Client), + tasks.TaskStatusSuspend(state, r.Client), ), - tasks.TaskContextPeers(ctx, r.Client), - tasks.TaskFinalizerAdd(ctx, r.Client), - tasks.TaskConfigMap(ctx, r.Logger, r.Client), - tasks.TaskPVC(ctx, r.Logger, r.Client, r.VolumeModifier), - tasks.TaskPod(ctx, r.Logger, r.Client), + common.TaskContextPDSlice(state, r.Client), + tasks.TaskFinalizerAdd(state, r.Client), + tasks.TaskConfigMap(state, r.Logger, r.Client), + tasks.TaskPVC(state, r.Logger, r.Client, r.VolumeModifier), + tasks.TaskPod(state, r.Logger, r.Client), // If pd client has not been registered yet, do not update status of the pd - task.IfBreak(tasks.CondPDClientIsNotRegisterred(ctx), + task.IfBreak(tasks.CondPDClientIsNotRegisterred(state), tasks.TaskStatusUnknown(), ), - tasks.TaskStatus(ctx, r.Logger, r.Client), + tasks.TaskStatus(state, r.Logger, r.Client), ) return runner diff --git a/pkg/controllers/pd/controller.go b/pkg/controllers/pd/controller.go index d41237708f..e92957236a 100644 --- a/pkg/controllers/pd/controller.go +++ b/pkg/controllers/pd/controller.go @@ -75,12 +75,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu }() rtx := &tasks.ReconcileContext{ - // some fields will be set in the context task - Context: ctx, - Key: req.NamespacedName, + Key: req.NamespacedName, } runner := r.NewRunner(rtx, reporter) - return runner.Run() + return runner.Run(ctx) } diff --git a/pkg/controllers/pd/tasks/cm.go b/pkg/controllers/pd/tasks/cm.go index 0845bd4cd1..ec29684d85 100644 --- a/pkg/controllers/pd/tasks/cm.go +++ b/pkg/controllers/pd/tasks/cm.go @@ -15,6 +15,8 @@ package tasks import ( + "context" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,17 +30,17 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/toml" ) -func TaskConfigMap(ctx *ReconcileContext, _ logr.Logger, c client.Client) task.Task { - return task.NameTaskFunc("ConfigMap", func() task.Result { +func TaskConfigMap(state *ReconcileContext, _ logr.Logger, c client.Client) task.Task { + return task.NameTaskFunc("ConfigMap", func(ctx context.Context) task.Result { // TODO: DON'T add bootstrap config back // We need to check current config and forbid adding bootstrap cfg back cfg := pdcfg.Config{} decoder, encoder := toml.Codec[pdcfg.Config]() - if err := decoder.Decode([]byte(ctx.PD.Spec.Config), &cfg); err != nil { + if err := decoder.Decode([]byte(state.PD().Spec.Config), &cfg); err != nil { return task.Fail().With("pd config cannot be decoded: %v", err) } - if err := cfg.Overlay(ctx.Cluster, ctx.PD, ctx.Peers); err != nil { + if err := cfg.Overlay(state.Cluster(), state.PD(), state.PDSlice()); err != nil { return task.Fail().With("cannot generate pd config: %v", err) } @@ -47,12 +49,12 @@ func TaskConfigMap(ctx *ReconcileContext, _ logr.Logger, c client.Client) task.T return task.Fail().With("pd config cannot be encoded: %v", err) } - hash, err := hasher.GenerateHash(ctx.PD.Spec.Config) + hash, err := hasher.GenerateHash(state.PD().Spec.Config) if err != nil { return task.Fail().With("failed to generate hash for `pd.spec.config`: %v", err) } - ctx.ConfigHash = hash - expected := newConfigMap(ctx.PD, data, ctx.ConfigHash) + state.ConfigHash = hash + expected := newConfigMap(state.PD(), data, state.ConfigHash) if err := c.Apply(ctx, expected); err != nil { return task.Fail().With("can't create/update the cm of pd: %v", err) } diff --git a/pkg/controllers/pd/tasks/ctx.go b/pkg/controllers/pd/tasks/ctx.go index b6f620761d..3d5c0a616e 100644 --- a/pkg/controllers/pd/tasks/ctx.go +++ b/pkg/controllers/pd/tasks/ctx.go @@ -15,22 +15,18 @@ package tasks import ( - "cmp" "context" - "slices" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "github.com/pingcap/tidb-operator/apis/core/v1alpha1" - "github.com/pingcap/tidb-operator/pkg/client" pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd" "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) type ReconcileContext struct { - context.Context + // TODO: replace all fields in ReconcileContext by State + State Key types.NamespacedName PDClient pdm.PDClient @@ -42,11 +38,6 @@ type ReconcileContext struct { MemberID string IsLeader bool - PD *v1alpha1.PD - Peers []*v1alpha1.PD - Cluster *v1alpha1.Cluster - Pod *corev1.Pod - // ConfigHash stores the hash of **user-specified** config (i.e.`.Spec.Config`), // which will be used to determine whether the config has changed. // This ensures that our config overlay logic will not restart the tidb cluster unexpectedly. @@ -57,68 +48,23 @@ type ReconcileContext struct { PodIsTerminating bool } -func (ctx *ReconcileContext) PDKey() types.NamespacedName { - return ctx.Key -} - -func (ctx *ReconcileContext) SetPD(pd *v1alpha1.PD) { - ctx.PD = pd -} - -func (ctx *ReconcileContext) GetPD() *v1alpha1.PD { - return ctx.PD -} - -func (ctx *ReconcileContext) ClusterKey() types.NamespacedName { - return types.NamespacedName{ - Namespace: ctx.PD.Namespace, - Name: ctx.PD.Spec.Cluster.Name, - } -} - -func (ctx *ReconcileContext) GetCluster() *v1alpha1.Cluster { - return ctx.Cluster -} - -func (ctx *ReconcileContext) SetCluster(c *v1alpha1.Cluster) { - ctx.Cluster = c -} - -func (ctx *ReconcileContext) PodKey() types.NamespacedName { - return types.NamespacedName{ - Namespace: ctx.PD.Namespace, - Name: ctx.PD.PodName(), - } -} - -func (ctx *ReconcileContext) GetPod() *corev1.Pod { - return ctx.Pod -} - -func (ctx *ReconcileContext) SetPod(pod *corev1.Pod) { - ctx.Pod = pod - if !pod.DeletionTimestamp.IsZero() { - ctx.PodIsTerminating = true - } -} - -func TaskContextInfoFromPD(ctx *ReconcileContext, cm pdm.PDClientManager) task.Task { - return task.NameTaskFunc("ContextInfoFromPD", func() task.Result { - ck := ctx.ClusterKey() +func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task.Task { + return task.NameTaskFunc("ContextInfoFromPD", func(ctx context.Context) task.Result { + ck := state.Cluster() pc, ok := cm.Get(pdm.PrimaryKey(ck.Namespace, ck.Name)) if !ok { return task.Wait().With("pd client has not been registered yet") } - ctx.PDClient = pc + state.PDClient = pc if !pc.HasSynced() { return task.Complete().With("context without member info is completed, cache of pd info is not synced") } - ctx.Initialized = true + state.Initialized = true - m, err := pc.Members().Get(ctx.PD.Name) + m, err := pc.Members().Get(state.PD().Name) if err != nil { if errors.IsNotFound(err) { return task.Complete().With("context without member info is completed, pd is not initialized") @@ -126,45 +72,22 @@ func TaskContextInfoFromPD(ctx *ReconcileContext, cm pdm.PDClientManager) task.T return task.Fail().With("cannot get member: %w", err) } - ctx.MemberID = m.ID - ctx.IsLeader = m.IsLeader + state.MemberID = m.ID + state.IsLeader = m.IsLeader // set available and trust health info only when member info is valid if !m.Invalid { - ctx.IsAvailable = true - ctx.Healthy = m.Health + state.IsAvailable = true + state.Healthy = m.Health } return task.Complete().With("pd is ready") }) } -func TaskContextPeers(ctx *ReconcileContext, c client.Client) task.Task { - return task.NameTaskFunc("ContextPeers", func() task.Result { - var pdl v1alpha1.PDList - if err := c.List(ctx, &pdl, client.InNamespace(ctx.PD.Namespace), client.MatchingLabels{ - v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, - v1alpha1.LabelKeyCluster: ctx.Cluster.Name, - v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, - }); err != nil { - return task.Fail().With("cannot list pd peers: %w", err) - } - - peers := []*v1alpha1.PD{} - for i := range pdl.Items { - peers = append(peers, &pdl.Items[i]) - } - slices.SortFunc(peers, func(a, b *v1alpha1.PD) int { - return cmp.Compare(a.Name, b.Name) - }) - ctx.Peers = peers - return task.Complete().With("peers is set") - }) -} - -func CondPDClientIsNotRegisterred(ctx *ReconcileContext) task.Condition { +func CondPDClientIsNotRegisterred(state *ReconcileContext) task.Condition { return task.CondFunc(func() bool { // TODO: do not use HasSynced twice, it may return different results - return ctx.PDClient == nil || !ctx.PDClient.HasSynced() + return state.PDClient == nil || !state.PDClient.HasSynced() }) } diff --git a/pkg/controllers/pd/tasks/finalizer.go b/pkg/controllers/pd/tasks/finalizer.go index 17199a9351..8e59e42c29 100644 --- a/pkg/controllers/pd/tasks/finalizer.go +++ b/pkg/controllers/pd/tasks/finalizer.go @@ -26,17 +26,17 @@ import ( "github.com/pingcap/tidb-operator/pkg/utils/task/v3" ) -func TaskFinalizerDel(ctx *ReconcileContext, c client.Client) task.Task { - return task.NameTaskFunc("FinalizerDel", func() task.Result { +func TaskFinalizerDel(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("FinalizerDel", func(ctx context.Context) task.Result { switch { // get member info successfully and the member still exists - case ctx.IsAvailable && ctx.MemberID != "": + case state.IsAvailable && state.MemberID != "": // TODO: check whether quorum will be lost? - if err := ctx.PDClient.Underlay().DeleteMember(ctx, ctx.PD.Name); err != nil { + if err := state.PDClient.Underlay().DeleteMember(ctx, state.PD().Name); err != nil { return task.Fail().With("cannot delete member: %v", err) } - wait, err := EnsureSubResourcesDeleted(ctx, c, ctx.PD) + wait, err := EnsureSubResourcesDeleted(ctx, c, state.PD()) if err != nil { return task.Fail().With("cannot delete subresources: %v", err) } @@ -45,11 +45,11 @@ func TaskFinalizerDel(ctx *ReconcileContext, c client.Client) task.Task { return task.Wait().With("wait all subresources deleted") } - if err := k8s.RemoveFinalizer(ctx, c, ctx.PD); err != nil { + if err := k8s.RemoveFinalizer(ctx, c, state.PD()); err != nil { return task.Fail().With("cannot remove finalizer: %v", err) } - case ctx.IsAvailable: - wait, err := EnsureSubResourcesDeleted(ctx, c, ctx.PD) + case state.IsAvailable: + wait, err := EnsureSubResourcesDeleted(ctx, c, state.PD()) if err != nil { return task.Fail().With("cannot delete subresources: %v", err) } @@ -57,10 +57,10 @@ func TaskFinalizerDel(ctx *ReconcileContext, c client.Client) task.Task { return task.Wait().With("wait all subresources deleted") } - if err := k8s.RemoveFinalizer(ctx, c, ctx.PD); err != nil { + if err := k8s.RemoveFinalizer(ctx, c, state.PD()); err != nil { return task.Fail().With("cannot remove finalizer: %v", err) } - case !ctx.IsAvailable: + case !state.IsAvailable: // it may block some unsafe operations return task.Fail().With("pd cluster is not available") } @@ -69,9 +69,9 @@ func TaskFinalizerDel(ctx *ReconcileContext, c client.Client) task.Task { }) } -func TaskFinalizerAdd(ctx *ReconcileContext, c client.Client) task.Task { - return task.NameTaskFunc("FinalizerAdd", func() task.Result { - if err := k8s.EnsureFinalizer(ctx, c, ctx.PD); err != nil { +func TaskFinalizerAdd(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("FinalizerAdd", func(ctx context.Context) task.Result { + if err := k8s.EnsureFinalizer(ctx, c, state.PD()); err != nil { return task.Fail().With("failed to ensure finalizer has been added: %v", err) } return task.Complete().With("finalizer is added") diff --git a/pkg/controllers/pd/tasks/pod.go b/pkg/controllers/pd/tasks/pod.go index 922465e227..a569a10328 100644 --- a/pkg/controllers/pd/tasks/pod.go +++ b/pkg/controllers/pd/tasks/pod.go @@ -39,36 +39,36 @@ const ( defaultReadinessProbeInitialDelaySeconds = 5 ) -func TaskPod(ctx *ReconcileContext, logger logr.Logger, c client.Client) task.Task { - return task.NameTaskFunc("Pod", func() task.Result { - expected := newPod(ctx.Cluster, ctx.PD, ctx.ConfigHash) - if ctx.Pod == nil { +func TaskPod(state *ReconcileContext, logger logr.Logger, c client.Client) task.Task { + return task.NameTaskFunc("Pod", func(ctx context.Context) task.Result { + expected := newPod(state.Cluster(), state.PD(), state.ConfigHash) + if state.Pod() == nil { // We have to refresh cache of members to make sure a pd without pod is unhealthy. // If the healthy info is out of date, the operator may mark this pd up-to-date unexpectedly // and begin to update the next PD. - if ctx.Healthy { - ctx.PDClient.Members().Refresh() + if state.Healthy { + state.PDClient.Members().Refresh() return task.Wait().With("wait until pd's status becomes unhealthy") } if err := c.Apply(ctx, expected); err != nil { return task.Fail().With("can't create pod of pd: %v", err) } - ctx.SetPod(expected) + state.SetPod(expected) return task.Complete().With("pod is synced") } - res := k8s.ComparePods(ctx.Pod, expected) - curHash, expectHash := ctx.Pod.Labels[v1alpha1.LabelKeyConfigHash], expected.Labels[v1alpha1.LabelKeyConfigHash] + res := k8s.ComparePods(state.Pod(), expected) + curHash, expectHash := state.Pod().Labels[v1alpha1.LabelKeyConfigHash], expected.Labels[v1alpha1.LabelKeyConfigHash] configChanged := curHash != expectHash logger.Info("compare pod", "result", res, "configChanged", configChanged, "currentConfigHash", curHash, "expectConfigHash", expectHash) if res == k8s.CompareResultRecreate || - (configChanged && ctx.PD.Spec.UpdateStrategy.Config == v1alpha1.ConfigUpdateStrategyRestart) { + (configChanged && state.PD().Spec.UpdateStrategy.Config == v1alpha1.ConfigUpdateStrategyRestart) { // NOTE: both rtx.Healthy and rtx.Pod are not always newest // So pre delete check may also be skipped in some cases, for example, // the PD is just started. - if ctx.Healthy || statefulset.IsPodReady(ctx.Pod) { - wait, err := preDeleteCheck(ctx, logger, ctx.PDClient, ctx.PD, ctx.Peers, ctx.IsLeader) + if state.Healthy || statefulset.IsPodReady(state.Pod()) { + wait, err := preDeleteCheck(ctx, logger, state.PDClient, state.PD(), state.PDSlice(), state.IsLeader) if err != nil { return task.Fail().With("can't delete pod of pd: %v", err) } @@ -78,13 +78,13 @@ func TaskPod(ctx *ReconcileContext, logger logr.Logger, c client.Client) task.Ta } } - logger.Info("will delete the pod to recreate", "name", ctx.Pod.Name, "namespace", ctx.Pod.Namespace, "UID", ctx.Pod.UID) + logger.Info("will delete the pod to recreate", "name", state.Pod().Name, "namespace", state.Pod().Namespace, "UID", state.Pod().UID) - if err := c.Delete(ctx, ctx.Pod); err != nil { + if err := c.Delete(ctx, state.Pod()); err != nil { return task.Fail().With("can't delete pod of pd: %v", err) } - ctx.PodIsTerminating = true + state.PodIsTerminating = true return task.Complete().With("pod is deleting") } else if res == k8s.CompareResultUpdate { @@ -92,7 +92,7 @@ func TaskPod(ctx *ReconcileContext, logger logr.Logger, c client.Client) task.Ta if err := c.Apply(ctx, expected); err != nil { return task.Fail().With("can't apply pod of pd: %v", err) } - ctx.SetPod(expected) + state.SetPod(expected) } return task.Complete().With("pod is synced") diff --git a/pkg/controllers/pd/tasks/pvc.go b/pkg/controllers/pd/tasks/pvc.go index 91b9224904..e9e44d2ceb 100644 --- a/pkg/controllers/pd/tasks/pvc.go +++ b/pkg/controllers/pd/tasks/pvc.go @@ -15,6 +15,8 @@ package tasks import ( + "context" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,9 +28,9 @@ import ( "github.com/pingcap/tidb-operator/pkg/volumes" ) -func TaskPVC(ctx *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { - return task.NameTaskFunc("PVC", func() task.Result { - pvcs := newPVCs(ctx.PD) +func TaskPVC(state *ReconcileContext, logger logr.Logger, c client.Client, vm volumes.Modifier) task.Task { + return task.NameTaskFunc("PVC", func(ctx context.Context) task.Result { + pvcs := newPVCs(state.PD()) if wait, err := volumes.SyncPVCs(ctx, c, pvcs, vm, logger); err != nil { return task.Fail().With("failed to sync pvcs: %v", err) } else if wait { diff --git a/pkg/controllers/pd/tasks/state.go b/pkg/controllers/pd/tasks/state.go new file mode 100644 index 0000000000..7d3f632192 --- /dev/null +++ b/pkg/controllers/pd/tasks/state.go @@ -0,0 +1,97 @@ +package tasks + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/apis/core/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/controllers/common" +) + +type state struct { + key types.NamespacedName + + cluster *v1alpha1.Cluster + pd *v1alpha1.PD + pod *corev1.Pod + pds []*v1alpha1.PD +} + +type State interface { + common.PDStateInitializer + common.ClusterStateInitializer + common.PodStateInitializer + common.PDSliceStateInitializer + + common.PDState + common.ClusterState + common.PodState + common.PDSliceState + + SetPod(*corev1.Pod) +} + +func NewState(key types.NamespacedName) State { + s := &state{ + key: key, + } + return s +} + +func (s *state) PD() *v1alpha1.PD { + return s.pd +} + +func (s *state) Cluster() *v1alpha1.Cluster { + return s.cluster +} + +func (s *state) Pod() *corev1.Pod { + return s.pod +} + +func (s *state) SetPod(pod *corev1.Pod) { + s.pod = pod +} + +func (s *state) PDSlice() []*v1alpha1.PD { + return s.pds +} + +func (s *state) PDInitializer() common.PDInitializer { + return common.NewResource(func(pd *v1alpha1.PD) { s.pd = pd }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithName(common.Name(s.key.Name)). + Initializer() +} + +func (s *state) ClusterInitializer() common.ClusterInitializer { + return common.NewResource(func(cluster *v1alpha1.Cluster) { s.cluster = cluster }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithName(common.NameFunc(func() string { + return s.pd.Spec.Cluster.Name + })). + Initializer() +} + +func (s *state) PodInitializer() common.PodInitializer { + return common.NewResource(s.SetPod). + WithNamespace(common.Namespace(s.key.Namespace)). + WithName(common.NameFunc(func() string { + return s.pd.PodName() + })). + Initializer() +} + +func (s *state) PDSliceInitializer() common.PDSliceInitializer { + return common.NewResourceSlice(func(pds []*v1alpha1.PD) { s.pds = pds }). + WithNamespace(common.Namespace(s.key.Namespace)). + WithLabels(common.LabelsFunc(func() map[string]string { + return map[string]string{ + v1alpha1.LabelKeyManagedBy: v1alpha1.LabelValManagedByOperator, + v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentPD, + v1alpha1.LabelKeyCluster: s.cluster.Name, + } + })). + Initializer() +} diff --git a/pkg/controllers/pd/tasks/status.go b/pkg/controllers/pd/tasks/status.go index 96b301f29b..aa4fce8eac 100644 --- a/pkg/controllers/pd/tasks/status.go +++ b/pkg/controllers/pd/tasks/status.go @@ -15,6 +15,7 @@ package tasks import ( + "context" "time" "github.com/go-logr/logr" @@ -27,9 +28,9 @@ import ( "github.com/pingcap/tidb-operator/third_party/kubernetes/pkg/controller/statefulset" ) -func TaskStatusSuspend(ctx *ReconcileContext, c client.Client) task.Task { - return task.NameTaskFunc("StatusSuspend", func() task.Result { - ctx.PD.Status.ObservedGeneration = ctx.PD.Generation +func TaskStatusSuspend(state *ReconcileContext, c client.Client) task.Task { + return task.NameTaskFunc("StatusSuspend", func(ctx context.Context) task.Result { + state.PD().Status.ObservedGeneration = state.PD().Generation var ( suspendStatus = metav1.ConditionFalse suspendMessage = "pd is suspending" @@ -39,28 +40,28 @@ func TaskStatusSuspend(ctx *ReconcileContext, c client.Client) task.Task { healthMessage = "pd is not healthy" ) - if ctx.Pod == nil { + if state.Pod() == nil { suspendStatus = metav1.ConditionTrue suspendMessage = "pd is suspended" } - needUpdate := meta.SetStatusCondition(&ctx.PD.Status.Conditions, metav1.Condition{ + needUpdate := meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{ Type: v1alpha1.PDCondSuspended, Status: suspendStatus, - ObservedGeneration: ctx.PD.Generation, + ObservedGeneration: state.PD().Generation, // TODO: use different reason for suspending and suspended Reason: v1alpha1.PDSuspendReason, Message: suspendMessage, }) - needUpdate = meta.SetStatusCondition(&ctx.PD.Status.Conditions, metav1.Condition{ + needUpdate = meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{ Type: v1alpha1.PDCondHealth, Status: healthStatus, - ObservedGeneration: ctx.PD.Generation, + ObservedGeneration: state.PD().Generation, Reason: v1alpha1.PDHealthReason, Message: healthMessage, }) || needUpdate if needUpdate { - if err := c.Status().Update(ctx, ctx.PD); err != nil { + if err := c.Status().Update(ctx, state.PD()); err != nil { return task.Fail().With("cannot update status: %v", err) } } @@ -70,14 +71,14 @@ func TaskStatusSuspend(ctx *ReconcileContext, c client.Client) task.Task { } func TaskStatusUnknown() task.Task { - return task.NameTaskFunc("StatusUnknown", func() task.Result { + return task.NameTaskFunc("StatusUnknown", func(ctx context.Context) task.Result { return task.Wait().With("status of the pd is unknown") }) } //nolint:gocyclo // refactor if possible -func TaskStatus(ctx *ReconcileContext, _ logr.Logger, c client.Client) task.Task { - return task.NameTaskFunc("Status", func() task.Result { +func TaskStatus(state *ReconcileContext, _ logr.Logger, c client.Client) task.Task { + return task.NameTaskFunc("Status", func(ctx context.Context) task.Result { var ( healthStatus = metav1.ConditionFalse healthMessage = "pd is not healthy" @@ -88,58 +89,58 @@ func TaskStatus(ctx *ReconcileContext, _ logr.Logger, c client.Client) task.Task needUpdate = false ) - if ctx.MemberID != "" { - needUpdate = SetIfChanged(&ctx.PD.Status.ID, ctx.MemberID) || needUpdate + if state.MemberID != "" { + needUpdate = SetIfChanged(&state.PD().Status.ID, state.MemberID) || needUpdate } - needUpdate = SetIfChanged(&ctx.PD.Status.IsLeader, ctx.IsLeader) || needUpdate - needUpdate = syncInitializedCond(ctx.PD, ctx.Initialized) || needUpdate + needUpdate = SetIfChanged(&state.PD().Status.IsLeader, state.IsLeader) || needUpdate + needUpdate = syncInitializedCond(state.PD(), state.Initialized) || needUpdate - needUpdate = meta.SetStatusCondition(&ctx.PD.Status.Conditions, metav1.Condition{ + needUpdate = meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{ Type: v1alpha1.PDCondSuspended, Status: suspendStatus, - ObservedGeneration: ctx.PD.Generation, + ObservedGeneration: state.PD().Generation, Reason: v1alpha1.PDSuspendReason, Message: suspendMessage, }) || needUpdate - needUpdate = SetIfChanged(&ctx.PD.Status.ObservedGeneration, ctx.PD.Generation) || needUpdate - needUpdate = SetIfChanged(&ctx.PD.Status.UpdateRevision, ctx.PD.Labels[v1alpha1.LabelKeyInstanceRevisionHash]) || needUpdate + needUpdate = SetIfChanged(&state.PD().Status.ObservedGeneration, state.PD().Generation) || needUpdate + needUpdate = SetIfChanged(&state.PD().Status.UpdateRevision, state.PD().Labels[v1alpha1.LabelKeyInstanceRevisionHash]) || needUpdate - if ctx.Pod == nil || ctx.PodIsTerminating { - ctx.Healthy = false - } else if statefulset.IsPodRunningAndReady(ctx.Pod) && ctx.Healthy { - if ctx.PD.Status.CurrentRevision != ctx.Pod.Labels[v1alpha1.LabelKeyInstanceRevisionHash] { - ctx.PD.Status.CurrentRevision = ctx.Pod.Labels[v1alpha1.LabelKeyInstanceRevisionHash] + if state.Pod() == nil || state.PodIsTerminating { + state.Healthy = false + } else if statefulset.IsPodRunningAndReady(state.Pod()) && state.Healthy { + if state.PD().Status.CurrentRevision != state.Pod().Labels[v1alpha1.LabelKeyInstanceRevisionHash] { + state.PD().Status.CurrentRevision = state.Pod().Labels[v1alpha1.LabelKeyInstanceRevisionHash] needUpdate = true } } else { - ctx.Healthy = false + state.Healthy = false } - if ctx.Healthy { + if state.Healthy { healthStatus = metav1.ConditionTrue healthMessage = "pd is healthy" } - needUpdate = meta.SetStatusCondition(&ctx.PD.Status.Conditions, metav1.Condition{ + needUpdate = meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{ Type: v1alpha1.PDCondHealth, Status: healthStatus, - ObservedGeneration: ctx.PD.Generation, + ObservedGeneration: state.PD().Generation, Reason: v1alpha1.PDHealthReason, Message: healthMessage, }) || needUpdate if needUpdate { - if err := c.Status().Update(ctx, ctx.PD); err != nil { + if err := c.Status().Update(ctx, state.PD()); err != nil { return task.Fail().With("cannot update status: %v", err) } } - if ctx.PodIsTerminating { + if state.PodIsTerminating { //nolint:mnd // refactor to use a constant return task.Retry(5 * time.Second).With("pod is terminating, retry after it's terminated") } - if !ctx.Initialized || !ctx.Healthy { + if !state.Initialized || !state.Healthy { return task.Wait().With("pd may not be initialized or healthy, wait for next event") } diff --git a/pkg/utils/fake/fake.go b/pkg/utils/fake/fake.go index 9a0ec80502..bf96f4a4e4 100644 --- a/pkg/utils/fake/fake.go +++ b/pkg/utils/fake/fake.go @@ -78,14 +78,18 @@ func Annotation[T any, PT Object[T]](k, v string) ChangeFunc[T, PT] { } } -func SetDeleteTimestamp[T any, PT Object[T]]() ChangeFunc[T, PT] { +func DeleteTimestamp[T any, PT Object[T]](t *metav1.Time) ChangeFunc[T, PT] { return func(obj PT) PT { - now := metav1.Now() - obj.SetDeletionTimestamp(&now) + obj.SetDeletionTimestamp(t) return obj } } +func DeleteNow[T any, PT Object[T]]() ChangeFunc[T, PT] { + now := metav1.Now() + return DeleteTimestamp[T, PT](&now) +} + func AddFinalizer[T any, PT Object[T]]() ChangeFunc[T, PT] { return func(obj PT) PT { controllerutil.AddFinalizer(obj, v1alpha1.Finalizer) diff --git a/pkg/utils/task/v3/runner.go b/pkg/utils/task/v3/runner.go index ef98327b28..404dd8654f 100644 --- a/pkg/utils/task/v3/runner.go +++ b/pkg/utils/task/v3/runner.go @@ -15,6 +15,7 @@ package task import ( + "context" "fmt" "strings" @@ -24,7 +25,7 @@ import ( // TaskRunner is an executor to run a series of tasks sequentially type TaskRunner interface { - Run() (ctrl.Result, error) + Run(ctx context.Context) (ctrl.Result, error) } type taskRunner struct { @@ -52,8 +53,8 @@ func NewTaskRunner(reporter TaskReporter, ts ...Task) TaskRunner { } } -func (r *taskRunner) Run() (ctrl.Result, error) { - res, _ := r.task.sync() +func (r *taskRunner) Run(ctx context.Context) (ctrl.Result, error) { + res, _ := RunTask(ctx, r.task) r.reporter.AddResult(res) switch res.Status() { diff --git a/pkg/utils/task/v3/runner_test.go b/pkg/utils/task/v3/runner_test.go index c7d8d96182..8517dbebd2 100644 --- a/pkg/utils/task/v3/runner_test.go +++ b/pkg/utils/task/v3/runner_test.go @@ -15,6 +15,7 @@ package task import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -31,10 +32,10 @@ func TestTaskRunner(t *testing.T) { { desc: "a task fail", ts: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Fail().With("fail") }), }, @@ -43,10 +44,10 @@ func TestTaskRunner(t *testing.T) { { desc: "a retry task with 0 interval", ts: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Retry(0).With("retry") }), }, @@ -57,10 +58,10 @@ func TestTaskRunner(t *testing.T) { { desc: "a retry task with not 0 interval", ts: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Retry(5).With("retry") }), }, @@ -71,13 +72,13 @@ func TestTaskRunner(t *testing.T) { { desc: "all tasks are Complete or Wait", ts: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Wait().With("wait") }), - NameTaskFunc("ccc", func() Result { + NameTaskFunc("ccc", func(context.Context) Result { return Complete().With("success") }), }, @@ -90,8 +91,9 @@ func TestTaskRunner(t *testing.T) { t.Run(c.desc, func(tt *testing.T) { tt.Parallel() + ctx := context.Background() runner := NewTaskRunner(&dummyReporter{}, c.ts...) - res, err := runner.Run() + res, err := runner.Run(ctx) if c.hasErr { assert.Error(tt, err, c.desc) } else { diff --git a/pkg/utils/task/v3/task.go b/pkg/utils/task/v3/task.go index 60596ad0ba..174b0c1521 100644 --- a/pkg/utils/task/v3/task.go +++ b/pkg/utils/task/v3/task.go @@ -14,15 +14,17 @@ package task +import "context" + // Syncer defines an action to sync actual states to desired. type Syncer interface { - Sync() Result + Sync(ctx context.Context) Result } -type SyncFunc func() Result +type SyncFunc func(ctx context.Context) Result -func (f SyncFunc) Sync() Result { - return f() +func (f SyncFunc) Sync(ctx context.Context) Result { + return f(ctx) } type Condition interface { @@ -37,8 +39,14 @@ func (f CondFunc) Satisfy() bool { // Task is a Syncer wrapper, which can be orchestrated using control structures // such as if and break for conditional logic and flow control. +// Task can only be implemented by this package and users should implement the Syncer interface +// and wrap Syncers as Tasks type Task interface { - sync() (_ Result, done bool) + sync(ctx context.Context) (_ Result, done bool) +} + +func RunTask(ctx context.Context, t Task) (Result, bool) { + return t.sync(ctx) } type task struct { @@ -46,8 +54,8 @@ type task struct { f Syncer } -func (e *task) sync() (Result, bool) { - return nameResult(e.name, e.f.Sync()), false +func (e *task) sync(ctx context.Context) (Result, bool) { + return nameResult(e.name, e.f.Sync(ctx)), false } func NameTaskFunc(name string, f SyncFunc) Task { @@ -62,9 +70,9 @@ type optionalTask struct { cond Condition } -func (e *optionalTask) sync() (Result, bool) { +func (e *optionalTask) sync(ctx context.Context) (Result, bool) { if e.cond.Satisfy() { - return e.Task.sync() + return e.Task.sync(ctx) } return nil, false @@ -81,8 +89,8 @@ type breakTask struct { Task } -func (e *breakTask) sync() (Result, bool) { - r, _ := e.Task.sync() +func (e *breakTask) sync(ctx context.Context) (Result, bool) { + r, _ := e.Task.sync(ctx) return r, true } @@ -100,10 +108,10 @@ type blockTask struct { tasks []Task } -func (e *blockTask) sync() (Result, bool) { +func (e *blockTask) sync(ctx context.Context) (Result, bool) { var rs []Result for _, expr := range e.tasks { - r, done := expr.sync() + r, done := expr.sync(ctx) if r == nil { continue } diff --git a/pkg/utils/task/v3/task_test.go b/pkg/utils/task/v3/task_test.go index 5f0f5fa78e..80822198e5 100644 --- a/pkg/utils/task/v3/task_test.go +++ b/pkg/utils/task/v3/task_test.go @@ -15,6 +15,7 @@ package task import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -36,7 +37,7 @@ func TestNameTask(t *testing.T) { { desc: "normal", name: "aaa", - syncer: SyncFunc(func() Result { + syncer: SyncFunc(func(context.Context) Result { return Complete().With("success") }), expectedResult: nameResult( @@ -50,9 +51,9 @@ func TestNameTask(t *testing.T) { c := &cases[i] t.Run(c.desc, func(tt *testing.T) { tt.Parallel() - + ctx := context.Background() task := NameTaskFunc(c.name, c.syncer) - res, done := task.sync() + res, done := task.sync(ctx) assert.Equal(tt, c.expectedResult, res, c.desc) assert.False(tt, done, c.desc) }) @@ -68,7 +69,7 @@ func TestIf(t *testing.T) { }{ { desc: "cond is true", - task: NameTaskFunc("aaa", func() Result { + task: NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), cond: condition(true), @@ -78,7 +79,7 @@ func TestIf(t *testing.T) { }, { desc: "cond is false", - task: NameTaskFunc("aaa", func() Result { + task: NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), cond: condition(false), @@ -91,8 +92,9 @@ func TestIf(t *testing.T) { t.Run(c.desc, func(tt *testing.T) { tt.Parallel() + ctx := context.Background() task := If(c.cond, c.task) - res, done := task.sync() + res, done := task.sync(ctx) assert.Equal(tt, c.expectedResult, res, c.desc) assert.False(tt, done, c.desc) }) @@ -107,7 +109,7 @@ func TestBreak(t *testing.T) { }{ { desc: "cond is true", - task: NameTaskFunc("aaa", func() Result { + task: NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), expectedResult: newAggregate( @@ -121,8 +123,9 @@ func TestBreak(t *testing.T) { t.Run(c.desc, func(tt *testing.T) { tt.Parallel() + ctx := context.Background() task := Break(c.task) - res, done := task.sync() + res, done := task.sync(ctx) assert.Equal(tt, c.expectedResult, res, c.desc) assert.True(tt, done, c.desc) }) @@ -139,7 +142,7 @@ func TestIfBreak(t *testing.T) { }{ { desc: "cond is true", - task: NameTaskFunc("aaa", func() Result { + task: NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), cond: CondFunc(func() bool { return true }), @@ -150,7 +153,7 @@ func TestIfBreak(t *testing.T) { }, { desc: "cond is false", - task: NameTaskFunc("aaa", func() Result { + task: NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), cond: CondFunc(func() bool { return false }), @@ -164,8 +167,9 @@ func TestIfBreak(t *testing.T) { t.Run(c.desc, func(tt *testing.T) { tt.Parallel() + ctx := context.Background() task := IfBreak(c.cond, c.task) - res, done := task.sync() + res, done := task.sync(ctx) assert.Equal(tt, c.expectedResult, res, c.desc) assert.Equal(tt, c.expectedDone, done, c.desc) }) @@ -188,7 +192,7 @@ func TestBlock(t *testing.T) { { desc: "1 complete task", tasks: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), }, @@ -200,10 +204,10 @@ func TestBlock(t *testing.T) { { desc: "2 complete tasks", tasks: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Complete().With("success") }), }, @@ -216,10 +220,10 @@ func TestBlock(t *testing.T) { { desc: "2 tasks with 1 fail task", tasks: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Fail().With("fail") }), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Complete().With("success") }), }, @@ -231,10 +235,10 @@ func TestBlock(t *testing.T) { { desc: "if task", tasks: []Task{ - If(condition(false), NameTaskFunc("aaa", func() Result { + If(condition(false), NameTaskFunc("aaa", func(context.Context) Result { return Fail().With("fail") })), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Complete().With("success") }), }, @@ -246,10 +250,10 @@ func TestBlock(t *testing.T) { { desc: "break task", tasks: []Task{ - NameTaskFunc("aaa", func() Result { + NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") }), - Break(NameTaskFunc("bbb", func() Result { + Break(NameTaskFunc("bbb", func(context.Context) Result { return Complete().With("success") })), }, @@ -262,10 +266,10 @@ func TestBlock(t *testing.T) { { desc: "if break task", tasks: []Task{ - IfBreak(condition(true), NameTaskFunc("aaa", func() Result { + IfBreak(condition(true), NameTaskFunc("aaa", func(context.Context) Result { return Complete().With("success") })), - NameTaskFunc("bbb", func() Result { + NameTaskFunc("bbb", func(context.Context) Result { return Complete().With("success") }), }, @@ -281,8 +285,9 @@ func TestBlock(t *testing.T) { t.Run(c.desc, func(tt *testing.T) { tt.Parallel() + ctx := context.Background() task := Block(c.tasks...) - r, done := task.sync() + r, done := task.sync(ctx) assert.Equal(tt, c.expectedResult, r, c.desc) assert.Equal(tt, c.expectedDone, done, c.desc) }) From a1bdc8284602262c38f9662e5fbce5ec162e19c3 Mon Sep 17 00:00:00 2001 From: liubo02 Date: Tue, 24 Dec 2024 16:55:36 +0800 Subject: [PATCH 2/4] fix lint and ut Signed-off-by: liubo02 --- pkg/client/fake.go | 17 ++++++++--------- pkg/controllers/cluster/tasks/finalizer_test.go | 8 ++++---- pkg/controllers/pd/tasks/ctx.go | 2 +- pkg/controllers/pd/tasks/status.go | 2 +- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 306ac06f6f..5105b2f883 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -66,17 +66,17 @@ func NewFakeClient(objs ...client.Object) FakeClient { } } -func (fc *fakeClient) WithError(verb, resource string, err error) { - fc.fc.PrependReactor(verb, resource, func(action testing.Action) (bool, runtime.Object, error) { - return true, nil, err - }) -} - type fakeClient struct { Client fc *fakeUnderlayClient } +func (fc *fakeClient) WithError(verb, resource string, err error) { + fc.fc.PrependReactor(verb, resource, func(_ testing.Action) (bool, runtime.Object, error) { + return true, nil, err + }) +} + type fakeUnderlayClient struct { testing.Fake tracker testing.ObjectTracker @@ -586,11 +586,10 @@ func (c *fakeUnderlayClient) PatchReactionFunc(action *testing.PatchActionImpl) } case types.ApplyPatchType: patchObj := &unstructured.Unstructured{Object: map[string]any{}} - if err = yaml.Unmarshal(action.GetPatch(), &patchObj.Object); err != nil { + if err := yaml.Unmarshal(action.GetPatch(), &patchObj.Object); err != nil { return true, nil, fmt.Errorf("error decoding YAML: %w", err) } - obj, err = manager.Apply(obj, patchObj, "tidb-operator", true) - if err != nil { + if _, err := manager.Apply(obj, patchObj, "tidb-operator", true); err != nil { return true, nil, err } diff --git a/pkg/controllers/cluster/tasks/finalizer_test.go b/pkg/controllers/cluster/tasks/finalizer_test.go index 282a36cc7b..60923879a6 100644 --- a/pkg/controllers/cluster/tasks/finalizer_test.go +++ b/pkg/controllers/cluster/tasks/finalizer_test.go @@ -44,14 +44,14 @@ func TestFinalizer(t *testing.T) { }, { desc: "removed finalizer", - cluster: fake.FakeObj[v1alpha1.Cluster]("test", - fake.SetDeleteTimestamp[v1alpha1.Cluster](), fake.AddFinalizer[v1alpha1.Cluster]()), + cluster: fake.FakeObj("test", + fake.DeleteNow[v1alpha1.Cluster](), fake.AddFinalizer[v1alpha1.Cluster]()), expected: task.Complete().Break().With("removed finalizer"), }, { desc: "deleting components", - cluster: fake.FakeObj[v1alpha1.Cluster]("test", - fake.SetDeleteTimestamp[v1alpha1.Cluster](), fake.AddFinalizer[v1alpha1.Cluster]()), + cluster: fake.FakeObj("test", + fake.DeleteNow[v1alpha1.Cluster](), fake.AddFinalizer[v1alpha1.Cluster]()), pdGroup: fake.FakeObj[v1alpha1.PDGroup]("pd-group"), expected: task.Fail().With("deleting components"), hasFinalizer: true, diff --git a/pkg/controllers/pd/tasks/ctx.go b/pkg/controllers/pd/tasks/ctx.go index 3d5c0a616e..5a99c0004b 100644 --- a/pkg/controllers/pd/tasks/ctx.go +++ b/pkg/controllers/pd/tasks/ctx.go @@ -49,7 +49,7 @@ type ReconcileContext struct { } func TaskContextInfoFromPD(state *ReconcileContext, cm pdm.PDClientManager) task.Task { - return task.NameTaskFunc("ContextInfoFromPD", func(ctx context.Context) task.Result { + return task.NameTaskFunc("ContextInfoFromPD", func(_ context.Context) task.Result { ck := state.Cluster() pc, ok := cm.Get(pdm.PrimaryKey(ck.Namespace, ck.Name)) if !ok { diff --git a/pkg/controllers/pd/tasks/status.go b/pkg/controllers/pd/tasks/status.go index aa4fce8eac..324a882ba4 100644 --- a/pkg/controllers/pd/tasks/status.go +++ b/pkg/controllers/pd/tasks/status.go @@ -71,7 +71,7 @@ func TaskStatusSuspend(state *ReconcileContext, c client.Client) task.Task { } func TaskStatusUnknown() task.Task { - return task.NameTaskFunc("StatusUnknown", func(ctx context.Context) task.Result { + return task.NameTaskFunc("StatusUnknown", func(_ context.Context) task.Result { return task.Wait().With("status of the pd is unknown") }) } From 3dac1b3a7c8ddc2667d4e66d9610e8796ca033b9 Mon Sep 17 00:00:00 2001 From: liubo02 Date: Tue, 24 Dec 2024 17:06:12 +0800 Subject: [PATCH 3/4] fix ut Signed-off-by: liubo02 --- pkg/client/fake.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 5105b2f883..603a63b76c 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -589,9 +589,11 @@ func (c *fakeUnderlayClient) PatchReactionFunc(action *testing.PatchActionImpl) if err := yaml.Unmarshal(action.GetPatch(), &patchObj.Object); err != nil { return true, nil, fmt.Errorf("error decoding YAML: %w", err) } - if _, err := manager.Apply(obj, patchObj, "tidb-operator", true); err != nil { + ret, err := manager.Apply(obj, patchObj, "tidb-operator", true) + if err != nil { return true, nil, err } + obj = ret default: return true, nil, fmt.Errorf("PatchType is not supported") From 96e1ff70b4d28bdf34189bc3894dc3d449274b94 Mon Sep 17 00:00:00 2001 From: liubo02 Date: Tue, 24 Dec 2024 17:07:27 +0800 Subject: [PATCH 4/4] fix license Signed-off-by: liubo02 --- pkg/controllers/common/cond_test.go | 14 ++++++++++++++ pkg/controllers/common/interfaces.go | 14 ++++++++++++++ pkg/controllers/common/interfaces_test.go | 14 ++++++++++++++ pkg/controllers/common/resource.go | 14 ++++++++++++++ pkg/controllers/common/resource_test.go | 14 ++++++++++++++ pkg/controllers/common/task_test.go | 14 ++++++++++++++ pkg/controllers/pd/tasks/state.go | 14 ++++++++++++++ 7 files changed, 98 insertions(+) diff --git a/pkg/controllers/common/cond_test.go b/pkg/controllers/common/cond_test.go index 2cb771c31a..9561a4135d 100644 --- a/pkg/controllers/common/cond_test.go +++ b/pkg/controllers/common/cond_test.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package common import ( diff --git a/pkg/controllers/common/interfaces.go b/pkg/controllers/common/interfaces.go index c63e283252..395367880c 100644 --- a/pkg/controllers/common/interfaces.go +++ b/pkg/controllers/common/interfaces.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package common import ( diff --git a/pkg/controllers/common/interfaces_test.go b/pkg/controllers/common/interfaces_test.go index f865e585f4..cba65f4166 100644 --- a/pkg/controllers/common/interfaces_test.go +++ b/pkg/controllers/common/interfaces_test.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package common import ( diff --git a/pkg/controllers/common/resource.go b/pkg/controllers/common/resource.go index 45178886ed..91fe8f2432 100644 --- a/pkg/controllers/common/resource.go +++ b/pkg/controllers/common/resource.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package common type Setter[T any] interface { diff --git a/pkg/controllers/common/resource_test.go b/pkg/controllers/common/resource_test.go index f104f4c800..b829da9bb9 100644 --- a/pkg/controllers/common/resource_test.go +++ b/pkg/controllers/common/resource_test.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package common import ( diff --git a/pkg/controllers/common/task_test.go b/pkg/controllers/common/task_test.go index 77b5be7968..82cd79b621 100644 --- a/pkg/controllers/common/task_test.go +++ b/pkg/controllers/common/task_test.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package common import ( diff --git a/pkg/controllers/pd/tasks/state.go b/pkg/controllers/pd/tasks/state.go index 7d3f632192..3c84ec27b5 100644 --- a/pkg/controllers/pd/tasks/state.go +++ b/pkg/controllers/pd/tasks/state.go @@ -1,3 +1,17 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tasks import (