Commit

enhance: Enable balance on querynode with different mem capacity (milvus-io#36466)

issue: milvus-io#36464
This PR enables balancing across querynodes with different memory capacities: a
query node with more memory capacity is assigned more records, and the query
node with the largest gap between its assignedScore and currentScore gets the
highest priority to carry new segments.
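
As a rough sketch of that rule (illustrative only, not the PR's code): assignedScore is made proportional to a node's memory capacity, and a node's priority is currentScore - assignedScore, so the node furthest below its share is picked first.

// Illustrative sketch of the balancing rule described above.
// All numbers and names are made up; only the rule matches the PR.
package main

import "fmt"

type node struct {
	id            int64
	memCapacityGB float64
	currentScore  float64 // e.g. rows currently assigned
}

func main() {
	nodes := []node{{1, 16, 900}, {2, 32, 1000}}
	totalScore, totalMem := 0.0, 0.0
	for _, n := range nodes {
		totalScore += n.currentScore
		totalMem += n.memCapacityGB
	}
	perGB := totalScore / totalMem // average score per memory unit
	for _, n := range nodes {
		assigned := n.memCapacityGB * perGB
		// lower priority = more headroom = receives the next segment
		fmt.Printf("node %d: assignedScore=%.0f priority=%.0f\n",
			n.id, assigned, n.currentScore-assigned)
	}
}

Here node 2 has twice the memory, so despite already holding more rows its priority (-267) is lower than node 1's (+267), and it receives the next segment.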

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 authored Sep 30, 2024
1 parent 00a5025 commit 470bb0c
Showing 9 changed files with 250 additions and 64 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
@@ -609,6 +609,7 @@ message GetDataDistributionResponse {
repeated ChannelVersionInfo channels = 4;
repeated LeaderView leader_views = 5;
int64 lastModifyTs = 6;
+ double memCapacityInMB = 7;
}
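
The new field gives the coordinator a per-node memory figure to weight assignments with. As a hypothetical illustration (not code from this PR), a querynode could fill it when answering the data-distribution request; querypb is the assumed generated package name and totalMemBytes an assumed input:

// Hypothetical sketch: report total memory in MB through the new field.
// Assumes standard protoc-gen-go naming (MemCapacityInMB) for querypb.
func fillMemCapacity(resp *querypb.GetDataDistributionResponse, totalMemBytes uint64) {
	resp.MemCapacityInMB = float64(totalMemBytes) / (1024 * 1024)
}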

message LeaderView {
26 changes: 14 additions & 12 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
@@ -161,8 +161,15 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica

func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
- nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
- totalScore := 0
+ nodeItemsMap := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
+ if len(nodeItemsMap) == 0 {
+ return nil
+ }
+ log.Info("node workload status",
+ zap.Int64("collectionID", replica.GetCollectionID()),
+ zap.Int64("replicaID", replica.GetID()),
+ zap.String("channelName", channelName),
+ zap.Stringers("nodes", lo.Values(nodeItemsMap)))

// list all segments which can be balanced, and compute each node's score
for _, node := range onlineNodes {
@@ -171,19 +178,14 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
})
segmentDist[node] = segments
- totalScore += nodeScore[node].getPriority()
}

- if totalScore == 0 {
- return nil
- }

// find segments from nodes whose score is above the average
segmentsToMove := make([]*meta.Segment, 0)
- average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
- leftScore := nodeScore[node].getPriority()
- if leftScore <= average {
+ currentScore := nodeItemsMap[node].getCurrentScore()
+ assignedScore := nodeItemsMap[node].getAssignedScore()
+ if currentScore <= assignedScore {
continue
}

@@ -192,8 +194,8 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
})
for _, s := range segments {
segmentsToMove = append(segmentsToMove, s)
- leftScore -= b.calculateSegmentScore(s)
- if leftScore <= average {
+ currentScore -= b.calculateSegmentScore(s)
+ if currentScore <= assignedScore {
break
}
}
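
To see the new loop concretely, a self-contained sketch with made-up numbers: segments leave an overloaded node, largest first (the balancer sorts candidates by score descending), until its currentScore is no longer above its assignedScore.

// Illustrative drain loop, mirroring genSegmentPlan's selection logic.
package main

import (
	"fmt"
	"sort"
)

func main() {
	segScores := []float64{120, 80, 200, 100}   // per-segment scores on one node
	currentScore, assignedScore := 500.0, 300.0 // node is 200 above its share

	sort.Sort(sort.Reverse(sort.Float64Slice(segScores))) // biggest segments first

	var toMove []float64
	for _, s := range segScores {
		if currentScore <= assignedScore {
			break // node no longer exceeds its assigned share
		}
		toMove = append(toMove, s)
		currentScore -= s
	}
	fmt.Println(toMove, currentScore) // [200] 300
}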
60 changes: 48 additions & 12 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -18,6 +18,7 @@ package balance

import (
"context"
"fmt"
"math"
"sort"

@@ -77,9 +78,8 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me
if len(plans) > balanceBatchSize {
break
}
- // change node's priority and push back
- p := ni.getPriority()
- ni.setPriority(p + int(s.GetNumOfRows()))
+ // change node's score and push back
+ ni.AddCurrentScoreDelta(float64(s.GetNumOfRows()))
queue.push(ni)
}
return plans
@@ -119,9 +119,8 @@ func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes
Channel: c,
}
plans = append(plans, plan)
- // change node's priority and push back
- p := ni.getPriority()
- ni.setPriority(p + 1)
+ // change node's score and push back
+ ni.AddCurrentScoreDelta(1)
queue.push(ni)
}
return plans
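
Both assign paths above share one pattern: pop the least-loaded node from a priority queue, charge it for the new work, and push it back so the next pick sees the updated total. A generic, self-contained sketch of that pattern (illustrative names, using container/heap):

// Minimal pop-charge-push loop over a min-heap of nodes.
package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	nodeID int64
	score  float64
}

// minHeap orders nodes by score, least-loaded first.
type minHeap []*item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].score < h[j].score }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(*item)) }
func (h *minHeap) Pop() any {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

func main() {
	h := &minHeap{{1, 100}, {2, 40}, {3, 70}}
	heap.Init(h)
	for _, rows := range []float64{50, 30, 20} {
		ni := heap.Pop(h).(*item) // least-loaded node wins
		ni.score += rows          // charge it for the new segment
		heap.Push(h, ni)          // push back with the updated score
		fmt.Printf("segment (%.0f rows) -> node %d\n", rows, ni.nodeID)
	}
}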
@@ -366,14 +365,51 @@ func NewRowCountBasedBalancer(

type nodeItem struct {
baseItem
- nodeID int64
+ fmt.Stringer
+ nodeID int64
+ assignedScore float64
+ currentScore float64
}

- func newNodeItem(priority int, nodeID int64) nodeItem {
+ func newNodeItem(currentScore int, nodeID int64) nodeItem {
return nodeItem{
- baseItem: baseItem{
- priority: priority,
- },
- nodeID: nodeID,
+ baseItem: baseItem{},
+ nodeID: nodeID,
+ currentScore: float64(currentScore),
}
}

+ func (b *nodeItem) getPriority() int {
+ // the further currentScore falls below assignedScore, the higher the node's priority to receive new load
+ return int(b.currentScore - b.assignedScore)
+ }

+ func (b *nodeItem) setPriority(priority int) {
+ panic("not supported, use AddCurrentScoreDelta/setAssignedScore instead")
+ }

+ func (b *nodeItem) getPriorityWithCurrentScoreDelta(delta float64) int {
+ return int((b.currentScore + delta) - b.assignedScore)
+ }

+ func (b *nodeItem) getCurrentScore() float64 {
+ return b.currentScore
+ }

+ func (b *nodeItem) AddCurrentScoreDelta(delta float64) {
+ b.currentScore += delta
+ b.priority = b.getPriority()
+ }

+ func (b *nodeItem) getAssignedScore() float64 {
+ return b.assignedScore
+ }

+ // note: despite the name, this adds delta to the assigned score
+ func (b *nodeItem) setAssignedScore(delta float64) {
+ b.assignedScore += delta
+ b.priority = b.getPriority()
+ }

+ func (b *nodeItem) String() string {
+ return fmt.Sprintf("{NodeID: %d, AssignedScore: %f, CurrentScore: %f, Priority: %d}", b.nodeID, b.assignedScore, b.currentScore, b.priority)
+ }
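
In short: assignedScore is what a node should carry, currentScore is what it actually carries, and the queue orders nodes by the difference. A miniature restatement with hypothetical numbers:

// Miniature restatement of nodeItem's bookkeeping (illustrative).
package main

import "fmt"

type nodeItem struct {
	assignedScore, currentScore float64
}

func (n *nodeItem) priority() int { return int(n.currentScore - n.assignedScore) }

func main() {
	n := &nodeItem{}
	n.assignedScore += 1000   // share the node should carry (setAssignedScore)
	n.currentScore += 400     // load it carries right now (AddCurrentScoreDelta)
	fmt.Println(n.priority()) // -600: far below its share, so it is picked first
}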
84 changes: 53 additions & 31 deletions internal/querycoordv2/balance/score_based_balancer.go
@@ -63,6 +63,7 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
if len(nodeItemsMap) == 0 {
return nil
}
log.Info("node workload status", zap.Int64("collectionID", collectionID), zap.Stringers("nodes", lo.Values(nodeItemsMap)))

queue := newPriorityQueue()
for _, item := range nodeItemsMap {
@@ -89,13 +90,13 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
targetNode := queue.pop().(*nodeItem)
// make sure the candidate is always pushed back
defer queue.push(targetNode)
- priorityChange := b.calculateSegmentScore(s)
+ scoreChanges := b.calculateSegmentScore(s)

sourceNode := nodeItemsMap[s.Node]
// if the segment's node exists, the segment comes from the balancer, so we should consider the benefit;
// if the reassignment doesn't bring enough benefit, we should skip it
// notice: we should skip benefit check for manual balance
- if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, priorityChange) {
+ if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
return
}

@@ -112,15 +113,15 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
Segment: s,
FromScore: fromScore,
ToScore: int64(targetNode.getPriority()),
- SegmentScore: int64(priorityChange),
+ SegmentScore: int64(scoreChanges),
}
plans = append(plans, plan)

// update the sourceNode and targetNode's score
if sourceNode != nil {
- sourceNode.setPriority(sourceNode.getPriority() - priorityChange)
+ sourceNode.AddCurrentScoreDelta(-scoreChanges)
}
- targetNode.setPriority(targetNode.getPriority() + priorityChange)
+ targetNode.AddCurrentScoreDelta(scoreChanges)
}(s)

if len(plans) > balanceBatchSize {
@@ -130,21 +131,21 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
return plans
}

- func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, priorityChange int) bool {
+ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, scoreChanges float64) bool {
// if the score diff between sourceNode and targetNode is lower than the unbalance toleration factor, there is no need to move the segment to targetNode
- oldScoreDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
- if oldScoreDiff < float64(targetNode.getPriority())*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
+ oldPriorityDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
+ if oldPriorityDiff < float64(targetNode.getPriority())*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
return false
}

- newSourceScore := sourceNode.getPriority() - priorityChange
- newTargetScore := targetNode.getPriority() + priorityChange
- if newTargetScore > newSourceScore {
+ newSourcePriority := sourceNode.getPriorityWithCurrentScoreDelta(-scoreChanges)
+ newTargetPriority := targetNode.getPriorityWithCurrentScoreDelta(scoreChanges)
+ if newTargetPriority > newSourcePriority {
// if score diff reverted after segment reassignment, we will consider the benefit
// only trigger following segment reassignment when the generated reverted score diff
// is far smaller than the original score diff
- newScoreDiff := math.Abs(float64(newSourceScore) - float64(newTargetScore))
- if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldScoreDiff {
+ newScoreDiff := math.Abs(float64(newSourcePriority) - float64(newTargetPriority))
+ if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldPriorityDiff {
return false
}
}
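
To make the two checks concrete, assume ScoreUnbalanceTolerationFactor = 0.05 and ReverseUnbalanceTolerationFactor = 0.3 (hypothetical values, not taken from this diff). With source priority 1000, target priority 600, and a segment whose score is 300: oldPriorityDiff = 400, which is not below 600 * 0.05 = 30, so the move is considered; after the move the priorities would be 700 and 900, a reverted diff of 200, and since 200 * 0.3 = 60 < 400 the reassignment is still judged beneficial.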
@@ -155,25 +156,48 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *
func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
totalScore := 0
nodeScoreMap := make(map[int64]*nodeItem)
+ nodeMemMap := make(map[int64]float64)
+ totalMemCapacity := float64(0)
+ allNodeHasMemInfo := true
for _, node := range nodeIDs {
score := b.calculateScore(collectionID, node)
nodeItem := newNodeItem(score, node)
nodeScoreMap[node] = &nodeItem
totalScore += score

+ // memory capacity defaults to 1.0; it is multiplied by the average value to compute the assigned score
+ nodeInfo := b.nodeManager.Get(node)
+ if nodeInfo != nil {
+ totalMemCapacity += nodeInfo.MemCapacity()
+ nodeMemMap[node] = nodeInfo.MemCapacity()
+ }
+ allNodeHasMemInfo = allNodeHasMemInfo && nodeInfo != nil && nodeInfo.MemCapacity() > 0
}

if totalScore == 0 {
return nodeScoreMap
}

- average := totalScore / len(nodeIDs)
+ // if every node reports memory info, average = totalScore / totalMemCapacity, i.e. the average score per memory unit;
+ // otherwise average = totalScore / len(nodeIDs), i.e. the average score per node
+ average := float64(0)
+ if allNodeHasMemInfo {
+ average = float64(totalScore) / totalMemCapacity
+ } else {
+ average = float64(totalScore) / float64(len(nodeIDs))
+ }

delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
- // use average * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator
for _, node := range nodeIDs {
+ if allNodeHasMemInfo {
+ nodeScoreMap[node].setAssignedScore(nodeMemMap[node] * average)
+ } else {
+ nodeScoreMap[node].setAssignedScore(average)
+ }
+ // use assignedScore * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator
collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(node))
if len(collectionViews) > 0 {
- newScore := nodeScoreMap[node].getPriority() + int(float64(average)*delegatorOverloadFactor)*len(collectionViews)
- nodeScoreMap[node].setPriority(newScore)
+ nodeScoreMap[node].AddCurrentScoreDelta(nodeScoreMap[node].getAssignedScore() * delegatorOverloadFactor * float64(len(collectionViews)))
}
}
return nodeScoreMap
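
Worked example with made-up numbers: two nodes report 16 GB and 32 GB of memory, and the collection's scores on them total 2000, so average = 2000 / 48 ≈ 41.7 per GB; the assigned scores become 16 * 41.7 ≈ 667 and 32 * 41.7 ≈ 1333, so the node with twice the memory is expected to carry twice the load. If any node fails to report memory capacity, every node falls back to the same per-node average (here 1000).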
@@ -217,8 +241,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
}

// calculateSegmentScore calculates the score that the segment represents
- func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) int {
- return int(float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
+ func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) float64 {
+ return float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}
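
For instance, assuming GlobalRowCountFactor = 0.1, a segment with 1,000,000 rows contributes a score of 1,000,000 * 1.1 = 1,100,000; returning float64 also avoids the truncation the old int conversion introduced for fractional factors.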

func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
@@ -288,8 +312,10 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin

func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
- nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
- totalScore := 0
+ nodeItemsMap := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
+ if len(nodeItemsMap) == 0 {
+ return nil
+ }

// list all segments which can be balanced, and compute each node's score
for _, node := range onlineNodes {
@@ -298,21 +324,16 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
})
segmentDist[node] = segments
- totalScore += nodeScore[node].getPriority()
}

- if totalScore == 0 {
- return nil
- }

balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()

// find segments from nodes whose score is above the average
segmentsToMove := make([]*meta.Segment, 0)
- average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
- leftScore := nodeScore[node].getPriority()
- if leftScore <= average {
+ currentScore := nodeItemsMap[node].getCurrentScore()
+ assignedScore := nodeItemsMap[node].getAssignedScore()
+ if currentScore <= assignedScore {
continue
}

@@ -324,8 +345,9 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
if len(segmentsToMove) >= balanceBatchSize {
break
}
- leftScore -= b.calculateSegmentScore(s)
- if leftScore <= average {
+ currentScore -= b.calculateSegmentScore(s)
+ if currentScore <= assignedScore {
break
}
}
[diffs for the remaining changed files omitted]
