Store Limit
Before the store limit was introduced, PD could only control the global scheduling speed, which has some limitations. Reducing the impact of scheduling on a single store sometimes meant lowering the global speed. In addition, for online and offline scenarios, it is difficult to balance stability and efficiency by adjusting the global scheduling speed alone.
Store limit is designed to control the scheduling speed more finely and avoid the performance problems caused by scheduling.
Store limit implements the speed limit at the store level by maintaining an in-memory mapping from stores to token buckets. Different kinds of scheduling steps correspond to different token buckets. For example, AddPeer and RemovePeer are two kinds of scheduling steps, each limited by its own token bucket. Every time an operator (the basic unit of scheduling, consisting of several scheduling steps) is generated, PD first checks whether the token buckets corresponding to the operator's scheduling steps hold enough tokens. Only if they do can the operator be added to the scheduling queue; otherwise, the operator is discarded. When the operator is added, the number of tokens representing the cost of each step is taken from the corresponding token bucket. Each token bucket is refilled at a fixed rate.
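The token bucket behavior described above can be illustrated with a minimal sketch. It is not PD's StoreLimit implementation: the tokenBucket type, its fields, and the explicit nowSec clock argument are hypothetical names chosen to keep the example deterministic.

```go
package main

import "fmt"

// tokenBucket is a hypothetical, simplified stand-in for a per-store limit.
type tokenBucket struct {
	capacity float64 // maximum number of tokens the bucket can hold
	rate     float64 // tokens added per second
	tokens   float64 // tokens currently available
	lastSec  float64 // time of the last refill, in seconds
}

func newTokenBucket(ratePerSec, capacity, nowSec float64) *tokenBucket {
	return &tokenBucket{capacity: capacity, rate: ratePerSec, tokens: capacity, lastSec: nowSec}
}

// refill adds the tokens accrued since the last refill, capped at capacity.
func (b *tokenBucket) refill(nowSec float64) {
	elapsed := nowSec - b.lastSec
	if elapsed <= 0 {
		return
	}
	b.tokens += elapsed * b.rate
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.lastSec = nowSec
}

// tryTake removes cost tokens and reports true if enough are available.
// Otherwise it leaves the token count unchanged and reports false.
func (b *tokenBucket) tryTake(cost, nowSec float64) bool {
	b.refill(nowSec)
	if b.tokens < cost {
		return false
	}
	b.tokens -= cost
	return true
}

// runExample admits one step, rejects a second, then admits a third after a refill.
func runExample() []bool {
	b := newTokenBucket(1, 2, 0) // 1 token per second, at most 2 tokens
	return []bool{
		b.tryTake(2, 0), // enough tokens: admitted
		b.tryTake(1, 0), // bucket is empty: rejected
		b.tryTake(1, 1), // one second later, one token has been refilled
	}
}

func main() {
	fmt.Println(runExample())
}
```

In this sketch a rejected request is simply dropped, which mirrors how an operator without enough tokens is discarded rather than queued.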
The difference between store limit and other limit-related PD parameters (such as region schedule limit, leader schedule limit, etc.) is that store limit restricts the speed at which operators are consumed, while the other limits mainly restrict the speed at which operators are generated.
The store limit is initialized during the initialization of OperatorController. PD creates a map from store ID (uint64) to a per-Type StoreLimit, which is the token bucket mentioned above. Type represents the kind of scheduling step (AddPeer or RemovePeer).
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams opt.HeartbeatStreams) *OperatorController {
	return &OperatorController{
		...
		storesLimit: make(map[uint64]map[storelimit.Type]*storelimit.StoreLimit),
		...
	}
}
The actual initialization is carried out lazily during scheduling. Because the limit of a store is in-memory state, if the limit of a store does not exist, it is initialized from the default value. If PD is restarted, the limits are recreated in memory from the previously persisted limit configuration. In addition, the filter and the limit are associated through the AttachAvailableFunc function, so that filters can reduce the number of unnecessary operators when scheduling is generated.
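The link between lazily created limits and filters can be sketched as follows. This is an illustration under simplifying assumptions, not PD's code: controller, bucket, getOrCreate, and filterStores are hypothetical names, refill is omitted, and the availability callbacks are stored in a plain map instead of being attached to the cluster.

```go
package main

import "fmt"

// limitType is a hypothetical stand-in for storelimit.Type.
type limitType int

const (
	addPeer limitType = iota
	removePeer
)

// bucket is a minimal token bucket stand-in; refill is omitted.
type bucket struct{ tokens int64 }

// controller keeps per-store, per-type buckets in memory, together with
// the availability callbacks that a filter would consult.
type controller struct {
	defaultTokens int64
	limits        map[uint64]map[limitType]*bucket
	available     map[uint64]map[limitType]func() bool
}

func newController(defaultTokens int64) *controller {
	return &controller{
		defaultTokens: defaultTokens,
		limits:        make(map[uint64]map[limitType]*bucket),
		available:     make(map[uint64]map[limitType]func() bool),
	}
}

// getOrCreate returns the bucket for a store and step type. On first use it
// creates the bucket from the default and registers an availability callback.
func (c *controller) getOrCreate(storeID uint64, t limitType, cost int64) *bucket {
	if c.limits[storeID] == nil {
		c.limits[storeID] = make(map[limitType]*bucket)
		c.available[storeID] = make(map[limitType]func() bool)
	}
	if c.limits[storeID][t] == nil {
		b := &bucket{tokens: c.defaultTokens}
		c.limits[storeID][t] = b
		c.available[storeID][t] = func() bool { return b.tokens >= cost }
	}
	return c.limits[storeID][t]
}

// filterStores keeps the stores that can still afford one step of type t.
// Stores without a registered callback are kept.
func (c *controller) filterStores(candidates []uint64, t limitType) []uint64 {
	var out []uint64
	for _, id := range candidates {
		fn := c.available[id][t] // reading a missing entry yields a nil func
		if fn == nil || fn() {
			out = append(out, id)
		}
	}
	return out
}

func runExample() []uint64 {
	c := newController(2)
	c.getOrCreate(1, addPeer, 1).tokens = 0 // store 1 has spent its tokens
	c.getOrCreate(2, addPeer, 1)            // store 2 still has a full bucket
	// Store 3 has no bucket yet, so it is not filtered out.
	return c.filterStores([]uint64{1, 2, 3}, addPeer)
}

func main() {
	fmt.Println(runExample())
}
```

Filtering out exhausted stores before an operator is built avoids creating operators that would be discarded at admission time anyway.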
func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) *storelimit.StoreLimit {
	// First use for this store and step type: create the limit and
	// register an availability check for the filters.
	if oc.storesLimit[storeID][limitType] == nil {
		ratePerSec := oc.cluster.GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime
		oc.newStoreLimit(storeID, ratePerSec, limitType)
		oc.cluster.AttachAvailableFunc(storeID, limitType, func() bool {
			oc.RLock()
			defer oc.RUnlock()
			if oc.storesLimit[storeID][limitType] == nil {
				return true
			}
			return oc.storesLimit[storeID][limitType].Available() >= storelimit.RegionInfluence[limitType]
		})
	}
	// Recreate the limit if the configured rate has changed.
	ratePerSec := oc.cluster.GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime
	if ratePerSec != oc.storesLimit[storeID][limitType].Rate() {
		oc.newStoreLimit(storeID, ratePerSec, limitType)
	}
	return oc.storesLimit[storeID][limitType]
}
Every time an operator is created, exceedStoreLimit is called to check whether the limit of a store would be exceeded. exceedStoreLimit mainly does two things. First, it determines whether the impact of each kind of scheduling step in the current operator on a store can be ignored. Second, if the impact cannot be ignored, it determines whether the token bucket of the store holds enough tokens to allow the scheduling step to execute.
func (oc *OperatorController) exceedStoreLimit(ops ...*operator.Operator) bool {
	...
	for _, v := range storelimit.TypeNameValue {
		stepCost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v)
		if stepCost == 0 {
			continue
		}
		if oc.getOrCreateStoreLimit(storeID, v).Available() < stepCost {
			return true
		}
	}
	...
	return false
}
Whether an operator has an opportunity to be executed depends on the return value. If the operator would make a store exceed its limit, it is discarded.
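The check-then-admit flow can be sketched end to end as below. This is a simplified illustration, not the code in operator_controller.go: the operator and controller types, exceedsLimit, and addOperator are hypothetical, buckets are plain counters without refill, and a store with no bucket is treated as having zero tokens instead of being initialized from a default.

```go
package main

import "fmt"

// stepType is a hypothetical stand-in for storelimit.Type.
type stepType string

const (
	addPeer    stepType = "add-peer"
	removePeer stepType = "remove-peer"
)

// operator carries the token cost of its steps, per store and step type.
type operator struct {
	name string
	cost map[uint64]map[stepType]int64
}

// controller holds the remaining tokens per store and step type,
// plus the queue of admitted operators.
type controller struct {
	buckets map[uint64]map[stepType]int64
	queue   []string
}

// exceedsLimit reports whether any step with a non-zero cost
// cannot be paid for by the bucket of the affected store.
func (c *controller) exceedsLimit(op operator) bool {
	for storeID, steps := range op.cost {
		for t, cost := range steps {
			if cost == 0 {
				continue // the step has no impact on this store
			}
			if c.buckets[storeID][t] < cost {
				return true
			}
		}
	}
	return false
}

// addOperator admits the operator and takes its tokens only if every
// affected bucket can afford it. Otherwise the operator is discarded.
func (c *controller) addOperator(op operator) bool {
	if c.exceedsLimit(op) {
		return false
	}
	for storeID, steps := range op.cost {
		for t, cost := range steps {
			if cost == 0 {
				continue
			}
			c.buckets[storeID][t] -= cost
		}
	}
	c.queue = append(c.queue, op.name)
	return true
}

func runExample() []bool {
	c := &controller{buckets: map[uint64]map[stepType]int64{
		1: {addPeer: 1000},
		2: {removePeer: 1000},
	}}
	// Moving a peer adds it on store 1 and removes it from store 2.
	movePeer := operator{name: "move-peer", cost: map[uint64]map[stepType]int64{
		1: {addPeer: 1000},
		2: {removePeer: 1000},
	}}
	first := c.addOperator(movePeer)  // both buckets can pay: admitted
	second := c.addOperator(movePeer) // both buckets are now empty: discarded
	return []bool{first, second}
}

func main() {
	fmt.Println(runExample())
}
```

Checking every affected bucket before taking any tokens keeps a rejected operator from partially draining the buckets of the stores it touches.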
For more details, see operator_controller.go.