diff --git a/trpc/overload_control/flow_control/BUILD b/trpc/overload_control/flow_control/BUILD index 01b59d8b..f7b6c012 100644 --- a/trpc/overload_control/flow_control/BUILD +++ b/trpc/overload_control/flow_control/BUILD @@ -127,10 +127,10 @@ cc_test( "//trpc/overload_control/flow_control:flow_test.yaml", ], deps = [ - "//trpc/common:trpc_plugin", ":flow_controller_factory", ":flow_controller_server_filter", "//trpc/codec/testing:protocol_testing", + "//trpc/common:trpc_plugin", "@com_google_googletest//:gtest_main", ], ) @@ -218,8 +218,7 @@ cc_library( ], deps = [ ":flow_controller", - ":hit_queue", - ":tick_timer", + ":recent_queue", "//trpc/overload_control/common:report", "//trpc/util/log:logging", ], @@ -260,3 +259,28 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_library( + name = "recent_queue", + srcs = ["recent_queue.cc"], + hdrs = ["recent_queue.h"], + defines = [] + + select({ + "//trpc:trpc_include_overload_control": ["TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL"], + "//conditions:default": [], + }), + visibility = ["//visibility:private"], + deps = [ + "//trpc/util:time", + ], +) + +cc_test( + name = "recent_queue_test", + srcs = ["recent_queue_test.cc"], + deps = [ + ":recent_queue", + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/trpc/overload_control/flow_control/recent_queue.cc b/trpc/overload_control/flow_control/recent_queue.cc new file mode 100644 index 00000000..b9ef6a70 --- /dev/null +++ b/trpc/overload_control/flow_control/recent_queue.cc @@ -0,0 +1,50 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2024 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. +// +// + +#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL + +#include "trpc/overload_control/flow_control/recent_queue.h" + +namespace trpc::overload_control { + +RecentQueue::RecentQueue(int64_t limit, uint64_t window_size) : limit_(limit), window_size_(window_size) { + cache_.resize(limit); +} + +bool RecentQueue::Add() { + std::unique_lock lock(mutex_); + auto now = trpc::time::GetSteadyNanoSeconds(); + if (now - window_size_ < cache_[cur_]) { + return false; + } + cache_[cur_] = now; + cur_ = (cur_ + 1) % limit_; + return true; +} + +int64_t RecentQueue::ActiveCount() { + std::unique_lock lock(mutex_); + auto past = trpc::time::GetSteadyNanoSeconds() - window_size_; + auto split = cache_.begin() + cur_; + if (past < *split) { + return limit_; + } + if (cache_.back() <= past) { + return split - std::upper_bound(cache_.begin(), split, past); + } + return split - std::upper_bound(split, cache_.end(), past) + limit_; +} + +} // namespace trpc::overload_control + +#endif diff --git a/trpc/overload_control/flow_control/recent_queue.h b/trpc/overload_control/flow_control/recent_queue.h new file mode 100644 index 00000000..1b8f8aae --- /dev/null +++ b/trpc/overload_control/flow_control/recent_queue.h @@ -0,0 +1,52 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2024 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. +// +// + +#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL + +#pragma once + +#include +#include +#include +#include + +#include "trpc/util/time.h" + +namespace trpc::overload_control { + +class RecentQueue { + public: + explicit RecentQueue(int64_t limit, uint64_t window_size); + + bool Add(); + + int64_t ActiveCount(); + + private: + std::vector cache_; + std::mutex mutex_; + + // The location for cache eviction is also where the new timestamp is. + int64_t cur_{0}; + + // Maximum number of requests per second. + int64_t limit_; + // The time window size for cache eviction. + uint64_t window_size_; +}; + +using RecentQueuePtr = std::unique_ptr; + +} // namespace trpc::overload_control + +#endif diff --git a/trpc/overload_control/flow_control/recent_queue_test.cc b/trpc/overload_control/flow_control/recent_queue_test.cc new file mode 100644 index 00000000..68e46e1d --- /dev/null +++ b/trpc/overload_control/flow_control/recent_queue_test.cc @@ -0,0 +1,67 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2024 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. +// +// + +#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL + +#include "trpc/overload_control/flow_control/recent_queue.h" + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace trpc::overload_control { + +namespace testing { + +class RecentQueueTest : public ::testing::Test { + protected: + void SetUp() override { seq_queue_ = std::make_unique(3, static_cast(1e9)); } + + void TearDown() override { seq_queue_ = nullptr; } + + protected: + RecentQueuePtr seq_queue_; +}; + +TEST_F(RecentQueueTest, Add) { + for (int i{0}; i < 3; ++i) { + ASSERT_EQ(seq_queue_->Add(), true); + ASSERT_EQ(seq_queue_->Add(), true); + ASSERT_EQ(seq_queue_->Add(), true); + ASSERT_EQ(seq_queue_->Add(), false); + ASSERT_EQ(seq_queue_->Add(), false); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +TEST_F(RecentQueueTest, ActiveCount) { + for (int i{0}; i < 3; ++i) { + ASSERT_EQ(seq_queue_->ActiveCount(), 0); + ASSERT_EQ(seq_queue_->Add(), true); + ASSERT_EQ(seq_queue_->Add(), true); + ASSERT_EQ(seq_queue_->ActiveCount(), 2); + ASSERT_EQ(seq_queue_->Add(), true); + ASSERT_EQ(seq_queue_->ActiveCount(), 3); + ASSERT_EQ(seq_queue_->Add(), false); + ASSERT_EQ(seq_queue_->Add(), false); + ASSERT_EQ(seq_queue_->ActiveCount(), 3); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +} // namespace testing + +} // namespace trpc::overload_control + +#endif diff --git a/trpc/overload_control/flow_control/smooth_limiter.cc b/trpc/overload_control/flow_control/smooth_limiter.cc index a0fbf536..29ae2c27 100644 --- a/trpc/overload_control/flow_control/smooth_limiter.cc +++ b/trpc/overload_control/flow_control/smooth_limiter.cc @@ -14,13 +14,7 @@ #ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL #include "trpc/overload_control/flow_control/smooth_limiter.h" - -#include -#include -#include - #include "trpc/overload_control/common/report.h" -#include "trpc/util/log/logging.h" namespace trpc::overload_control { @@ -28,34 +22,19 @@ SmoothLimiter::SmoothLimiter(int64_t limit, bool is_report, int32_t window_size) : limit_(limit), is_report_(is_report), window_size_(window_size <= 0 ? kDefaultNumFps : window_size), - hit_queue_(window_size_), - tick_timer_(std::chrono::microseconds(static_cast(std::ceil(1000000.0 / window_size_))), - [this] { OnNextFrame(); }) { + recent_queue_(std::make_unique(limit, nsecs_per_sec_)) { TRPC_ASSERT(window_size_ > 0); } -SmoothLimiter::~SmoothLimiter() { tick_timer_.Deactivate(); } - bool SmoothLimiter::CheckLimit(const ServerContextPtr& context) { - bool ret = false; - int64_t active_sum = hit_queue_.ActiveSum(); - int64_t hit_num = 0; - if (active_sum >= limit_) { - ret = true; - } else { - hit_num = hit_queue_.AddHit(); - if (hit_num > limit_) { - ret = true; - } - } + bool ret = !recent_queue_->Add(); + if (is_report_) { OverloadInfo infos; infos.attr_name = "SmoothLimiter"; infos.report_name = context->GetFuncName(); - infos.tags["active_sum"] = active_sum; - infos.tags["hit_num"] = hit_num; + infos.tags["active_count"] = recent_queue_->ActiveCount(); infos.tags["max_qps"] = limit_; - infos.tags["window_size"] = window_size_; infos.tags[kOverloadctrlPass] = (ret ? 0 : 1); infos.tags[kOverloadctrlLimited] = (ret ? 1 : 0); Report::GetInstance()->ReportOverloadInfo(infos); @@ -63,9 +42,7 @@ bool SmoothLimiter::CheckLimit(const ServerContextPtr& context) { return ret; } -int64_t SmoothLimiter::GetCurrCounter() { return hit_queue_.ActiveSum(); } - -void SmoothLimiter::OnNextFrame() { hit_queue_.NextFrame(); } +int64_t SmoothLimiter::GetCurrCounter() { return recent_queue_->ActiveCount(); } } // namespace trpc::overload_control diff --git a/trpc/overload_control/flow_control/smooth_limiter.h b/trpc/overload_control/flow_control/smooth_limiter.h index eb876540..77a00283 100644 --- a/trpc/overload_control/flow_control/smooth_limiter.h +++ b/trpc/overload_control/flow_control/smooth_limiter.h @@ -20,8 +20,7 @@ #include #include "trpc/overload_control/flow_control/flow_controller.h" -#include "trpc/overload_control/flow_control/hit_queue.h" -#include "trpc/overload_control/flow_control/tick_timer.h" +#include "trpc/overload_control/flow_control/recent_queue.h" namespace trpc { namespace overload_control { @@ -34,8 +33,6 @@ class SmoothLimiter : public FlowController { public: explicit SmoothLimiter(int64_t limit, bool is_report = false, int32_t window_size = kDefaultNumFps); - virtual ~SmoothLimiter(); - // Check if the request is restricted. bool CheckLimit(const ServerContextPtr& check_param) override; @@ -45,10 +42,6 @@ class SmoothLimiter : public FlowController { // Get the current maximum request limit of the flow controller. int64_t GetMaxCounter() const override { return limit_; } - protected: - // Call when the next time slice arrives. - void OnNextFrame(); - protected: // Maximum number of requests per second. int64_t limit_; @@ -56,10 +49,10 @@ class SmoothLimiter : public FlowController { bool is_report_{false}; // Window size int32_t window_size_{kDefaultNumFps}; - // Hit queue - HitQueue hit_queue_; - // Timer implemented using system clock ticks. - TickTimer tick_timer_; + + RecentQueuePtr recent_queue_; + // Nanoseconds per second, 1s = 10^9 ns + static constexpr auto nsecs_per_sec_ = static_cast(1e9); }; } // namespace overload_control diff --git a/trpc/overload_control/flow_control/smooth_limiter_test.cc b/trpc/overload_control/flow_control/smooth_limiter_test.cc index 6a7d7474..db8adff9 100644 --- a/trpc/overload_control/flow_control/smooth_limiter_test.cc +++ b/trpc/overload_control/flow_control/smooth_limiter_test.cc @@ -64,6 +64,25 @@ TEST_F(SmoothLimiterTest, CheckLimit) { std::this_thread::sleep_for(std::chrono::seconds(1)); } +TEST_F(SmoothLimiterTest, Overload) { + ServerContextPtr context = MakeRefCounted(); + ProtocolPtr req_msg = std::make_shared(); + context->SetRequestMsg(std::move(req_msg)); + context->SetFuncName("trpc.test.flow_control.smooth_limiter"); + + ASSERT_EQ(smooth_limiter_->GetMaxCounter(), 3); + for (int i{0}; i < 4; ++i) { + ASSERT_EQ(smooth_limiter_->GetCurrCounter(), 0); + ASSERT_EQ(smooth_limiter_->CheckLimit(context), false); + ASSERT_EQ(smooth_limiter_->CheckLimit(context), false); + ASSERT_EQ(smooth_limiter_->GetCurrCounter(), 2); + ASSERT_EQ(smooth_limiter_->CheckLimit(context), false); + ASSERT_EQ(smooth_limiter_->CheckLimit(context), true); + ASSERT_EQ(smooth_limiter_->CheckLimit(context), true); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + } // namespace testing } // namespace trpc::overload_control