From 63a02b057a1cccc26ca6ddec409a5bd0d465d732 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Oct 2024 12:13:30 +0800 Subject: [PATCH] statistics: add gc in hot peer cache (#8702) #8750 Signed-off-by: lhy1024 --- pkg/mock/mockcluster/mockcluster.go | 5 +- pkg/schedule/schedulers/hot_region_test.go | 18 +++ pkg/statistics/hot_cache.go | 6 +- pkg/statistics/hot_peer_cache.go | 37 ++++- pkg/statistics/hot_peer_cache_test.go | 164 +++++++++++++-------- pkg/statistics/hot_stat.go | 6 +- pkg/statistics/topn.go | 21 +-- pkg/statistics/topn_test.go | 4 +- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 5 +- 10 files changed, 188 insertions(+), 80 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 3a6d1805af8..7ae24a428a8 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -62,10 +62,11 @@ type Cluster struct { // NewCluster creates a new Cluster func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { + basicCluster:= core.NewBasicCluster() clus := &Cluster{ - BasicCluster: core.NewBasicCluster(), + BasicCluster: basicCluster, IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(ctx), + HotStat: statistics.NewHotStat(ctx, basicCluster), HotBucketCache: buckets.NewBucketsCache(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 09b8a6946f1..0a17e1baa58 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1494,6 +1494,9 @@ func TestHotCacheUpdateCache(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetHotRegionCacheHitsThreshold(0) // For read flow @@ -1561,6 +1564,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // only a few regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 6; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetHotRegionCacheHitsThreshold(0) addRegionInfo(tc, statistics.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1, 0}, @@ -1580,6 +1586,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // many regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } regions := []testRegionInfo{} for i := 1; i <= 1000; i += 2 { regions = append(regions, @@ -1633,6 +1642,9 @@ func TestHotCacheByteAndKey(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetHotRegionCacheHitsThreshold(0) statistics.ThresholdsUpdateInterval = 0 defer func() { @@ -1760,6 +1772,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) { func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} @@ -1835,6 +1850,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) { func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 42c1e6c49a7..f956e3ba5dd 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -37,11 +37,11 @@ type HotCache struct { } // NewHotCache creates a new hot spot cache. -func NewHotCache(ctx context.Context) *HotCache { +func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache { w := &HotCache{ ctx: ctx, - writeCache: NewHotPeerCache(ctx, Write), - readCache: NewHotPeerCache(ctx, Read), + writeCache: NewHotPeerCache(ctx, cluster, Write), + readCache: NewHotPeerCache(ctx, cluster, Read), } go w.updateItems(w.readCache.taskQueue, w.runReadTask) go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 16c64b752e0..eba2b5f2c58 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -76,6 +76,7 @@ type thresholds struct { // hotPeerCache saves the hot peer's statistics. type hotPeerCache struct { kind RWType + cluster *core.BasicCluster peersOfStore map[uint64]*TopN // storeID -> hot peers storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs @@ -83,13 +84,14 @@ type hotPeerCache struct { taskQueue *chanx.UnboundedChan[FlowItemTask] thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds metrics map[uint64][ActionTypeLen]prometheus.Gauge // storeID -> metrics - // TODO: consider to remove store info when store is offline. + lastGCTime time.Time } // NewHotPeerCache creates a hotPeerCache -func NewHotPeerCache(ctx context.Context, kind RWType) *hotPeerCache { +func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind RWType) *hotPeerCache { return &hotPeerCache{ kind: kind, + cluster: cluster, peersOfStore: make(map[uint64]*TopN), storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), @@ -130,6 +132,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) { return } f.incMetrics(item.actionType, item.StoreID) + f.gc() } func (f *hotPeerCache) incMetrics(action ActionType, storeID uint64) { @@ -560,6 +563,36 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) { } } +func (f *hotPeerCache) gc() { + if time.Since(f.lastGCTime) < f.topNTTL { + return + } + f.lastGCTime = time.Now() + // remove tombstone stores + stores := make(map[uint64]struct{}) + for _, storeID := range f.cluster.GetStores() { + stores[storeID.GetID()] = struct{}{} + } + for storeID := range f.peersOfStore { + if _, ok := stores[storeID]; !ok { + delete(f.peersOfStore, storeID) + delete(f.regionsOfStore, storeID) + delete(f.thresholdsOfStore, storeID) + delete(f.metrics, storeID) + } + } + // remove expired items + for _, peers := range f.peersOfStore { + regions := peers.RemoveExpired() + for _, regionID := range regions { + delete(f.storesOfRegion, regionID) + for storeID := range f.regionsOfStore { + delete(f.regionsOfStore[storeID], regionID) + } + } + } +} + func (f *hotPeerCache) coldItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 5e0a7f64141..01747eaa2cc 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -18,6 +18,7 @@ import ( "context" "math/rand" "sort" + "sync" "testing" "time" @@ -26,27 +27,11 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/movingaverage" "github.com/tikv/pd/pkg/utils/typeutil" ) -func TestStoreTimeUnsync(t *testing.T) { - re := require.New(t) - cache := NewHotPeerCache(context.Background(), Write) - intervals := []uint64{120, 60} - for _, interval := range intervals { - region := buildRegion(Write, 3, interval) - checkAndUpdate(re, cache, region, 3) - { - stats := cache.RegionStats(0) - re.Len(stats, 3) - for _, s := range stats { - re.Len(s, 1) - } - } - } -} - type operator int const ( @@ -78,8 +63,9 @@ func TestCache(t *testing.T) { Read: 3, // all peers Write: 3, // all peers } - cache := NewHotPeerCache(context.Background(), test.kind) - region := buildRegion(test.kind, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, test.kind) + region := buildRegion(cluster, test.kind, 3, 60) checkAndUpdate(re, cache, region, defaultSize[test.kind]) checkHit(re, cache, region, test.kind, Add) // all peers are new @@ -251,23 +237,41 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { return dst, meta.Peers[dst] } -func buildRegion(kind RWType, peerCount int, interval uint64) *core.RegionInfo { - peers := newPeers(peerCount, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) +var ( + idAllocator *mockid.IDAllocator + once sync.Once +) + +func getIDAllocator() *mockid.IDAllocator { + once.Do(func() { + idAllocator = mockid.NewIDAllocator() + }) + return idAllocator +} + +func buildRegion(cluster *core.BasicCluster, kind RWType, peerCount int, interval uint64) (region *core.RegionInfo) { + peers := make([]*metapb.Peer, 0, peerCount) + for i := 0; i < peerCount; i++ { + id, _ := getIDAllocator().Alloc() + storeID, _ := getIDAllocator().Alloc() + peers = append(peers, &metapb.Peer{ + Id: id, + StoreId: storeID, + }) + } + id, _ := getIDAllocator().Alloc() meta := &metapb.Region{ - Id: 1000, + Id: id, Peers: peers, StartKey: []byte(""), EndKey: []byte(""), RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, } - leader := meta.Peers[rand.Intn(3)] switch kind { case Read: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -276,7 +280,7 @@ func buildRegion(kind RWType, peerCount int, interval uint64) *core.RegionInfo { core.SetReadQuery(1024*interval), ) case Write: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -284,31 +288,21 @@ func buildRegion(kind RWType, peerCount int, interval uint64) *core.RegionInfo { core.SetWrittenKeys(10*units.MiB*interval), core.SetWrittenQuery(1024*interval), ) - default: - return nil } -} - -type genID func(i int) uint64 - -func newPeers(n int, pid genID, sid genID) []*metapb.Peer { - peers := make([]*metapb.Peer, 0, n) - for i := 1; i <= n; i++ { - peer := &metapb.Peer{ - Id: pid(i), - } - peer.StoreId = sid(i) - peers = append(peers, peer) + for _, peer := range region.GetPeers() { + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: peer.GetStoreId()}, core.SetLastHeartbeatTS(time.Now()))) } - return peers + return region } func TestUpdateHotPeerStat(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Read) storeID, regionID := uint64(1), uint64(2) peer := &metapb.Peer{StoreId: storeID} region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) // we statistic read peer info from store heartbeat rather than region heartbeat m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval ThresholdsUpdateInterval = 0 @@ -399,8 +393,10 @@ func TestThresholdWithUpdateHotPeerStat(t *testing.T) { } func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold float64) { - cache := NewHotPeerCache(context.Background(), Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Read) storeID := uint64(1) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) re.GreaterOrEqual(byteRate, MinHotThresholds[RegionReadBytes]) ThresholdsUpdateInterval = 0 defer func() { @@ -446,8 +442,9 @@ func TestRemoveFromCache(t *testing.T) { interval := uint64(5) checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), Write) - region := buildRegion(Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Write) + region := buildRegion(cluster, Write, peerCount, interval) // prepare intervalSums := make(map[uint64]int) for i := 1; i <= 200; i++ { @@ -481,8 +478,9 @@ func TestRemoveFromCacheRandom(t *testing.T) { for _, peerCount := range peerCounts { for _, interval := range intervals { for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), Write) - region := buildRegion(Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Write) + region := buildRegion(cluster, Write, peerCount, interval) target := uint64(10) intervalSums := make(map[uint64]int) @@ -535,8 +533,9 @@ func checkCoolDown(re *require.Assertions, cache *hotPeerCache, region *core.Reg func TestCoolDownTransferLeader(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), Read) - region := buildRegion(Read, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Read) + region := buildRegion(cluster, Read, 3, 60) moveLeader := func() { _, region = schedule(re, movePeer, region, 10) @@ -568,8 +567,9 @@ func TestCoolDownTransferLeader(t *testing.T) { } testCases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica} for _, testCase := range testCases { - cache = NewHotPeerCache(context.Background(), Read) - region = buildRegion(Read, 3, 60) + cluster = core.NewBasicCluster() + cache = NewHotPeerCache(context.Background(), cluster, Read) + region = buildRegion(cluster, Read, 3, 60) for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) } @@ -581,8 +581,9 @@ func TestCoolDownTransferLeader(t *testing.T) { // See issue #4510 func TestCacheInherit(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), Read) - region := buildRegion(Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Read) + region := buildRegion(cluster, Read, 3, 10) // prepare for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) @@ -672,13 +673,16 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { re := require.New(t) testWithUpdateInterval := func(interval time.Duration) { ThresholdsUpdateInterval = interval - cache := NewHotPeerCache(context.Background(), Write) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Write) now := time.Now() + storeID := uint64(1) for id := uint64(0); id < 100; id++ { meta := &metapb.Region{ Id: id, - Peers: []*metapb.Peer{{Id: id, StoreId: 1}}, + Peers: []*metapb.Peer{{Id: id, StoreId: storeID}}, } + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) region := core.NewRegionInfo(meta, meta.Peers[0], core.SetWrittenBytes(id*6000), core.SetWrittenKeys(id*6000), core.SetWrittenQuery(id*6000)) for i := 0; i < 10; i++ { start := uint64(now.Add(time.Minute * time.Duration(i)).Unix()) @@ -713,9 +717,53 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { testWithUpdateInterval(0) } +func TestRemoveExpireItems(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Write) + cache.topNTTL = 100 * time.Millisecond + // case1: remove expired items + region1 := buildRegion(cluster, Write, 3, 10) + checkAndUpdate(re, cache, region1) + re.NotEmpty(cache.storesOfRegion[region1.GetID()]) + time.Sleep(cache.topNTTL) + region2 := buildRegion(cluster, Write, 3, 10) + checkAndUpdate(re, cache, region2) + re.Empty(cache.storesOfRegion[region1.GetID()]) + re.NotEmpty(cache.storesOfRegion[region2.GetID()]) + time.Sleep(cache.topNTTL) + // case2: remove items when the store is not exist + re.NotNil(cache.peersOfStore[region1.GetLeader().GetStoreId()]) + re.NotNil(cache.peersOfStore[region2.GetLeader().GetStoreId()]) + cluster.ResetStores() + re.Empty(cluster.GetStores()) + region3 := buildRegion(cluster, Write, 3, 10) + checkAndUpdate(re, cache, region3) + re.Nil(cache.peersOfStore[region1.GetLeader().GetStoreId()]) + re.Nil(cache.peersOfStore[region2.GetLeader().GetStoreId()]) + re.NotEmpty(cache.regionsOfStore[region3.GetLeader().GetStoreId()]) +} + +func TestDifferentReportInterval(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Write) + region := buildRegion(cluster, Write, 3, 5) + for _, interval := range []uint64{120, 60, 30} { + region = region.Clone(core.SetReportInterval(0, interval)) + checkAndUpdate(re, cache, region, 3) + stats := cache.RegionStats(0) + re.Len(stats, 3) + for _, s := range stats { + re.Len(s, 1) + } + } +} + func BenchmarkCheckRegionFlow(b *testing.B) { - cache := NewHotPeerCache(context.Background(), Read) - region := buildRegion(Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, Read) + region := buildRegion(cluster, Read, 3, 10) peerInfos := make([]*core.PeerInfo, 0) for _, peer := range region.GetPeers() { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) diff --git a/pkg/statistics/hot_stat.go b/pkg/statistics/hot_stat.go index 9a69de23e22..d6239f15437 100644 --- a/pkg/statistics/hot_stat.go +++ b/pkg/statistics/hot_stat.go @@ -16,6 +16,8 @@ package statistics import ( "context" + + "github.com/tikv/pd/pkg/core" ) // HotStat contains cluster's hotspot statistics. @@ -25,9 +27,9 @@ type HotStat struct { } // NewHotStat creates the container to hold cluster's hotspot statistics. -func NewHotStat(ctx context.Context) *HotStat { +func NewHotStat(ctx context.Context, cluster *core.BasicCluster) *HotStat { return &HotStat{ - HotCache: NewHotCache(ctx), + HotCache: NewHotCache(ctx, cluster), StoresStats: NewStoresStats(), } } diff --git a/pkg/statistics/topn.go b/pkg/statistics/topn.go index f5b71db66d5..910bc21ab3c 100644 --- a/pkg/statistics/topn.go +++ b/pkg/statistics/topn.go @@ -97,15 +97,14 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) { isUpdate = stn.Put(item) } tn.ttlLst.Put(item.ID()) - tn.maintain() return } // RemoveExpired deletes all expired items. -func (tn *TopN) RemoveExpired() { +func (tn *TopN) RemoveExpired() []uint64 { tn.rw.Lock() defer tn.rw.Unlock() - tn.maintain() + return tn.maintain() } // Remove deletes the item by given ID and returns it. @@ -113,19 +112,21 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) { tn.rw.Lock() defer tn.rw.Unlock() for _, stn := range tn.topns { - item = stn.Remove(id) + item = stn.remove(id) } _ = tn.ttlLst.Remove(id) - tn.maintain() return } -func (tn *TopN) maintain() { - for _, id := range tn.ttlLst.TakeExpired() { +func (tn *TopN) maintain() []uint64 { + ids := make([]uint64, 0) + for _, id := range tn.ttlLst.takeExpired() { for _, stn := range tn.topns { - stn.Remove(id) + stn.remove(id) + ids = append(ids, id) } } + return ids } type singleTopN struct { @@ -179,7 +180,7 @@ func (stn *singleTopN) Put(item TopNItem) (isUpdate bool) { return } -func (stn *singleTopN) Remove(id uint64) TopNItem { +func (stn *singleTopN) remove(id uint64) TopNItem { item := stn.topn.Remove(id) if item == nil { item = stn.rest.Remove(id) @@ -344,7 +345,7 @@ func (tl *ttlList) Len() int { return tl.lst.Len() } -func (tl *ttlList) TakeExpired() []uint64 { +func (tl *ttlList) takeExpired() []uint64 { expired := []uint64{} now := time.Now() for ele := tl.lst.Front(); ele != nil; ele = tl.lst.Front() { diff --git a/pkg/statistics/topn_test.go b/pkg/statistics/topn_test.go index 6aac24103aa..8ff501a68fe 100644 --- a/pkg/statistics/topn_test.go +++ b/pkg/statistics/topn_test.go @@ -208,6 +208,7 @@ func TestTTL(t *testing.T) { putPerm(re, tn, Total, func(x int) float64 { return float64(-x) }, false /*insert*/) + re.Len(tn.GetAll(), Total) time.Sleep(900 * time.Millisecond) { @@ -217,6 +218,8 @@ func TestTTL(t *testing.T) { } re.True(tn.Put(item)) } + re.Len(tn.RemoveExpired(), (Total-1)*DimLen) + for i := 3; i < Total; i += 3 { item := &item{id: uint64(i), values: []float64{float64(-i) + 100}} for k := 1; k < DimLen; k++ { @@ -224,7 +227,6 @@ func TestTTL(t *testing.T) { } re.False(tn.Put(item)) } - tn.RemoveExpired() re.Equal(Total/3+1, tn.Len()) items := tn.GetAllTopN(0) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3ed7718f69a..8de58590909 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -248,7 +248,7 @@ func (c *RaftCluster) InitCluster( c.core, c.opt, c.storage, c.id = basicCluster, opt, storage, id c.ctx, c.cancel = context.WithCancel(c.serverCtx) c.labelLevelStats = statistics.NewLabelStatistics() - c.hotStat = statistics.NewHotStat(c.ctx) + c.hotStat = statistics.NewHotStat(c.ctx, basicCluster) c.hotBuckets = buckets.NewBucketsCache(c.ctx) c.slowStat = statistics.NewSlowStat(c.ctx) c.progressManager = progress.NewManager() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index f7ff71b888d..8bfb0c619de 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -589,7 +589,10 @@ func TestRegionHeartbeatHotStat(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) cluster.coordinator = newCoordinator(ctx, cluster, nil) - newTestStores(4, "2.0.0") + stores := newTestStores(4, "2.0.0") + for _, store := range stores { + re.NoError(cluster.PutStore(store.GetMeta())) + } peers := []*metapb.Peer{ { Id: 1,