Skip to content

Commit

Permalink
timer: add user interfaces for timer framework (#43837)
Browse files Browse the repository at this point in the history
close #43836
  • Loading branch information
lcwangchao authored May 17, 2023
1 parent 4486008 commit b22d23e
Show file tree
Hide file tree
Showing 9 changed files with 934 additions and 2 deletions.
6 changes: 4 additions & 2 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@
"extension/": "extension code",
"resourcemanager/": "resourcemanager code",
"keyspace/": "keyspace code",
"owner/": "owner code"
"owner/": "owner code",
"timer/": "timer code"
}
},
"shift": {
Expand Down Expand Up @@ -978,7 +979,8 @@
"keyspace/": "keyspace code",
"server/": "server code",
"owner/": "owner code",
"meta": "meta code"
"meta": "meta code",
"timer/": "timer code"
}
},
"SA1029": {
Expand Down
31 changes: 31 additions & 0 deletions timer/api/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "api",
srcs = [
"client.go",
"hook.go",
"store.go",
"timer.go",
],
importpath = "github.com/pingcap/tidb/timer/api",
visibility = ["//visibility:public"],
deps = [
"//parser/duration",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "api_test",
timeout = "short",
srcs = [
"client_test.go",
"schedule_policy_test.go",
"store_test.go",
],
embed = [":api"],
flaky = True,
shard_count = 7,
deps = ["@com_github_stretchr_testify//require"],
)
98 changes: 98 additions & 0 deletions timer/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"time"
)

// GetTimerOption is the option to get timers
type GetTimerOption func(*TimerCond)

// WithKey indicates to get a timer with the specified key
func WithKey(key string) GetTimerOption {
return func(cond *TimerCond) {
cond.Key.Set(key)
cond.KeyPrefix = false
}
}

// WithKeyPrefix to get timers with the indicated key prefix
func WithKeyPrefix(keyPrefix string) GetTimerOption {
return func(cond *TimerCond) {
cond.Key.Set(keyPrefix)
cond.KeyPrefix = true
}
}

// WithID indicates to get a timer with the specified id
func WithID(id string) GetTimerOption {
return func(cond *TimerCond) {
cond.ID.Set(id)
}
}

// UpdateTimerOption is the option to update the timer
type UpdateTimerOption func(*TimerUpdate)

// WithSetEnable indicates to set the timer's `Enable` field
func WithSetEnable(enable bool) UpdateTimerOption {
return func(update *TimerUpdate) {
update.Enable.Set(enable)
}
}

// WithSetSchedExpr indicates to set the timer's schedule policy
func WithSetSchedExpr(tp SchedPolicyType, expr string) UpdateTimerOption {
return func(update *TimerUpdate) {
update.SchedPolicyType.Set(tp)
update.SchedPolicyExpr.Set(expr)
}
}

// WithSetWatermark indicates to set the timer's watermark
func WithSetWatermark(watermark time.Time) UpdateTimerOption {
return func(update *TimerUpdate) {
update.Watermark.Set(watermark)
}
}

// WithSetSummaryData indicates to set the timer's summary
func WithSetSummaryData(summary []byte) UpdateTimerOption {
return func(update *TimerUpdate) {
update.SummaryData.Set(summary)
}
}

// TimerClient is an interface exposed to user to manage timers
type TimerClient interface {
// GetDefaultNamespace returns the default namespace of this client
GetDefaultNamespace() string
// CreateTimer creates a new timer
CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error)
// GetTimerByID queries the timer by ID
GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error)
// GetTimerByKey queries the timer by key
GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error)
// GetTimers queries timers by options
GetTimers(ctx context.Context, opt ...GetTimerOption) ([]*TimerRecord, error)
// UpdateTimer updates a timer
UpdateTimer(ctx context.Context, timerID string, opt ...UpdateTimerOption) error
// CloseTimerEvent closes the triggering event of a timer
CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error
// DeleteTimer deletes a timer
DeleteTimer(ctx context.Context, timerID string) (bool, error)
}
120 changes: 120 additions & 0 deletions timer/api/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestGetTimerOption(t *testing.T) {
var cond TimerCond
require.Empty(t, cond.FieldsSet())

// test 'Key' field
require.False(t, cond.Key.Present())

WithKey("k1")(&cond)
key, ok := cond.Key.Get()
require.True(t, ok)
require.Equal(t, "k1", key)
require.False(t, cond.KeyPrefix)
require.Equal(t, []string{"Key"}, cond.FieldsSet())

WithKeyPrefix("k2")(&cond)
key, ok = cond.Key.Get()
require.True(t, ok)
require.Equal(t, "k2", key)
require.True(t, cond.KeyPrefix)
require.Equal(t, []string{"Key"}, cond.FieldsSet())

WithKey("k3")(&cond)
key, ok = cond.Key.Get()
require.True(t, ok)
require.Equal(t, "k3", key)
require.False(t, cond.KeyPrefix)
require.Equal(t, []string{"Key"}, cond.FieldsSet())

// test 'ID' field
require.False(t, cond.ID.Present())

WithID("id1")(&cond)
id, ok := cond.ID.Get()
require.True(t, ok)
require.Equal(t, "id1", id)
require.Equal(t, []string{"ID", "Key"}, cond.FieldsSet())
}

