Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: overload_control/flow_control:smooth_limiter issues related to accuracy and efficiency #185 #186

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions trpc/overload_control/flow_control/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ cc_test(
"//trpc/overload_control/flow_control:flow_test.yaml",
],
deps = [
"//trpc/common:trpc_plugin",
":flow_controller_factory",
":flow_controller_server_filter",
"//trpc/codec/testing:protocol_testing",
"//trpc/common:trpc_plugin",
"@com_google_googletest//:gtest_main",
],
)
Expand Down Expand Up @@ -218,8 +218,7 @@ cc_library(
],
deps = [
":flow_controller",
":hit_queue",
":tick_timer",
":recent_queue",
"//trpc/overload_control/common:report",
"//trpc/util/log:logging",
],
Expand Down Expand Up @@ -260,3 +259,28 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "recent_queue",
srcs = ["recent_queue.cc"],
hdrs = ["recent_queue.h"],
defines = [] +
select({
"//trpc:trpc_include_overload_control": ["TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL"],
"//conditions:default": [],
}),
visibility = ["//visibility:private"],
deps = [
"//trpc/util:time",
],
)

cc_test(
name = "recent_queue_test",
srcs = ["recent_queue_test.cc"],
deps = [
":recent_queue",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)
50 changes: 50 additions & 0 deletions trpc/overload_control/flow_control/recent_queue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2024 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL

#include "trpc/overload_control/flow_control/recent_queue.h"

namespace trpc::overload_control {

RecentQueue::RecentQueue(int64_t limit, uint64_t window_size) : limit_(limit), window_size_(window_size) {
cache_.resize(limit);
}

bool RecentQueue::Add() {
std::unique_lock<std::mutex> lock(mutex_);
auto now = trpc::time::GetSteadyNanoSeconds();
if (now - window_size_ < cache_[cur_]) {
return false;
}
cache_[cur_] = now;
cur_ = (cur_ + 1) % limit_;
return true;
}

int64_t RecentQueue::ActiveCount() {
std::unique_lock<std::mutex> lock(mutex_);
auto past = trpc::time::GetSteadyNanoSeconds() - window_size_;
auto split = cache_.begin() + cur_;
if (past < *split) {
return limit_;
}
if (cache_.back() <= past) {
return split - std::upper_bound(cache_.begin(), split, past);
}
return split - std::upper_bound(split, cache_.end(), past) + limit_;
}

} // namespace trpc::overload_control

#endif
52 changes: 52 additions & 0 deletions trpc/overload_control/flow_control/recent_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2024 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL

#pragma once

#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>

#include "trpc/util/time.h"

namespace trpc::overload_control {

class RecentQueue {
public:
explicit RecentQueue(int64_t limit, uint64_t window_size);

bool Add();

int64_t ActiveCount();

private:
std::vector<uint64_t> cache_;
std::mutex mutex_;

// The location for cache eviction is also where the new timestamp is.
int64_t cur_{0};

// Maximum number of requests per second.
int64_t limit_;
// The time window size for cache eviction.
uint64_t window_size_;
};

using RecentQueuePtr = std::unique_ptr<RecentQueue>;

} // namespace trpc::overload_control

#endif
67 changes: 67 additions & 0 deletions trpc/overload_control/flow_control/recent_queue_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2024 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL

#include "trpc/overload_control/flow_control/recent_queue.h"

#include <memory>

#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace trpc::overload_control {

namespace testing {

class RecentQueueTest : public ::testing::Test {
protected:
void SetUp() override { seq_queue_ = std::make_unique<RecentQueue>(3, static_cast<uint64_t>(1e9)); }

void TearDown() override { seq_queue_ = nullptr; }

protected:
RecentQueuePtr seq_queue_;
};

TEST_F(RecentQueueTest, Add) {
for (int i{0}; i < 3; ++i) {
ASSERT_EQ(seq_queue_->Add(), true);
ASSERT_EQ(seq_queue_->Add(), true);
ASSERT_EQ(seq_queue_->Add(), true);
ASSERT_EQ(seq_queue_->Add(), false);
ASSERT_EQ(seq_queue_->Add(), false);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

TEST_F(RecentQueueTest, ActiveCount) {
for (int i{0}; i < 3; ++i) {
ASSERT_EQ(seq_queue_->ActiveCount(), 0);
ASSERT_EQ(seq_queue_->Add(), true);
ASSERT_EQ(seq_queue_->Add(), true);
ASSERT_EQ(seq_queue_->ActiveCount(), 2);
ASSERT_EQ(seq_queue_->Add(), true);
ASSERT_EQ(seq_queue_->ActiveCount(), 3);
ASSERT_EQ(seq_queue_->Add(), false);
ASSERT_EQ(seq_queue_->Add(), false);
ASSERT_EQ(seq_queue_->ActiveCount(), 3);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

} // namespace testing

} // namespace trpc::overload_control

#endif
33 changes: 5 additions & 28 deletions trpc/overload_control/flow_control/smooth_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,35 @@
#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL

#include "trpc/overload_control/flow_control/smooth_limiter.h"

#include <chrono>
#include <cmath>
#include <cstdint>

#include "trpc/overload_control/common/report.h"
#include "trpc/util/log/logging.h"

namespace trpc::overload_control {

SmoothLimiter::SmoothLimiter(int64_t limit, bool is_report, int32_t window_size)
: limit_(limit),
is_report_(is_report),
window_size_(window_size <= 0 ? kDefaultNumFps : window_size),
hit_queue_(window_size_),
tick_timer_(std::chrono::microseconds(static_cast<int64_t>(std::ceil(1000000.0 / window_size_))),
[this] { OnNextFrame(); }) {
recent_queue_(std::make_unique<RecentQueue>(limit, nsecs_per_sec_)) {
TRPC_ASSERT(window_size_ > 0);
}

SmoothLimiter::~SmoothLimiter() { tick_timer_.Deactivate(); }

bool SmoothLimiter::CheckLimit(const ServerContextPtr& context) {
bool ret = false;
int64_t active_sum = hit_queue_.ActiveSum();
int64_t hit_num = 0;
if (active_sum >= limit_) {
ret = true;
} else {
hit_num = hit_queue_.AddHit();
if (hit_num > limit_) {
ret = true;
}
}
bool ret = !recent_queue_->Add();

if (is_report_) {
OverloadInfo infos;
infos.attr_name = "SmoothLimiter";
infos.report_name = context->GetFuncName();
infos.tags["active_sum"] = active_sum;
infos.tags["hit_num"] = hit_num;
infos.tags["active_count"] = recent_queue_->ActiveCount();
infos.tags["max_qps"] = limit_;
infos.tags["window_size"] = window_size_;
infos.tags[kOverloadctrlPass] = (ret ? 0 : 1);
infos.tags[kOverloadctrlLimited] = (ret ? 1 : 0);
Report::GetInstance()->ReportOverloadInfo(infos);
}
return ret;
}

int64_t SmoothLimiter::GetCurrCounter() { return hit_queue_.ActiveSum(); }

void SmoothLimiter::OnNextFrame() { hit_queue_.NextFrame(); }
int64_t SmoothLimiter::GetCurrCounter() { return recent_queue_->ActiveCount(); }

} // namespace trpc::overload_control

Expand Down
17 changes: 5 additions & 12 deletions trpc/overload_control/flow_control/smooth_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
#include <string>

#include "trpc/overload_control/flow_control/flow_controller.h"
#include "trpc/overload_control/flow_control/hit_queue.h"
#include "trpc/overload_control/flow_control/tick_timer.h"
#include "trpc/overload_control/flow_control/recent_queue.h"

namespace trpc {
namespace overload_control {
Expand All @@ -34,8 +33,6 @@ class SmoothLimiter : public FlowController {
public:
explicit SmoothLimiter(int64_t limit, bool is_report = false, int32_t window_size = kDefaultNumFps);

virtual ~SmoothLimiter();

// Check if the request is restricted.
bool CheckLimit(const ServerContextPtr& check_param) override;

Expand All @@ -45,21 +42,17 @@ class SmoothLimiter : public FlowController {
// Get the current maximum request limit of the flow controller.
int64_t GetMaxCounter() const override { return limit_; }

protected:
// Call when the next time slice arrives.
void OnNextFrame();

protected:
// Maximum number of requests per second.
int64_t limit_;
// Whether to report monitoring data.
bool is_report_{false};
// Window size
int32_t window_size_{kDefaultNumFps};
// Hit queue
HitQueue hit_queue_;
// Timer implemented using system clock ticks.
TickTimer tick_timer_;

RecentQueuePtr recent_queue_;
// Nanoseconds per second, 1s = 10^9 ns
static constexpr auto nsecs_per_sec_ = static_cast<uint64_t>(1e9);
};
} // namespace overload_control

Expand Down
19 changes: 19 additions & 0 deletions trpc/overload_control/flow_control/smooth_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ TEST_F(SmoothLimiterTest, CheckLimit) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}

TEST_F(SmoothLimiterTest, Overload) {
ServerContextPtr context = MakeRefCounted<ServerContext>();
ProtocolPtr req_msg = std::make_shared<TrpcRequestProtocol>();
context->SetRequestMsg(std::move(req_msg));
context->SetFuncName("trpc.test.flow_control.smooth_limiter");

ASSERT_EQ(smooth_limiter_->GetMaxCounter(), 3);
for (int i{0}; i < 4; ++i) {
ASSERT_EQ(smooth_limiter_->GetCurrCounter(), 0);
ASSERT_EQ(smooth_limiter_->CheckLimit(context), false);
ASSERT_EQ(smooth_limiter_->CheckLimit(context), false);
ASSERT_EQ(smooth_limiter_->GetCurrCounter(), 2);
ASSERT_EQ(smooth_limiter_->CheckLimit(context), false);
ASSERT_EQ(smooth_limiter_->CheckLimit(context), true);
ASSERT_EQ(smooth_limiter_->CheckLimit(context), true);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

} // namespace testing

} // namespace trpc::overload_control
Expand Down
Loading