Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Support circuit breaking for faulty nodes in direct and domain selector modes #113

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions trpc/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -288,34 +288,6 @@ cc_test(
],
)

cc_library(
name = "domain_naming_conf",
srcs = ["domain_naming_conf.cc"],
hdrs = ["domain_naming_conf.h"],
deps = [
"//trpc/util/log:logging",
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_library(
name = "domain_naming_conf_parser",
hdrs = ["domain_naming_conf_parser.h"],
deps = [
":domain_naming_conf",
],
)

cc_test(
name = "domain_naming_conf_test",
srcs = ["domain_naming_conf_test.cc"],
deps = [
":domain_naming_conf",
":domain_naming_conf_parser",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "loadbalance_naming_conf",
srcs = ["loadbalance_naming_conf.cc"],
Expand Down
2 changes: 2 additions & 0 deletions trpc/naming/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ cc_library(
":selector_factory",
"//trpc/filter:filter_manager",
"//trpc/naming:load_balance_factory",
"//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory",
"//trpc/naming/common/util/circuit_break:default_circuit_breaker",
"//trpc/naming/direct:direct_selector_filter",
"//trpc/naming/direct:selector_direct",
"//trpc/naming/domain:domain_selector_filter",
Expand Down
6 changes: 0 additions & 6 deletions trpc/naming/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ package(default_visibility = ["//visibility:public"])
cc_library(
name = "common_defs",
hdrs = ["common_defs.h"],
visibility = [
"//visibility:public",
],
deps = [
":common_inc_deprecated",
"//trpc/client:client_context",
Expand All @@ -34,8 +31,5 @@ cc_library(
cc_library(
name = "constants",
hdrs = ["constants.h"],
visibility = [
"//visibility:public",
],
deps = [],
)
121 changes: 121 additions & 0 deletions trpc/naming/common/util/circuit_break/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
licenses(["notice"])

package(default_visibility = ["//visibility:public"])

cc_library(
name = "bucket_circular_array",
srcs = [
"bucket_circular_array.cc",
],
hdrs = [
"bucket_circular_array.h",
],
deps = [
"//trpc/util/log:logging",
],
)

cc_library(
name = "circuit_break_whitelist",
srcs = [
"circuit_break_whitelist.cc",
],
hdrs = [
"circuit_break_whitelist.h",
],
deps = [
"//trpc/codec/trpc",
],
)

cc_test(
name = "circuit_break_whitelist_test",
srcs = [
"circuit_break_whitelist_test.cc",
],
deps = [
":circuit_break_whitelist",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "circuit_breaker_config",
hdrs = [
"circuit_breaker_config.h",
],
deps = [
"//trpc/util/log:logging",
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_library(
name = "default_circuit_breaker_config",
hdrs = [
"default_circuit_breaker_config.h",
],
deps = [
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_library(
name = "default_circuit_breaker",
srcs = [
"default_circuit_breaker.cc",
],
hdrs = [
"default_circuit_breaker.h",
],
deps = [
":bucket_circular_array",
":circuit_breaker",
":default_circuit_breaker_config",
"//trpc/util/log:logging",
],
)

cc_library(
name = "circuit_breaker",
srcs = [],
hdrs = [
"circuit_breaker.h",
],
deps = [
"//trpc/naming/common:common_defs",
],
)

cc_library(
name = "circuit_breaker_creator_factory",
srcs = [],
hdrs = [
"circuit_breaker_creator_factory.h",
],
deps = [
":circuit_breaker",
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_test(
name = "default_circuit_breaker_test",
srcs = ["default_circuit_breaker_test.cc"],
deps = [
":default_circuit_breaker",
"//trpc/util:time",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "default_circuit_beaker_config_test",
srcs = ["default_circuit_beaker_config_test.cc"],
deps = [
":default_circuit_breaker_config",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)
75 changes: 75 additions & 0 deletions trpc/naming/common/util/circuit_break/bucket_circular_array.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#include "trpc/naming/common/util/circuit_break/bucket_circular_array.h"

#include "trpc/util/log/logging.h"

namespace trpc::naming {

BucketCircularArray::BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num)
: buckets_num_(buckets_num), stat_window_ms_per_bucket_(stat_window_ms / buckets_num), buckets_(buckets_num) {
for (auto& bucket : buckets_) {
bucket.bucket_time.store(0, std::memory_order_relaxed);
bucket.total_count.store(0, std::memory_order_relaxed);
bucket.error_count.store(0, std::memory_order_relaxed);
}
}

void BucketCircularArray::AddMetrics(uint64_t current_ms, bool success) {
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
int bucket_index = bucket_time % buckets_num_;
auto& bucket = buckets_[bucket_index];
// If it is data from the previous round, reset the data for that window.
uint64_t store_bucket_time = bucket.bucket_time;
if (bucket_time != store_bucket_time) {
if (bucket.bucket_time.compare_exchange_weak(store_bucket_time, bucket_time, std::memory_order_relaxed)) {
bucket.total_count.store(0, std::memory_order_relaxed);
bucket.error_count.store(0, std::memory_order_relaxed);
}
}

bucket.total_count.fetch_add(1, std::memory_order_relaxed);
if (!success) {
bucket.error_count.fetch_add(1, std::memory_order_relaxed);
}
}

void BucketCircularArray::ClearMetrics() {
// Since the time of data is checked when adding metrics, here we only need to reset the bucket_time.
for (auto& bucket : buckets_) {
bucket.bucket_time = 0;
}
}

float BucketCircularArray::GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold) {
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
uint64_t error_count = 0;
uint64_t total_count = 0;
for (auto& bucket : buckets_) {
// Only collect data from the most recent round.
if (bucket.bucket_time.load(std::memory_order_relaxed) > (bucket_time - buckets_num_)) {
total_count += bucket.total_count.load(std::memory_order_relaxed);
error_count += bucket.error_count.load(std::memory_order_relaxed);
}
}

if (total_count >= request_volume_threshold) {
return static_cast<float>(error_count) / total_count;
}

// If the minimum number of requests is not reached, return a failure rate of 0.
return 0;
}

} // namespace trpc::naming
51 changes: 51 additions & 0 deletions trpc/naming/common/util/circuit_break/bucket_circular_array.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#pragma once

#include <atomic>
#include <cstdint>
#include <vector>

namespace trpc::naming {

/// @brief A thread-safe class for tracking invocation statistics using a sliding window implementation.
class BucketCircularArray {
public:
/// @brief Construct a bucket circular array
/// @note It is necessary to ensure that stat_window_ms is divisible by buckets_num.
BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num);

/// @brief Add statistical data
void AddMetrics(uint64_t current_ms, bool success);

/// @brief Clear statistical data
void ClearMetrics();

/// @brief Retrieve the failure rate within the statistical time period
float GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold);

private:
struct Metrics {
std::atomic<uint64_t> bucket_time{0}; // The start time of the bucket
std::atomic<uint32_t> total_count{0}; // The request count during current time period
std::atomic<uint32_t> error_count{0}; // The error count during current time period
};

uint32_t buckets_num_;
uint32_t stat_window_ms_per_bucket_;
std::vector<Metrics> buckets_;
};

} // namespace trpc::naming
43 changes: 43 additions & 0 deletions trpc/naming/common/util/circuit_break/circuit_break_whitelist.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h"

#include "trpc/codec/trpc/trpc.pb.h"

namespace trpc::naming {

CircuitBreakWhiteList::CircuitBreakWhiteList() {
// Add default error code to whitelist
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR);
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_LIMITED_ERR);
circuitbreak_whitelist_.Swap();
}

void CircuitBreakWhiteList::SetCircuitBreakWhiteList(const std::vector<int>& retcodes) {
auto& writer = circuitbreak_whitelist_.Writer();
writer.clear();
writer.insert(retcodes.begin(), retcodes.end());
circuitbreak_whitelist_.Swap();
}

bool CircuitBreakWhiteList::Contains(int retcode) {
auto& reader = circuitbreak_whitelist_.Reader();
if (reader.find(retcode) != reader.end()) {
return true;
}

return false;
}

} // namespace trpc::naming
Loading
Loading