Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timer: add user interfaces for timer framework #43837

Merged
merged 1 commit into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,8 @@
"extension/": "extension code",
"resourcemanager/": "resourcemanager code",
"keyspace/": "keyspace code",
"owner/": "owner code"
"owner/": "owner code",
"timer/": "timer code"
}
},
"shift": {
Expand Down Expand Up @@ -977,7 +978,8 @@
"keyspace/": "keyspace code",
"server/": "server code",
"owner/": "owner code",
"meta": "meta code"
"meta": "meta code",
"timer/": "timer code"
}
},
"SA1029": {
Expand Down
31 changes: 31 additions & 0 deletions timer/api/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved

go_library(
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
name = "api",
srcs = [
"client.go",
"hook.go",
"store.go",
"timer.go",
],
importpath = "github.com/pingcap/tidb/timer/api",
visibility = ["//visibility:public"],
deps = [
"//parser/duration",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "api_test",
timeout = "short",
srcs = [
"client_test.go",
"schedule_policy_test.go",
"store_test.go",
],
embed = [":api"],
flaky = True,
shard_count = 7,
deps = ["@com_github_stretchr_testify//require"],
)
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
98 changes: 98 additions & 0 deletions timer/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"time"
)

// GetTimerOption is the option to get timers
type GetTimerOption func(*TimerCond)

// WithKey indicates to get a timer with the specified key
func WithKey(key string) GetTimerOption {
return func(cond *TimerCond) {
cond.Key.Set(key)
cond.KeyPrefix = false
}
}

// WithKeyPrefix to get timers with the indicated key prefix
func WithKeyPrefix(keyPrefix string) GetTimerOption {
return func(cond *TimerCond) {
cond.Key.Set(keyPrefix)
cond.KeyPrefix = true
}
}

// WithID indicates to get a timer with the specified id
func WithID(id string) GetTimerOption {
return func(cond *TimerCond) {
cond.ID.Set(id)
}
}

// UpdateTimerOption is the option to update the timer
type UpdateTimerOption func(*TimerUpdate)

// WithSetEnable indicates to set the timer's `Enable` field
func WithSetEnable(enable bool) UpdateTimerOption {
return func(update *TimerUpdate) {
update.Enable.Set(enable)
}
}

// WithSetSchedExpr indicates to set the timer's schedule policy
func WithSetSchedExpr(tp SchedPolicyType, expr string) UpdateTimerOption {
return func(update *TimerUpdate) {
update.SchedPolicyType.Set(tp)
update.SchedPolicyExpr.Set(expr)
}
}

// WithSetWatermark indicates to set the timer's watermark
func WithSetWatermark(watermark time.Time) UpdateTimerOption {
return func(update *TimerUpdate) {
update.Watermark.Set(watermark)
}
}

// WithSetSummaryData indicates to set the timer's summary
func WithSetSummaryData(summary []byte) UpdateTimerOption {
return func(update *TimerUpdate) {
update.SummaryData.Set(summary)
}
}

// TimerClient is an interface exposed to user to manage timers
type TimerClient interface {
// GetDefaultNamespace returns the default namespace of this client
GetDefaultNamespace() string
// CreateTimer creates a new timer
CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error)
// GetTimerByID queries the timer by ID
GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error)
// GetTimerByKey queries the timer by key
GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error)
// GetTimers queries timers by options
GetTimers(ctx context.Context, opt ...GetTimerOption) ([]*TimerRecord, error)
// UpdateTimer updates a timer
UpdateTimer(ctx context.Context, timerID string, opt ...UpdateTimerOption) error
// CloseTimerEvent closes the triggering event of a timer
CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error
// DeleteTimer deletes a timer
DeleteTimer(ctx context.Context, timerID string) (bool, error)
}
120 changes: 120 additions & 0 deletions timer/api/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestGetTimerOption(t *testing.T) {
var cond TimerCond
require.Empty(t, cond.FieldsSet())

// test 'Key' field
require.False(t, cond.Key.Present())

WithKey("k1")(&cond)
key, ok := cond.Key.Get()
require.True(t, ok)
require.Equal(t, "k1", key)
require.False(t, cond.KeyPrefix)
require.Equal(t, []string{"Key"}, cond.FieldsSet())

WithKeyPrefix("k2")(&cond)
key, ok = cond.Key.Get()
require.True(t, ok)
require.Equal(t, "k2", key)
require.True(t, cond.KeyPrefix)
require.Equal(t, []string{"Key"}, cond.FieldsSet())

WithKey("k3")(&cond)
key, ok = cond.Key.Get()
require.True(t, ok)
require.Equal(t, "k3", key)
require.False(t, cond.KeyPrefix)
require.Equal(t, []string{"Key"}, cond.FieldsSet())

// test 'ID' field
require.False(t, cond.ID.Present())

WithID("id1")(&cond)
id, ok := cond.ID.Get()
require.True(t, ok)
require.Equal(t, "id1", id)
require.Equal(t, []string{"ID", "Key"}, cond.FieldsSet())
}

