Skip to content

Commit

Permalink
resourcemanager: return resource-group priority in OnRequestWait (#7378
Browse files Browse the repository at this point in the history
…) (#7383)

close #7379, ref tikv/tikv#15994

Signed-off-by: glorv <[email protected]>
Signed-off-by: JmPotato <[email protected]>

Co-authored-by: glorv <[email protected]>
Co-authored-by: JmPotato <[email protected]>
  • Loading branch information
3 people authored Nov 16, 2023
1 parent d0a17ca commit ef6ba85
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
17 changes: 9 additions & 8 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand Down Expand Up @@ -171,12 +171,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
kvs := resp.GetKvs()
if len(kvs) == 0 {
log.Warn("[resource group controller] server does not save config, load config failed")
return DefaultConfig(), nil
}
config := &Config{}
err = json.Unmarshal(resp.Kvs[0].GetValue(), config)
err = json.Unmarshal(kvs[0].GetValue(), config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -525,10 +526,10 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}
return gc.onRequestWait(ctx, info)
}
Expand Down Expand Up @@ -1175,7 +1176,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
Expand Down Expand Up @@ -1225,7 +1226,7 @@ func (gc *groupCostController) onRequestWait(
failpoint.Inject("triggerUpdate", func() {
gc.lowRUNotifyChan <- struct{}{}
})
return nil, nil, err
return nil, nil, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
}
Expand All @@ -1244,7 +1245,7 @@ func (gc *groupCostController) onRequestWait(
*gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter
gc.mu.Unlock()

return delta, penalty, nil
return delta, penalty, gc.meta.Priority, nil
}

func (gc *groupCostController) onResponse(
Expand Down
8 changes: 5 additions & 3 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (

func createTestGroupCostController(re *require.Assertions) *groupCostController {
group := &rmpb.ResourceGroup{
Name: "test",
Mode: rmpb.GroupMode_RUMode,
Name: "test",
Mode: rmpb.GroupMode_RUMode,
Priority: 1,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
Expand Down Expand Up @@ -100,8 +101,9 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, _, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
re.NoError(err, caseNum)
re.Equal(priority, gc.meta.Priority)
expectedConsumption := &rmpb.Consumption{}
if testCase.req.IsWrite() {
kvCalculator.calculateWriteCost(expectedConsumption, testCase.req)
Expand Down
34 changes: 17 additions & 17 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
_, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
Expand All @@ -457,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)"))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
_, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
re.Error(err)
time.Sleep(time.Millisecond * 200)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate"))
Expand Down Expand Up @@ -512,9 +512,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
wreq := tcs.makeWriteRequest()
rres := tcs.makeReadResponse()
wres := tcs.makeWriteResponse()
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
controller.OnResponse(resourceGroupName, rreq, rres)
controller.OnResponse(resourceGroupName, wreq, wres)
Expand Down Expand Up @@ -551,9 +551,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(resourceGroupName, rreq, rres)
Expand All @@ -571,22 +571,22 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
resourceGroupName2 := suite.initGroups[2].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
_, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
re.NoError(err)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)"))
resourceGroupName3 := suite.initGroups[3].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0}
wreq = tcs.makeWriteRequest()
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
re.NoError(err)
time.Sleep(110 * time.Millisecond)
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0}
duration := time.Duration(0)
for i := 0; i < tcs.times; i++ {
wreq = tcs.makeWriteRequest()
startTime := time.Now()
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
duration += time.Since(startTime)
re.NoError(err)
}
Expand Down Expand Up @@ -635,7 +635,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// init
req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
_, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
Expand All @@ -644,7 +644,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {

req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
Expand All @@ -654,7 +654,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// failed request, shouldn't be counted in penalty
req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
Expand All @@ -664,7 +664,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// from same store, should be zero
req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */)
resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req1, resp1)
Expand All @@ -673,7 +673,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// from different store, should be non-zero
req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
re.NoError(err)
re.Equal(penalty.WriteBytes, 60.0)
re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6)
Expand All @@ -683,7 +683,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// from new store, should be zero
req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */)
resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req3, resp3)
Expand All @@ -693,7 +693,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resourceGroupName = groupNames[1]
req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */)
resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req4, resp4)
Expand Down

0 comments on commit ef6ba85

Please sign in to comment.