func TestUpdateTimerOption(t *testing.T) {
var update TimerUpdate
require.Empty(t, update)

// test 'Enable' field
require.False(t, update.Enable.Present())

WithSetEnable(true)(&update)
setEnable, ok := update.Enable.Get()
require.True(t, ok)
require.True(t, setEnable)
require.Equal(t, []string{"Enable"}, update.FieldsSet())

WithSetEnable(false)(&update)
setEnable, ok = update.Enable.Get()
require.True(t, ok)
require.False(t, setEnable)
require.Equal(t, []string{"Enable"}, update.FieldsSet())

// test schedule policy
require.False(t, update.SchedPolicyType.Present())
require.False(t, update.SchedPolicyExpr.Present())

WithSetSchedExpr(SchedEventInterval, "3h")(&update)
stp, ok := update.SchedPolicyType.Get()
require.True(t, ok)
require.Equal(t, SchedEventInterval, stp)
expr, ok := update.SchedPolicyExpr.Get()
require.True(t, ok)
require.Equal(t, "3h", expr)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet())

WithSetSchedExpr(SchedEventInterval, "1h")(&update)
stp, ok = update.SchedPolicyType.Get()
require.True(t, ok)
require.Equal(t, SchedEventInterval, stp)
expr, ok = update.SchedPolicyExpr.Get()
require.True(t, ok)
require.Equal(t, "1h", expr)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet())

// test 'Watermark' field
require.False(t, update.Watermark.Present())

WithSetWatermark(time.Unix(1234, 5678))(&update)
watermark, ok := update.Watermark.Get()
require.True(t, ok)
require.Equal(t, time.Unix(1234, 5678), watermark)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark"}, update.FieldsSet())

// test 'SummaryData' field
require.False(t, update.SummaryData.Present())

WithSetSummaryData([]byte("hello"))(&update)
summary, ok := update.SummaryData.Get()
require.True(t, ok)
require.Equal(t, []byte("hello"), summary)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet())
}
59 changes: 59 additions & 0 deletions timer/api/hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"time"
)

// TimerShedEvent is an interface which gives the timer's schedule event's information
type TimerShedEvent interface {
// EventID returns the event ID the current event
EventID() string
// Timer returns the timer record object of the current event
Timer() *TimerRecord
}

// PreSchedEventResult is the result of `OnPreSchedEvent`
type PreSchedEventResult struct {
// Delay indicates to delay the event after a while.
// If `Delay` is 0, it means no delay, and then `OnSchedEvent` will be called.
// Otherwise, after a while according to `Delay`, `OnPreSchedEvent` will be called again to
// check whether to trigger the event.
Delay time.Duration
// EventData indicates the data should be passed to the event that should be triggered.
// EventData can be used to store some pre-computed configurations of the next event
EventData []byte
}

// Hook is an interface which should be implemented by user to tell framework how to trigger an event
// Several timers with a same hook class can share one hook in a runtime
type Hook interface {
// Start starts the hook
Start()
// Stop stops the hook. When it is called, this means the framework is shutting down
Stop()
// OnPreSchedEvent will be called before triggering a new event. It's return value tells the next action of the triggering.
// For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed.
// Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered,
// use `event.EventID()` to get the event id instead.
OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (*TimerShedEvent, error)
// OnSchedEvent will be called when a new event is triggered.
OnSchedEvent(ctx context.Context, event TimerShedEvent) error
}

// HookFactory is the factory function to construct a new Hook object with `hookClass`
type HookFactory func(hookClass string, cli TimerClient) Hook
67 changes: 67 additions & 0 deletions timer/api/schedule_policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestIntervalPolicy(t *testing.T) {
watermark1 := time.Now()
watermark2, err := time.Parse(time.RFC3339, "2021-11-21T11:21:31Z")
require.NoError(t, err)

cases := []struct {
expr string
err bool
interval time.Duration
}{
{
expr: "6m",
interval: 6 * time.Minute,
},
{
expr: "7h",
interval: 7 * time.Hour,
},
{
expr: "8d",
interval: 8 * 24 * time.Hour,
},
{
expr: "11",
err: true,
},
}

for _, c := range cases {
p, err := NewSchedIntervalPolicy(c.expr)
if c.err {
require.ErrorContains(t, err, fmt.Sprintf("invalid schedule event expr '%s'", c.expr))
continue
}
require.NoError(t, err)
tm, ok := p.NextEventTime(watermark1)
require.True(t, ok)
require.Equal(t, watermark1.Add(c.interval), tm)
tm, ok = p.NextEventTime(watermark2)
require.True(t, ok)
require.Equal(t, watermark2.Add(c.interval), tm)
}
}
Loading

0 comments on commit b22d23e

Please sign in to comment.