From 89215c5e57a81fc0a50d6a85e5c8cdb1ce470d45 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 15 May 2023 18:33:47 +0800 Subject: [PATCH] timer: add user interfaces for timer framework --- build/nogo_config.json | 6 +- timer/api/BUILD.bazel | 31 +++++ timer/api/client.go | 98 ++++++++++++++ timer/api/client_test.go | 120 +++++++++++++++++ timer/api/hook.go | 59 +++++++++ timer/api/schedule_policy_test.go | 67 ++++++++++ timer/api/store.go | 207 ++++++++++++++++++++++++++++++ timer/api/store_test.go | 189 +++++++++++++++++++++++++++ timer/api/timer.go | 159 +++++++++++++++++++++++ 9 files changed, 934 insertions(+), 2 deletions(-) create mode 100644 timer/api/BUILD.bazel create mode 100644 timer/api/client.go create mode 100644 timer/api/client_test.go create mode 100644 timer/api/hook.go create mode 100644 timer/api/schedule_policy_test.go create mode 100644 timer/api/store.go create mode 100644 timer/api/store_test.go create mode 100644 timer/api/timer.go diff --git a/build/nogo_config.json b/build/nogo_config.json index c183db10b5993..68648d3a70c92 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -559,7 +559,8 @@ "extension/": "extension code", "resourcemanager/": "resourcemanager code", "keyspace/": "keyspace code", - "owner/": "owner code" + "owner/": "owner code", + "timer/": "timer code" } }, "shift": { @@ -977,7 +978,8 @@ "keyspace/": "keyspace code", "server/": "server code", "owner/": "owner code", - "meta": "meta code" + "meta": "meta code", + "timer/": "timer code" } }, "SA1029": { diff --git a/timer/api/BUILD.bazel b/timer/api/BUILD.bazel new file mode 100644 index 0000000000000..5289fee58e34f --- /dev/null +++ b/timer/api/BUILD.bazel @@ -0,0 +1,31 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "api", + srcs = [ + "client.go", + "hook.go", + "store.go", + "timer.go", + ], + importpath = "github.com/pingcap/tidb/timer/api", + visibility = ["//visibility:public"], + deps = [ + "//parser/duration", + "@com_github_pingcap_errors//:errors", + ], +) + +go_test( + name = "api_test", + timeout = "short", + srcs = [ + "client_test.go", + "schedule_policy_test.go", + "store_test.go", + ], + embed = [":api"], + flaky = True, + shard_count = 7, + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/timer/api/client.go b/timer/api/client.go new file mode 100644 index 0000000000000..692f6da21fb03 --- /dev/null +++ b/timer/api/client.go @@ -0,0 +1,98 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "time" +) + +// GetTimerOption is the option to get timers +type GetTimerOption func(*TimerCond) + +// WithKey indicates to get a timer with the specified key +func WithKey(key string) GetTimerOption { + return func(cond *TimerCond) { + cond.Key.Set(key) + cond.KeyPrefix = false + } +} + +// WithKeyPrefix to get timers with the indicated key prefix +func WithKeyPrefix(keyPrefix string) GetTimerOption { + return func(cond *TimerCond) { + cond.Key.Set(keyPrefix) + cond.KeyPrefix = true + } +} + +// WithID indicates to get a timer with the specified id +func WithID(id string) GetTimerOption { + return func(cond *TimerCond) { + cond.ID.Set(id) + } +} + +// UpdateTimerOption is the option to update the timer +type UpdateTimerOption func(*TimerUpdate) + +// WithSetEnable indicates to set the timer's `Enable` field +func WithSetEnable(enable bool) UpdateTimerOption { + return func(update *TimerUpdate) { + update.Enable.Set(enable) + } +} + +// WithSetSchedExpr indicates to set the timer's schedule policy +func WithSetSchedExpr(tp SchedPolicyType, expr string) UpdateTimerOption { + return func(update *TimerUpdate) { + update.SchedPolicyType.Set(tp) + update.SchedPolicyExpr.Set(expr) + } +} + +// WithSetWatermark indicates to set the timer's watermark +func WithSetWatermark(watermark time.Time) UpdateTimerOption { + return func(update *TimerUpdate) { + update.Watermark.Set(watermark) + } +} + +// WithSetSummaryData indicates to set the timer's summary +func WithSetSummaryData(summary []byte) UpdateTimerOption { + return func(update *TimerUpdate) { + update.SummaryData.Set(summary) + } +} + +// TimerClient is an interface exposed to user to manage timers +type TimerClient interface { + // GetDefaultNamespace returns the default namespace of this client + GetDefaultNamespace() string + // CreateTimer creates a new timer + CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error) + // GetTimerByID queries the timer by ID + GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error) + // GetTimerByKey queries the timer by key + GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error) + // GetTimers queries timers by options + GetTimers(ctx context.Context, opt ...GetTimerOption) ([]*TimerRecord, error) + // UpdateTimer updates a timer + UpdateTimer(ctx context.Context, timerID string, opt ...UpdateTimerOption) error + // CloseTimerEvent closes the triggering event of a timer + CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error + // DeleteTimer deletes a timer + DeleteTimer(ctx context.Context, timerID string) (bool, error) +} diff --git a/timer/api/client_test.go b/timer/api/client_test.go new file mode 100644 index 0000000000000..958123f3a4ad7 --- /dev/null +++ b/timer/api/client_test.go @@ -0,0 +1,120 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGetTimerOption(t *testing.T) { + var cond TimerCond + require.Empty(t, cond.FieldsSet()) + + // test 'Key' field + require.False(t, cond.Key.Present()) + + WithKey("k1")(&cond) + key, ok := cond.Key.Get() + require.True(t, ok) + require.Equal(t, "k1", key) + require.False(t, cond.KeyPrefix) + require.Equal(t, []string{"Key"}, cond.FieldsSet()) + + WithKeyPrefix("k2")(&cond) + key, ok = cond.Key.Get() + require.True(t, ok) + require.Equal(t, "k2", key) + require.True(t, cond.KeyPrefix) + require.Equal(t, []string{"Key"}, cond.FieldsSet()) + + WithKey("k3")(&cond) + key, ok = cond.Key.Get() + require.True(t, ok) + require.Equal(t, "k3", key) + require.False(t, cond.KeyPrefix) + require.Equal(t, []string{"Key"}, cond.FieldsSet()) + + // test 'ID' field + require.False(t, cond.ID.Present()) + + WithID("id1")(&cond) + id, ok := cond.ID.Get() + require.True(t, ok) + require.Equal(t, "id1", id) + require.Equal(t, []string{"ID", "Key"}, cond.FieldsSet()) +} + +func TestUpdateTimerOption(t *testing.T) { + var update TimerUpdate + require.Empty(t, update) + + // test 'Enable' field + require.False(t, update.Enable.Present()) + + WithSetEnable(true)(&update) + setEnable, ok := update.Enable.Get() + require.True(t, ok) + require.True(t, setEnable) + require.Equal(t, []string{"Enable"}, update.FieldsSet()) + + WithSetEnable(false)(&update) + setEnable, ok = update.Enable.Get() + require.True(t, ok) + require.False(t, setEnable) + require.Equal(t, []string{"Enable"}, update.FieldsSet()) + + // test schedule policy + require.False(t, update.SchedPolicyType.Present()) + require.False(t, update.SchedPolicyExpr.Present()) + + WithSetSchedExpr(SchedEventInterval, "3h")(&update) + stp, ok := update.SchedPolicyType.Get() + require.True(t, ok) + require.Equal(t, SchedEventInterval, stp) + expr, ok := update.SchedPolicyExpr.Get() + require.True(t, ok) + require.Equal(t, "3h", expr) + require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet()) + + WithSetSchedExpr(SchedEventInterval, "1h")(&update) + stp, ok = update.SchedPolicyType.Get() + require.True(t, ok) + require.Equal(t, SchedEventInterval, stp) + expr, ok = update.SchedPolicyExpr.Get() + require.True(t, ok) + require.Equal(t, "1h", expr) + require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet()) + + // test 'Watermark' field + require.False(t, update.Watermark.Present()) + + WithSetWatermark(time.Unix(1234, 5678))(&update) + watermark, ok := update.Watermark.Get() + require.True(t, ok) + require.Equal(t, time.Unix(1234, 5678), watermark) + require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark"}, update.FieldsSet()) + + // test 'SummaryData' field + require.False(t, update.SummaryData.Present()) + + WithSetSummaryData([]byte("hello"))(&update) + summary, ok := update.SummaryData.Get() + require.True(t, ok) + require.Equal(t, []byte("hello"), summary) + require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet()) +} diff --git a/timer/api/hook.go b/timer/api/hook.go new file mode 100644 index 0000000000000..afa564df592a1 --- /dev/null +++ b/timer/api/hook.go @@ -0,0 +1,59 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "time" +) + +// TimerShedEvent is an interface which gives the timer's schedule event's information +type TimerShedEvent interface { + // EventID returns the event ID the current event + EventID() string + // Timer returns the timer record object of the current event + Timer() *TimerRecord +} + +// PreSchedEventResult is the result of `OnPreSchedEvent` +type PreSchedEventResult struct { + // Delay indicates to delay the event after a while. + // If `Delay` is 0, it means no delay, and then `OnSchedEvent` will be called. + // Otherwise, after a while according to `Delay`, `OnPreSchedEvent` will be called again to + // check whether to trigger the event. + Delay time.Duration + // EventData indicates the data should be passed to the event that should be triggered. + // EventData can be used to store some pre-computed configurations of the next event + EventData []byte +} + +// Hook is an interface which should be implemented by user to tell framework how to trigger an event +// Several timers with a same hook class can share one hook in a runtime +type Hook interface { + // Start starts the hook + Start() + // Stop stops the hook. When it is called, this means the framework is shutting down + Stop() + // OnPreSchedEvent will be called before triggering a new event. It's return value tells the next action of the triggering. + // For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed. + // Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered, + // use `event.EventID()` to get the event id instead. + OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (*TimerShedEvent, error) + // OnSchedEvent will be called when a new event is triggered. + OnSchedEvent(ctx context.Context, event TimerShedEvent) error +} + +// HookFactory is the factory function to construct a new Hook object with `hookClass` +type HookFactory func(hookClass string, cli TimerClient) Hook diff --git a/timer/api/schedule_policy_test.go b/timer/api/schedule_policy_test.go new file mode 100644 index 0000000000000..e74c48fbdfc76 --- /dev/null +++ b/timer/api/schedule_policy_test.go @@ -0,0 +1,67 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestIntervalPolicy(t *testing.T) { + watermark1 := time.Now() + watermark2, err := time.Parse(time.RFC3339, "2021-11-21T11:21:31Z") + require.NoError(t, err) + + cases := []struct { + expr string + err bool + interval time.Duration + }{ + { + expr: "6m", + interval: 6 * time.Minute, + }, + { + expr: "7h", + interval: 7 * time.Hour, + }, + { + expr: "8d", + interval: 8 * 24 * time.Hour, + }, + { + expr: "11", + err: true, + }, + } + + for _, c := range cases { + p, err := NewSchedIntervalPolicy(c.expr) + if c.err { + require.ErrorContains(t, err, fmt.Sprintf("invalid schedule event expr '%s'", c.expr)) + continue + } + require.NoError(t, err) + tm, ok := p.NextEventTime(watermark1) + require.True(t, ok) + require.Equal(t, watermark1.Add(c.interval), tm) + tm, ok = p.NextEventTime(watermark2) + require.True(t, ok) + require.Equal(t, watermark2.Add(c.interval), tm) + } +} diff --git a/timer/api/store.go b/timer/api/store.go new file mode 100644 index 0000000000000..dcee5b9863c25 --- /dev/null +++ b/timer/api/store.go @@ -0,0 +1,207 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "reflect" + "strings" + "time" + "unsafe" +) + +type optionalVal interface { + optionalVal() + Present() bool + Clear() +} + +// OptionalVal is used by `TimerCond` and `TimerUpdate` to indicate +// whether some field's condition has been set or whether is field should be update +type OptionalVal[T any] struct { + v T + present bool +} + +// NewOptionalVal creates a new OptionalVal +func NewOptionalVal[T any](val T) (o OptionalVal[T]) { + o.Set(val) + return +} + +func (*OptionalVal[T]) optionalVal() {} + +// Present indicates whether the field value is set. +func (o *OptionalVal[T]) internalPresent() bool { + return o.present +} + +// Present indicates whether the field value is set. +func (o *OptionalVal[T]) Present() bool { + return o.present +} + +// Get returns the current value, if the second return value is false, this means it is not set. +func (o *OptionalVal[T]) Get() (v T, present bool) { + return o.v, o.present +} + +// Set sets the value +func (o *OptionalVal[T]) Set(v T) { + o.v, o.present = v, true +} + +// Clear clears the value. The `Present()` will return false after `Clear` is called. +func (o *OptionalVal[T]) Clear() { + var v T + o.v, o.present = v, false +} + +func iterOptionalFields(v reflect.Value, excludes []unsafe.Pointer, fn func(name string, val optionalVal) bool) { + tp := v.Type() +loop: + for i := 0; i < v.NumField(); i++ { + fieldVal := v.Field(i).Addr() + optVal, ok := fieldVal.Interface().(optionalVal) + if !ok { + continue + } + + for _, ex := range excludes { + if fieldVal.UnsafePointer() == ex { + continue loop + } + } + + if !fn(tp.Field(i).Name, optVal) { + break loop + } + } +} + +// TimerCond is the condition to filter a timer record +type TimerCond struct { + // ID indicates to filter the timer record with ID + ID OptionalVal[string] + // Namespace indicates to filter the timer by Namespace + Namespace OptionalVal[string] + // Key indicates to filter the timer record with ID + // The filter behavior is defined by `KeyPrefix` + Key OptionalVal[string] + // KeyPrefix indicates how to filter with timer's key if `Key` is set + // If `KeyPrefix is` true, it will check whether the timer's key is prefixed with `TimerCond.Key`. + // Otherwise, it will check whether the timer's key equals `TimerCond.Key`. + KeyPrefix bool +} + +// Match will return whether the condition match the timer record +func (c *TimerCond) Match(t *TimerRecord) bool { + if val, ok := c.ID.Get(); ok && t.ID != val { + return false + } + + if val, ok := c.Namespace.Get(); ok && t.Namespace != val { + return false + } + + if val, ok := c.Key.Get(); ok { + if c.KeyPrefix && !strings.HasPrefix(t.Key, val) { + return false + } + + if !c.KeyPrefix && t.Key != val { + return false + } + } + + return true +} + +// FieldsSet returns all fields that has been set exclude excludes +func (c *TimerCond) FieldsSet(excludes ...unsafe.Pointer) (fields []string) { + iterOptionalFields(reflect.ValueOf(c).Elem(), excludes, func(name string, val optionalVal) bool { + if val.Present() { + fields = append(fields, name) + } + return true + }) + return +} + +// Clear clears all fields +func (c *TimerCond) Clear() { + iterOptionalFields(reflect.ValueOf(c).Elem(), nil, func(name string, val optionalVal) bool { + val.Clear() + return true + }) + c.KeyPrefix = false +} + +// TimerUpdate indicates how to update a timer +type TimerUpdate struct { + // Enable indicates to set the timer's `Enable` field + Enable OptionalVal[bool] + // SchedPolicyType indicates to set the timer's `SchedPolicyType` field + SchedPolicyType OptionalVal[SchedPolicyType] + // SchedPolicyExpr indicates to set the timer's `SchedPolicyExpr` field + SchedPolicyExpr OptionalVal[string] + // Watermark indicates to set the timer's `Watermark` field + Watermark OptionalVal[time.Time] + // SummaryData indicates to set the timer's `Summary` field + SummaryData OptionalVal[[]byte] +} + +// Apply applies the update to a timer +func (u *TimerUpdate) Apply(record *TimerRecord) error { + if v, ok := u.Enable.Get(); ok { + record.Enable = v + } + + if v, ok := u.SchedPolicyType.Get(); ok { + record.SchedPolicyType = v + } + + if v, ok := u.SchedPolicyExpr.Get(); ok { + record.SchedPolicyExpr = v + } + + if v, ok := u.Watermark.Get(); ok { + record.Watermark = v + } + + if v, ok := u.SummaryData.Get(); ok { + record.SummaryData = v + } + + return nil +} + +// FieldsSet returns all fields that has been set exclude excludes +func (u *TimerUpdate) FieldsSet(excludes ...unsafe.Pointer) (fields []string) { + iterOptionalFields(reflect.ValueOf(u).Elem(), excludes, func(name string, val optionalVal) bool { + if val.Present() { + fields = append(fields, name) + } + return true + }) + return +} + +// Clear clears all fields +func (u *TimerUpdate) Clear() { + iterOptionalFields(reflect.ValueOf(u).Elem(), nil, func(name string, val optionalVal) bool { + val.Clear() + return true + }) +} diff --git a/timer/api/store_test.go b/timer/api/store_test.go new file mode 100644 index 0000000000000..0847eaf6f7d47 --- /dev/null +++ b/timer/api/store_test.go @@ -0,0 +1,189 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/require" +) + +func TestFieldOptional(t *testing.T) { + var opt1 OptionalVal[string] + require.False(t, opt1.Present()) + s, ok := opt1.Get() + require.False(t, ok) + require.Equal(t, "", s) + + opt1.Set("a1") + require.True(t, opt1.Present()) + s, ok = opt1.Get() + require.True(t, ok) + require.Equal(t, "a1", s) + + opt1.Set("a2") + require.True(t, opt1.Present()) + s, ok = opt1.Get() + require.True(t, ok) + require.Equal(t, "a2", s) + + opt1.Clear() + require.False(t, opt1.Present()) + s, ok = opt1.Get() + require.False(t, ok) + require.Equal(t, "", s) + + type Foo struct { + v int + } + var opt2 OptionalVal[*Foo] + foo := &Foo{v: 1} + + f, ok := opt2.Get() + require.False(t, ok) + require.Nil(t, f) + + opt2.Set(foo) + require.True(t, opt2.Present()) + f, ok = opt2.Get() + require.True(t, ok) + require.Same(t, foo, f) + + opt2.Set(nil) + require.True(t, opt2.Present()) + f, ok = opt2.Get() + require.True(t, ok) + require.Nil(t, f) + + opt2.Clear() + f, ok = opt2.Get() + require.False(t, ok) + require.Nil(t, f) +} + +func TestFieldsReflect(t *testing.T) { + var cond TimerCond + require.Empty(t, cond.FieldsSet()) + + cond.Key.Set("k1") + require.Equal(t, []string{"Key"}, cond.FieldsSet()) + + cond.ID.Set("22") + require.Equal(t, []string{"ID", "Key"}, cond.FieldsSet()) + require.Equal(t, []string{"Key"}, cond.FieldsSet(unsafe.Pointer(&cond.ID))) + + cond.Key.Clear() + require.Equal(t, []string{"ID"}, cond.FieldsSet()) + + cond.KeyPrefix = true + cond.Clear() + require.Empty(t, cond.FieldsSet()) + require.False(t, cond.KeyPrefix) + + var update TimerUpdate + require.Empty(t, update.FieldsSet()) + + update.Watermark.Set(time.Now()) + require.Equal(t, []string{"Watermark"}, update.FieldsSet()) + + update.Enable.Set(true) + require.Equal(t, []string{"Enable", "Watermark"}, update.FieldsSet()) + require.Equal(t, []string{"Watermark"}, update.FieldsSet(unsafe.Pointer(&update.Enable))) + + update.Watermark.Clear() + require.Equal(t, []string{"Enable"}, update.FieldsSet()) + + update.Clear() + require.Empty(t, update.FieldsSet()) +} + +func TestTimerRecordCond(t *testing.T) { + tm := &TimerRecord{ + ID: "123", + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key", + }, + } + + // ID + cond := &TimerCond{ID: NewOptionalVal("123")} + require.True(t, cond.Match(tm)) + + cond = &TimerCond{ID: NewOptionalVal("1")} + require.False(t, cond.Match(tm)) + + // Namespace + cond = &TimerCond{Namespace: NewOptionalVal("n1")} + require.True(t, cond.Match(tm)) + + cond = &TimerCond{Namespace: NewOptionalVal("n2")} + require.False(t, cond.Match(tm)) + + // Key + cond = &TimerCond{Key: NewOptionalVal("/path/to/key")} + require.True(t, cond.Match(tm)) + + cond = &TimerCond{Key: NewOptionalVal("/path/to/")} + require.False(t, cond.Match(tm)) + + // keyPrefix + cond = &TimerCond{Key: NewOptionalVal("/path/to/"), KeyPrefix: true} + require.True(t, cond.Match(tm)) + + cond = &TimerCond{Key: NewOptionalVal("/path/to2"), KeyPrefix: true} + require.False(t, cond.Match(tm)) + + // Combined condition + cond = &TimerCond{ID: NewOptionalVal("123"), Key: NewOptionalVal("/path/to/key")} + require.True(t, cond.Match(tm)) + + cond = &TimerCond{ID: NewOptionalVal("123"), Key: NewOptionalVal("/path/to/")} + require.False(t, cond.Match(tm)) +} + +func TestTimerUpdate(t *testing.T) { + tm := &TimerRecord{ + ID: "123", + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key", + }, + } + + now := time.Now() + data := []byte("aabbcc") + update := &TimerUpdate{ + Enable: NewOptionalVal(true), + SchedPolicyType: NewOptionalVal(SchedEventInterval), + SchedPolicyExpr: NewOptionalVal("1h"), + Watermark: NewOptionalVal(now), + SummaryData: NewOptionalVal(data), + } + + require.NoError(t, update.Apply(tm)) + require.True(t, tm.Enable) + require.Equal(t, SchedEventInterval, tm.SchedPolicyType) + require.Equal(t, "1h", tm.SchedPolicyExpr) + require.Equal(t, now, tm.Watermark) + require.Equal(t, data, tm.SummaryData) + + emptyUpdate := &TimerUpdate{} + tm2 := tm.Clone() + require.NoError(t, emptyUpdate.Apply(tm2)) + require.Equal(t, tm, tm2) +} diff --git a/timer/api/timer.go b/timer/api/timer.go new file mode 100644 index 0000000000000..61e657fca5c9a --- /dev/null +++ b/timer/api/timer.go @@ -0,0 +1,159 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/duration" +) + +// SchedPolicyType is the type of the event schedule policy +type SchedPolicyType string + +const ( + // SchedEventInterval indicates to schedule events every fixed interval + SchedEventInterval SchedPolicyType = "INTERVAL" +) + +// SchedEventPolicy is an interface to tell the runtime how to schedule a timer's events +type SchedEventPolicy interface { + // NextEventTime returns the time to schedule the next timer event. If the second return value is true, + // it means we have a next event schedule after `watermark`. Otherwise, it means there is no more event after `watermark`. + NextEventTime(watermark time.Time) (time.Time, bool) +} + +// SchedIntervalPolicy implements SchedEventPolicy, it is the policy of type `SchedEventInterval` +type SchedIntervalPolicy struct { + expr string + interval time.Duration +} + +// NewSchedIntervalPolicy creates a new SchedIntervalPolicy +func NewSchedIntervalPolicy(expr string) (*SchedIntervalPolicy, error) { + interval, err := duration.ParseDuration(expr) + if err != nil { + return nil, errors.Wrapf(err, "invalid schedule event expr '%s'", expr) + } + + return &SchedIntervalPolicy{ + expr: expr, + interval: interval, + }, nil +} + +// NextEventTime returns the next time of the timer event +// A next event should be triggered after a time indicated by `interval` after watermark +func (p *SchedIntervalPolicy) NextEventTime(watermark time.Time) (time.Time, bool) { + if watermark.IsZero() { + return watermark, true + } + return watermark.Add(p.interval), true +} + +// TimerSpec is the specification of a timer without any runtime status +type TimerSpec struct { + // Namespace is the namespace of the timer + Namespace string + // Key is the key of the timer. Key is unique in each namespace + Key string + // Data is a binary which is defined by user + Data []byte + // SchedPolicyType is the type of the event schedule policy + SchedPolicyType SchedPolicyType + // SchedPolicyExpr is the expression of event schedule policy with the type specified by SchedPolicyType + SchedPolicyExpr string + // HookClass is the class of the hook + HookClass string + // Enable indicated whether the timer is enabled. + // If it is false, the new timer event will not be scheduled even it is up to time. + Enable bool +} + +// Clone returns a cloned TimerSpec +func (t *TimerSpec) Clone() *TimerSpec { + clone := *t + return &clone +} + +// Validate validates the TimerSpec +func (t *TimerSpec) Validate() error { + if t.Namespace == "" { + return errors.New("field 'namespace' should not be empty") + } + + if t.Key == "" { + return errors.New("field 'key' should not be empty") + } + + if _, err := t.CreateSchedEventPolicy(); err != nil { + return errors.Wrap(err, "schedule event configuration is not valid") + } + + return nil +} + +// CreateSchedEventPolicy creates a SchedEventPolicy according to `SchedPolicyType` and `SchedPolicyExpr` +func (t *TimerSpec) CreateSchedEventPolicy() (SchedEventPolicy, error) { + switch t.SchedPolicyType { + case SchedEventInterval: + return NewSchedIntervalPolicy(t.SchedPolicyExpr) + default: + return nil, errors.Errorf("invalid schedule event type: '%s'", t.SchedPolicyExpr) + } +} + +// SchedEventStatus is the current schedule status of timer's event +type SchedEventStatus string + +const ( + // SchedEventIdle means the timer is not in trigger state currently + SchedEventIdle SchedEventStatus = "IDLE" + // SchedEventTrigger means the timer is in trigger state + SchedEventTrigger SchedEventStatus = "TRIGGER" +) + +// TimerRecord is the timer record saved in the timer store +type TimerRecord struct { + TimerSpec + // ID is the id of timer, it is unique and auto assigned by the store when created. + ID string + // Watermark indicates the progress the timer's event schedule + Watermark time.Time + // EventStatus indicates the current schedule status of the timer's event + EventStatus SchedEventStatus + // EventID indicates the id of current triggered event + // If the `EventStatus` is `IDLE`, this value should be empty + EventID string + // EventData indicates the data of current triggered event + // If the `EventStatus` is `IDLE`, this value should be empty + EventData []byte + // EventStart indicates the start time of current triggered event + // If the `EventStatus` is `IDLE`, `EventStart.IsZero()` should returns true + EventStart time.Time + // SummaryData is a binary which is used to store some summary information of the timer. + // User can update it when closing a timer's event to update the summary. + SummaryData []byte + // CreateTime is the creation time of the timer + CreateTime time.Time +} + +// Clone returns a cloned TimerRecord +func (r *TimerRecord) Clone() *TimerRecord { + cloned := *r + cloned.TimerSpec = *r.TimerSpec.Clone() + return &cloned +}