func TestUpdateTimerOption(t *testing.T) {
var update TimerUpdate
require.Empty(t, update)

// test 'Enable' field
require.False(t, update.Enable.Present())

WithSetEnable(true)(&update)
setEnable, ok := update.Enable.Get()
require.True(t, ok)
require.True(t, setEnable)
require.Equal(t, []string{"Enable"}, update.FieldsSet())

WithSetEnable(false)(&update)
setEnable, ok = update.Enable.Get()
require.True(t, ok)
require.False(t, setEnable)
require.Equal(t, []string{"Enable"}, update.FieldsSet())

// test schedule policy
require.False(t, update.SchedPolicyType.Present())
require.False(t, update.SchedPolicyExpr.Present())

WithSetSchedExpr(SchedEventInterval, "3h")(&update)
stp, ok := update.SchedPolicyType.Get()
require.True(t, ok)
require.Equal(t, SchedEventInterval, stp)
expr, ok := update.SchedPolicyExpr.Get()
require.True(t, ok)
require.Equal(t, "3h", expr)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet())

WithSetSchedExpr(SchedEventInterval, "1h")(&update)
stp, ok = update.SchedPolicyType.Get()
require.True(t, ok)
require.Equal(t, SchedEventInterval, stp)
expr, ok = update.SchedPolicyExpr.Get()
require.True(t, ok)
require.Equal(t, "1h", expr)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet())

// test 'Watermark' field
require.False(t, update.Watermark.Present())

WithSetWatermark(time.Unix(1234, 5678))(&update)
watermark, ok := update.Watermark.Get()
require.True(t, ok)
require.Equal(t, time.Unix(1234, 5678), watermark)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark"}, update.FieldsSet())

// test 'SummaryData' field
require.False(t, update.SummaryData.Present())

WithSetSummaryData([]byte("hello"))(&update)
summary, ok := update.SummaryData.Get()
require.True(t, ok)
require.Equal(t, []byte("hello"), summary)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet())
}
59 changes: 59 additions & 0 deletions timer/api/hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"time"
)

// TimerShedEvent is an interface which gives the timer's schedule event's information
type TimerShedEvent interface {
// EventID returns the event ID the current event
EventID() string
// Timer returns the timer record object of the current event
Timer() *TimerRecord
}

// PreSchedEventResult is the result of `OnPreSchedEvent`
type PreSchedEventResult struct {
// Delay indicates to delay the event after a while.
// If `Delay` is 0, it means no delay, and then `OnSchedEvent` will be called.
// Otherwise, after a while according to `Delay`, `OnPreSchedEvent` will be called again to
// check whether to trigger the event.
Delay time.Duration
// EventData indicates the data should be passed to the event that should be triggered.
// EventData can be used to store some pre-computed configurations of the next event
EventData []byte
}

// Hook is an interface which should be implemented by user to tell framework how to trigger an event
// Several timers with a same hook class can share one hook in a runtime
type Hook interface {
// Start starts the hook
Start()
// Stop stops the hook. When it is called, this means the framework is shutting down
Stop()
// OnPreSchedEvent will be called before triggering a new event. It's return value tells the next action of the triggering.
// For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed.
// Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered,
// use `event.EventID()` to get the event id instead.
OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (*TimerShedEvent, error)
// OnSchedEvent will be called when a new event is triggered.
OnSchedEvent(ctx context.Context, event TimerShedEvent) error
}

// HookFactory is the factory function to construct a new Hook object with `hookClass`
type HookFactory func(hookClass string, cli TimerClient) Hook
67 changes: 67 additions & 0 deletions timer/api/schedule_policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestIntervalPolicy(t *testing.T) {
watermark1 := time.Now()
watermark2, err := time.Parse(time.RFC3339, "2021-11-21T11:21:31Z")
require.NoError(t, err)

cases := []struct {
expr string
err bool
interval time.Duration
}{
{
expr: "6m",
interval: 6 * time.Minute,
},
{
expr: "7h",
interval: 7 * time.Hour,
},
{
expr: "8d",
interval: 8 * 24 * time.Hour,
},
{
expr: "11",
err: true,
},
}

for _, c := range cases {
p, err := NewSchedIntervalPolicy(c.expr)
if c.err {
require.ErrorContains(t, err, fmt.Sprintf("invalid schedule event expr '%s'", c.expr))
continue
}
require.NoError(t, err)
tm, ok := p.NextEventTime(watermark1)
require.True(t, ok)
require.Equal(t, watermark1.Add(c.interval), tm)
tm, ok = p.NextEventTime(watermark2)
require.True(t, ok)
require.Equal(t, watermark2.Add(c.interval), tm)
}
}
Loading