Skip to content

Commit

Permalink
Feat: Support circuit breaking for faulty nodes in direct and domain …
Browse files Browse the repository at this point in the history
…selector modes
  • Loading branch information
chhy2009 committed Feb 19, 2024
1 parent fe2471a commit 71918cf
Show file tree
Hide file tree
Showing 41 changed files with 2,402 additions and 365 deletions.
28 changes: 0 additions & 28 deletions trpc/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -288,34 +288,6 @@ cc_test(
],
)

cc_library(
name = "domain_naming_conf",
srcs = ["domain_naming_conf.cc"],
hdrs = ["domain_naming_conf.h"],
deps = [
"//trpc/util/log:logging",
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_library(
name = "domain_naming_conf_parser",
hdrs = ["domain_naming_conf_parser.h"],
deps = [
":domain_naming_conf",
],
)

cc_test(
name = "domain_naming_conf_test",
srcs = ["domain_naming_conf_test.cc"],
deps = [
":domain_naming_conf",
":domain_naming_conf_parser",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "local_file_provider_conf",
srcs = ["local_file_provider_conf.cc"],
Expand Down
2 changes: 2 additions & 0 deletions trpc/naming/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ cc_library(
":selector_factory",
"//trpc/filter:filter_manager",
"//trpc/naming:load_balance_factory",
"//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory",
"//trpc/naming/common/util/circuit_break:default_circuit_breaker",
"//trpc/naming/direct:direct_selector_filter",
"//trpc/naming/direct:selector_direct",
"//trpc/naming/domain:domain_selector_filter",
Expand Down
6 changes: 0 additions & 6 deletions trpc/naming/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ package(default_visibility = ["//visibility:public"])
cc_library(
name = "common_defs",
hdrs = ["common_defs.h"],
visibility = [
"//visibility:public",
],
deps = [
":common_inc_deprecated",
"//trpc/client:client_context",
Expand All @@ -34,8 +31,5 @@ cc_library(
cc_library(
name = "constants",
hdrs = ["constants.h"],
visibility = [
"//visibility:public",
],
deps = [],
)
121 changes: 121 additions & 0 deletions trpc/naming/common/util/circuit_break/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
licenses(["notice"])

package(default_visibility = ["//visibility:public"])

cc_library(
name = "bucket_circular_array",
srcs = [
"bucket_circular_array.cc",
],
hdrs = [
"bucket_circular_array.h",
],
deps = [
"//trpc/util/log:logging",
],
)

cc_library(
name = "circuit_break_whitelist",
srcs = [
"circuit_break_whitelist.cc",
],
hdrs = [
"circuit_break_whitelist.h",
],
deps = [
"//trpc/codec/trpc",
],
)

cc_test(
name = "circuit_break_whitelist_test",
srcs = [
"circuit_break_whitelist_test.cc",
],
deps = [
":circuit_break_whitelist",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "circuit_breaker_config",
hdrs = [
"circuit_breaker_config.h",
],
deps = [
"//trpc/util/log:logging",
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_library(
name = "default_circuit_breaker_config",
hdrs = [
"default_circuit_breaker_config.h",
],
deps = [
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_library(
name = "default_circuit_breaker",
srcs = [
"default_circuit_breaker.cc",
],
hdrs = [
"default_circuit_breaker.h",
],
deps = [
":bucket_circular_array",
":circuit_breaker",
":default_circuit_breaker_config",
"//trpc/util/log:logging",
],
)

cc_library(
name = "circuit_breaker",
srcs = [],
hdrs = [
"circuit_breaker.h",
],
deps = [
"//trpc/naming/common:common_defs",
],
)

cc_library(
name = "circuit_breaker_creator_factory",
srcs = [],
hdrs = [
"circuit_breaker_creator_factory.h",
],
deps = [
":circuit_breaker",
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
],
)

cc_test(
name = "default_circuit_breaker_test",
srcs = ["default_circuit_breaker_test.cc"],
deps = [
":default_circuit_breaker",
"//trpc/util:time",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "default_circuit_beaker_config_test",
srcs = ["default_circuit_beaker_config_test.cc"],
deps = [
":default_circuit_breaker_config",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)
75 changes: 75 additions & 0 deletions trpc/naming/common/util/circuit_break/bucket_circular_array.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#include "trpc/naming/common/util/circuit_break/bucket_circular_array.h"

#include "trpc/util/log/logging.h"

namespace trpc::naming {

BucketCircularArray::BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num)
: buckets_num_(buckets_num), stat_window_ms_per_bucket_(stat_window_ms / buckets_num), buckets_(buckets_num) {
for (auto& bucket : buckets_) {
bucket.bucket_time.store(0, std::memory_order_relaxed);
bucket.total_count.store(0, std::memory_order_relaxed);
bucket.error_count.store(0, std::memory_order_relaxed);
}
}

void BucketCircularArray::AddMetrics(uint64_t current_ms, bool success) {
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
int bucket_index = bucket_time % buckets_num_;
auto& bucket = buckets_[bucket_index];
// If it is data from the previous round, reset the data for that window.
uint64_t store_bucket_time = bucket.bucket_time;
if (bucket_time != store_bucket_time) {
if (bucket.bucket_time.compare_exchange_weak(store_bucket_time, bucket_time, std::memory_order_relaxed)) {
bucket.total_count.store(0, std::memory_order_relaxed);
bucket.error_count.store(0, std::memory_order_relaxed);
}
}

bucket.total_count.fetch_add(1, std::memory_order_relaxed);
if (!success) {
bucket.error_count.fetch_add(1, std::memory_order_relaxed);
}
}

void BucketCircularArray::ClearMetrics() {
// Since the time of data is checked when adding metrics, here we only need to reset the bucket_time.
for (auto& bucket : buckets_) {
bucket.bucket_time = 0;
}
}

float BucketCircularArray::GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold) {
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
uint64_t error_count = 0;
uint64_t total_count = 0;
for (auto& bucket : buckets_) {
// Only collect data from the most recent round.
if (bucket.bucket_time.load(std::memory_order_relaxed) > (bucket_time - buckets_num_)) {
total_count += bucket.total_count.load(std::memory_order_relaxed);
error_count += bucket.error_count.load(std::memory_order_relaxed);
}
}

if (total_count >= request_volume_threshold) {
return static_cast<float>(error_count) / total_count;
}

// If the minimum number of requests is not reached, return a failure rate of 0.
return 0;
}

} // namespace trpc::naming
51 changes: 51 additions & 0 deletions trpc/naming/common/util/circuit_break/bucket_circular_array.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#pragma once

#include <atomic>
#include <cstdint>
#include <vector>

namespace trpc::naming {

/// @brief A thread-safe class for tracking invocation statistics using a sliding window implementation.
class BucketCircularArray {
public:
/// @brief Construct a bucket circular array
/// @note It is necessary to ensure that stat_window_ms is divisible by buckets_num.
BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num);

/// @brief Add statistical data
void AddMetrics(uint64_t current_ms, bool success);

/// @brief Clear statistical data
void ClearMetrics();

/// @brief Retrieve the failure rate within the statistical time period
float GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold);

private:
struct Metrics {
std::atomic<uint64_t> bucket_time{0}; // The start time of the bucket
std::atomic<uint32_t> total_count{0}; // The request count during current time period
std::atomic<uint32_t> error_count{0}; // The error count during current time period
};

uint32_t buckets_num_;
uint32_t stat_window_ms_per_bucket_;
std::vector<Metrics> buckets_;
};

} // namespace trpc::naming
43 changes: 43 additions & 0 deletions trpc/naming/common/util/circuit_break/circuit_break_whitelist.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h"

#include "trpc/codec/trpc/trpc.pb.h"

namespace trpc::naming {

CircuitBreakWhiteList::CircuitBreakWhiteList() {
// Add default error code to whitelist
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR);
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_LIMITED_ERR);
circuitbreak_whitelist_.Swap();
}

void CircuitBreakWhiteList::SetCircuitBreakWhiteList(const std::vector<int>& retcodes) {
auto& writer = circuitbreak_whitelist_.Writer();
writer.clear();
writer.insert(retcodes.begin(), retcodes.end());
circuitbreak_whitelist_.Swap();
}

bool CircuitBreakWhiteList::Contains(int retcode) {
auto& reader = circuitbreak_whitelist_.Reader();
if (reader.find(retcode) != reader.end()) {
return true;
}

return false;
}

} // namespace trpc::naming
Loading

0 comments on commit 71918cf

Please sign in to comment.