Skip to content

Commit

Permalink
api: support to query whether pd has loaded region (#8749)
Browse files Browse the repository at this point in the history
close #8426, close #8748

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Dec 19, 2024
1 parent c5812ee commit da4b43a
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 19 deletions.
59 changes: 44 additions & 15 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"

"github.com/tikv/pd/pkg/core"
Expand Down Expand Up @@ -75,13 +76,21 @@ func NewRegionStorageWithLevelDBBackend(

// TODO: support other KV storage backends like BadgerDB in the future.

type regionSource int

const (
unloaded regionSource = iota
fromEtcd
fromLeveldb
)

type coreStorage struct {
Storage
regionStorage endpoint.RegionStorage

useRegionStorage int32
regionLoaded bool
mu syncutil.Mutex
useRegionStorage atomic.Bool
regionLoaded regionSource
mu syncutil.RWMutex
}

// NewCoreStorage creates a new core storage with the given default and region storage.
Expand All @@ -92,6 +101,7 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage
return &coreStorage{
Storage: defaultStorage,
regionStorage: regionStorage,
regionLoaded: unloaded,
}
}

Expand All @@ -118,12 +128,12 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi
if useLocalRegionStorage {
// Switch the region storage to regionStorage, all region info will be read/saved by the internal
// regionStorage, and in most cases it's LevelDB-backend.
atomic.StoreInt32(&ps.useRegionStorage, 1)
ps.useRegionStorage.Store(true)
return ps.regionStorage
}
// Switch the region storage to defaultStorage, all region info will be read/saved by the internal
// defaultStorage, and in most cases it's etcd-backend.
atomic.StoreInt32(&ps.useRegionStorage, 0)
ps.useRegionStorage.Store(false)
return ps.Storage
}

Expand All @@ -135,48 +145,53 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
return s.LoadRegions(ctx, f)
}

if atomic.LoadInt32(&ps.useRegionStorage) == 0 {
return ps.Storage.LoadRegions(ctx, f)
}

ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.regionLoaded {

if !ps.useRegionStorage.Load() {
err := ps.Storage.LoadRegions(ctx, f)
if err == nil {
ps.regionLoaded = fromEtcd
}
return err
}

if ps.regionLoaded == unloaded {
if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
return err
}
ps.regionLoaded = true
ps.regionLoaded = fromLeveldb
}
return nil
}

// LoadRegion loads one region from storage.
func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.LoadRegion(regionID, region)
}
return ps.Storage.LoadRegion(regionID, region)
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.LoadRegions(ctx, f)
}
return ps.Storage.LoadRegions(ctx, f)
}

// SaveRegion saves one region to storage.
func (ps *coreStorage) SaveRegion(region *metapb.Region) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.SaveRegion(region)
}
return ps.Storage.SaveRegion(region)
}

// DeleteRegion deletes one region from storage.
func (ps *coreStorage) DeleteRegion(region *metapb.Region) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.DeleteRegion(region)
}
return ps.Storage.DeleteRegion(region)
Expand All @@ -199,3 +214,17 @@ func (ps *coreStorage) Close() error {
}
return nil
}

// AreRegionsLoaded returns whether the regions are loaded.
func AreRegionsLoaded(s Storage) bool {
ps := s.(*coreStorage)
ps.mu.RLock()
defer ps.mu.RUnlock()
failpoint.Inject("loadRegionSlow", func() {
failpoint.Return(false)
})
if ps.useRegionStorage.Load() {
return ps.regionLoaded == fromLeveldb
}
return ps.regionLoaded == fromEtcd
}
8 changes: 4 additions & 4 deletions server/api/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ func TestHealthSlice(t *testing.T) {
re := require.New(t)
cfgs, svrs, clean := mustNewCluster(re, 3)
defer clean()
var leader, follow *server.Server
var leader, follower *server.Server

for _, svr := range svrs {
if !svr.IsClosed() && svr.GetMember().IsLeader() {
leader = svr
} else {
follow = svr
follower = svr
}
}
mustBootstrapCluster(re, leader)
addr := leader.GetConfig().ClientUrls + apiPrefix + "/api/v1/health"
follow.Close()
follower.Close()
resp, err := testDialClient.Get(addr)
re.NoError(err)
defer resp.Body.Close()
buf, err := io.ReadAll(resp.Body)
re.NoError(err)
checkSliceResponse(re, buf, cfgs, follow.GetConfig().Name)
checkSliceResponse(re, buf, cfgs, follower.GetConfig().Name)
}
59 changes: 59 additions & 0 deletions server/apiv2/handlers/ready.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"net/http"

"github.com/gin-gonic/gin"

"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/apiv2/middlewares"
)

// ReadyStatus reflects the cluster's ready status.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type ReadyStatus struct {
RegionLoaded bool `json:"region_loaded"`
}

// @Summary It will return whether pd follower is ready to became leader.
// @Router /ready [get]
// @Param verbose query bool false "Whether to return details."
// @Success 200
// @Failure 500
func Ready(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
s := svr.GetStorage()
regionLoaded := storage.AreRegionsLoaded(s)
if regionLoaded {
c.Status(http.StatusOK)
} else {
c.Status(http.StatusInternalServerError)
}

if _, ok := c.GetQuery("verbose"); !ok {
return
}
resp := &ReadyStatus{
RegionLoaded: regionLoaded,
}
if regionLoaded {
c.IndentedJSON(http.StatusOK, resp)
} else {
c.AbortWithStatusJSON(http.StatusInternalServerError, resp)
}
}
1 change: 1 addition & 0 deletions server/apiv2/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, apiutil.
})
router.Use(middlewares.Redirector())
root := router.Group(apiV2Prefix)
root.GET("ready", handlers.Ready)
handlers.RegisterKeyspace(root)
handlers.RegisterTSOKeyspaceGroup(root)
handlers.RegisterMicroService(root)
Expand Down
72 changes: 72 additions & 0 deletions tests/server/apiv2/handlers/ready_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"context"
"encoding/json"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/require"

"github.com/pingcap/failpoint"

tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
)

func TestReady(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()
re.NoError(cluster.RunInitialServers())
re.NotEmpty(cluster.WaitLeader())
server := cluster.GetLeaderServer()
re.NoError(server.BootstrapCluster())
url := server.GetConfig().ClientUrls + v2Prefix + "/ready"
failpoint.Enable("github.com/tikv/pd/pkg/storage/loadRegionSlow", `return()`)
checkReady(re, url, false)
failpoint.Disable("github.com/tikv/pd/pkg/storage/loadRegionSlow")
checkReady(re, url, true)
}

func checkReady(re *require.Assertions, url string, isReady bool) {
expectCode := http.StatusOK
if !isReady {
expectCode = http.StatusInternalServerError
}
resp, err := tests.TestDialClient.Get(url)
re.NoError(err)
defer resp.Body.Close()
buf, err := io.ReadAll(resp.Body)
re.NoError(err)
re.Empty(buf)
re.Equal(expectCode, resp.StatusCode)
r := &handlers.ReadyStatus{}
if isReady {
r.RegionLoaded = true
}
data, err := json.Marshal(r)
re.NoError(err)
err = tu.CheckGetJSON(tests.TestDialClient, url+"?verbose", data,
tu.Status(re, expectCode))
re.NoError(err)
}
1 change: 1 addition & 0 deletions tests/server/apiv2/handlers/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

const (
v2Prefix = "/pd/api/v2"
keyspacesPrefix = "/pd/api/v2/keyspaces"
keyspaceGroupsPrefix = "/pd/api/v2/tso/keyspace-groups"
)
Expand Down

0 comments on commit da4b43a

Please sign in to comment.