diff --git a/trpc/common/config/BUILD b/trpc/common/config/BUILD index 028ce9f8..89462200 100644 --- a/trpc/common/config/BUILD +++ b/trpc/common/config/BUILD @@ -288,34 +288,6 @@ cc_test( ], ) -cc_library( - name = "domain_naming_conf", - srcs = ["domain_naming_conf.cc"], - hdrs = ["domain_naming_conf.h"], - deps = [ - "//trpc/util/log:logging", - "@com_github_jbeder_yaml_cpp//:yaml-cpp", - ], -) - -cc_library( - name = "domain_naming_conf_parser", - hdrs = ["domain_naming_conf_parser.h"], - deps = [ - ":domain_naming_conf", - ], -) - -cc_test( - name = "domain_naming_conf_test", - srcs = ["domain_naming_conf_test.cc"], - deps = [ - ":domain_naming_conf", - ":domain_naming_conf_parser", - "@com_google_googletest//:gtest_main", - ], -) - cc_library( name = "local_file_provider_conf", srcs = ["local_file_provider_conf.cc"], diff --git a/trpc/naming/BUILD b/trpc/naming/BUILD index 9f33f4fd..4d5345f9 100644 --- a/trpc/naming/BUILD +++ b/trpc/naming/BUILD @@ -173,6 +173,8 @@ cc_library( ":selector_factory", "//trpc/filter:filter_manager", "//trpc/naming:load_balance_factory", + "//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory", + "//trpc/naming/common/util/circuit_break:default_circuit_breaker", "//trpc/naming/direct:direct_selector_filter", "//trpc/naming/direct:selector_direct", "//trpc/naming/domain:domain_selector_filter", diff --git a/trpc/naming/common/BUILD b/trpc/naming/common/BUILD index cd2fbcc3..6a92b5e7 100644 --- a/trpc/naming/common/BUILD +++ b/trpc/naming/common/BUILD @@ -7,9 +7,6 @@ package(default_visibility = ["//visibility:public"]) cc_library( name = "common_defs", hdrs = ["common_defs.h"], - visibility = [ - "//visibility:public", - ], deps = [ ":common_inc_deprecated", "//trpc/client:client_context", @@ -34,8 +31,5 @@ cc_library( cc_library( name = "constants", hdrs = ["constants.h"], - visibility = [ - "//visibility:public", - ], deps = [], ) diff --git a/trpc/naming/common/util/circuit_break/BUILD 
b/trpc/naming/common/util/circuit_break/BUILD new file mode 100644 index 00000000..dd7924da --- /dev/null +++ b/trpc/naming/common/util/circuit_break/BUILD @@ -0,0 +1,121 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "bucket_circular_array", + srcs = [ + "bucket_circular_array.cc", + ], + hdrs = [ + "bucket_circular_array.h", + ], + deps = [ + "//trpc/util/log:logging", + ], +) + +cc_library( + name = "circuit_break_whitelist", + srcs = [ + "circuit_break_whitelist.cc", + ], + hdrs = [ + "circuit_break_whitelist.h", + ], + deps = [ + "//trpc/codec/trpc", + ], +) + +cc_test( + name = "circuit_break_whitelist_test", + srcs = [ + "circuit_break_whitelist_test.cc", + ], + deps = [ + ":circuit_break_whitelist", + "@com_google_googletest//:gtest_main", + ], +) + +cc_library( + name = "circuit_breaker_config", + hdrs = [ + "circuit_breaker_config.h", + ], + deps = [ + "//trpc/util/log:logging", + "@com_github_jbeder_yaml_cpp//:yaml-cpp", + ], +) + +cc_library( + name = "default_circuit_breaker_config", + hdrs = [ + "default_circuit_breaker_config.h", + ], + deps = [ + "@com_github_jbeder_yaml_cpp//:yaml-cpp", + ], +) + +cc_library( + name = "default_circuit_breaker", + srcs = [ + "default_circuit_breaker.cc", + ], + hdrs = [ + "default_circuit_breaker.h", + ], + deps = [ + ":bucket_circular_array", + ":circuit_breaker", + ":default_circuit_breaker_config", + "//trpc/util/log:logging", + ], +) + +cc_library( + name = "circuit_breaker", + srcs = [], + hdrs = [ + "circuit_breaker.h", + ], + deps = [ + "//trpc/naming/common:common_defs", + ], +) + +cc_library( + name = "circuit_breaker_creator_factory", + srcs = [], + hdrs = [ + "circuit_breaker_creator_factory.h", + ], + deps = [ + ":circuit_breaker", + "@com_github_jbeder_yaml_cpp//:yaml-cpp", + ], +) + +cc_test( + name = "default_circuit_breaker_test", + srcs = ["default_circuit_breaker_test.cc"], + deps = [ + ":default_circuit_breaker", + "//trpc/util:time", + 
namespace trpc::naming {

/// @brief Sliding-window counter of invocation results whose counters are all std::atomic.
///
/// The statistical window is cut into `buckets_num` equal time slices. Each slice maps onto one slot of a ring;
/// a slot is stamped with the slice number it currently holds and is recycled when a newer slice lands on it.
class BucketCircularArray {
 public:
  /// @brief Construct a bucket circular array.
  /// @param stat_window_ms Length of the whole statistical window, in milliseconds.
  /// @param buckets_num Number of slices the window is divided into.
  /// @note stat_window_ms is expected to be divisible by buckets_num. A zero bucket count, or a window shorter
  ///       than buckets_num milliseconds, is clamped to one bucket / one millisecond per bucket so that the
  ///       divisions performed by the other methods are always defined.
  BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num);

  /// @brief Record one invocation that finished at `current_ms`.
  /// @param current_ms Timestamp of the invocation, in milliseconds.
  /// @param success false counts the invocation as an error as well.
  void AddMetrics(uint64_t current_ms, bool success);

  /// @brief Discard everything recorded so far.
  void ClearMetrics();

  /// @brief Error ratio over the buckets that still belong to the window ending at `current_ms`.
  /// @param current_ms Timestamp the window ends at, in milliseconds.
  /// @param request_volume_threshold Minimum number of recorded requests needed to report a ratio.
  /// @return errors / total, or 0 when fewer than `request_volume_threshold` requests (or none at all) are
  ///         inside the window.
  float GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold);

 private:
  // NOTE(review): the template arguments of these atomics were not visible in the reviewed text. uint64_t is
  // what the surrounding arithmetic uses; confirm against the original header.
  struct Metrics {
    std::atomic<uint64_t> bucket_time{0};  // Slice number (timestamp / slice width) the slot currently holds
    std::atomic<uint64_t> total_count{0};  // Requests recorded for that slice
    std::atomic<uint64_t> error_count{0};  // Failed requests recorded for that slice
  };

  uint32_t buckets_num_;
  uint32_t stat_window_ms_per_bucket_;
  std::vector<Metrics> buckets_;
};

BucketCircularArray::BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num)
    : buckets_num_(buckets_num == 0 ? 1 : buckets_num),
      stat_window_ms_per_bucket_(stat_window_ms / buckets_num_ == 0 ? 1 : stat_window_ms / buckets_num_),
      buckets_(buckets_num_) {
  // Every Metrics member is already zero through its default member initializer.
}

void BucketCircularArray::AddMetrics(uint64_t current_ms, bool success) {
  const uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
  auto& bucket = buckets_[bucket_time % buckets_num_];

  // The slot still holds an older slice: recycle it. A strong CAS is required because there is no retry loop;
  // a weak CAS may fail spuriously, which would merge the new sample into the stale counters.
  uint64_t stored_bucket_time = bucket.bucket_time.load();
  if (stored_bucket_time != bucket_time &&
      bucket.bucket_time.compare_exchange_strong(stored_bucket_time, bucket_time, std::memory_order_relaxed)) {
    bucket.total_count.store(0, std::memory_order_relaxed);
    bucket.error_count.store(0, std::memory_order_relaxed);
  }

  bucket.total_count.fetch_add(1, std::memory_order_relaxed);
  if (!success) {
    bucket.error_count.fetch_add(1, std::memory_order_relaxed);
  }
}

void BucketCircularArray::ClearMetrics() {
  // Resetting the stamp makes AddMetrics recycle the slot on its next use. The counters are zeroed as well so
  // that a cleared slot contributes nothing even while its stamp still falls inside the window.
  for (auto& bucket : buckets_) {
    bucket.bucket_time.store(0);
    bucket.total_count.store(0, std::memory_order_relaxed);
    bucket.error_count.store(0, std::memory_order_relaxed);
  }
}

float BucketCircularArray::GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold) {
  const uint64_t current_bucket_time = current_ms / stat_window_ms_per_bucket_;
  uint64_t error_count = 0;
  uint64_t total_count = 0;
  for (const auto& bucket : buckets_) {
    // A slot belongs to the current window when stamp > current - buckets_num_. The comparison is written as
    // an addition because the subtraction wraps around for timestamps smaller than one full window.
    if (bucket.bucket_time.load(std::memory_order_relaxed) + buckets_num_ > current_bucket_time) {
      total_count += bucket.total_count.load(std::memory_order_relaxed);
      error_count += bucket.error_count.load(std::memory_order_relaxed);
    }
  }

  // Too few requests to judge. The explicit zero check also avoids 0 / 0 when the threshold itself is 0.
  if (total_count == 0 || total_count < request_volume_threshold) {
    return 0;
  }

  return static_cast<float>(error_count) / total_count;
}

}  // namespace trpc::naming
+// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. +// +// + +#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h" + +#include "trpc/codec/trpc/trpc.pb.h" + +namespace trpc::naming { + +CircuitBreakWhiteList::CircuitBreakWhiteList() { + // Add default error code to whitelist + circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR); + circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_LIMITED_ERR); + circuitbreak_whitelist_.Swap(); +} + +void CircuitBreakWhiteList::SetCircuitBreakWhiteList(const std::vector& retcodes) { + auto& writer = circuitbreak_whitelist_.Writer(); + writer.clear(); + writer.insert(retcodes.begin(), retcodes.end()); + circuitbreak_whitelist_.Swap(); +} + +bool CircuitBreakWhiteList::Contains(int retcode) { + auto& reader = circuitbreak_whitelist_.Reader(); + if (reader.find(retcode) != reader.end()) { + return true; + } + + return false; +} + +} // namespace trpc::naming diff --git a/trpc/naming/common/util/circuit_break/circuit_break_whitelist.h b/trpc/naming/common/util/circuit_break/circuit_break_whitelist.h new file mode 100644 index 00000000..c53dc6d9 --- /dev/null +++ b/trpc/naming/common/util/circuit_break/circuit_break_whitelist.h @@ -0,0 +1,58 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. 
+// +// + +#pragma once + +#include +#include + +namespace trpc::naming { + +namespace detail { +template +class ReadersWriterData { + public: + virtual const T& Reader() { return instances_[reader_]; } + + virtual T& Writer() { return instances_[writer_]; } + + virtual void Swap() { std::swap(reader_, writer_); } + + public: + ReadersWriterData() : reader_(0), writer_(1) {} + virtual ~ReadersWriterData() = default; + + private: + T instances_[2]; + int reader_{0}; + int writer_{1}; +}; +} // namespace detail + +/// @brief Class of circuit breaker error code whitelist, is not thread safe. +class CircuitBreakWhiteList { + public: + CircuitBreakWhiteList(); + + /// @brief Set error code of whitelist + void SetCircuitBreakWhiteList(const std::vector& retcodes); + + /// @brief Determine whether the error code is in the whitelist + /// @return Return true if the error code is in the whitelist, otherwise return false + bool Contains(int retcode); + + private: + detail::ReadersWriterData> circuitbreak_whitelist_; +}; + +} // namespace trpc::naming diff --git a/trpc/naming/common/util/circuit_break/circuit_break_whitelist_test.cc b/trpc/naming/common/util/circuit_break/circuit_break_whitelist_test.cc new file mode 100644 index 00000000..2219b2b0 --- /dev/null +++ b/trpc/naming/common/util/circuit_break/circuit_break_whitelist_test.cc @@ -0,0 +1,30 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. 
namespace trpc::naming {

/// @brief Circuit breaker state kept for a single node.
enum CircuitBreakStatus {
  kClose,
  kOpen,
  kHalfOpen,
};

/// @brief Identifies a node by ip and port; used as the key of the circuit breaker records.
struct CircuitBreakRecordKey {
  CircuitBreakRecordKey(const std::string& ip_, uint32_t port_) : ip(ip_), port(port_) {}
  std::string ip;
  uint32_t port;
  bool operator==(const CircuitBreakRecordKey& key) const { return (ip == key.ip) && (port == key.port); }
};

/// @brief Hash functor for CircuitBreakRecordKey so it can be stored in unordered containers.
struct CircuitBreakRecordKeyHash {
  std::size_t operator()(const CircuitBreakRecordKey& key) const {
    return std::hash<std::string>()(key.ip) ^ std::hash<uint32_t>()(key.port);
  }
};

/// @brief Interface of a circuit breaker plugin.
class CircuitBreaker {
 public:
  /// Instances are owned through CircuitBreakerPtr (a pointer to this base), so destruction must dispatch to
  /// the derived class.
  virtual ~CircuitBreaker() = default;

  /// @brief Reserve only the circuit breaker statistics information of recordkey information passed in the parameters.
  /// @note Call it during initialization or when there is a change in the node set.
  virtual void Reserve(const std::unordered_set<CircuitBreakRecordKey, CircuitBreakRecordKeyHash>& keys) = 0;

  /// @brief Indicate whether there has been a change in the node status.
  /// @note Call it in 'Select' and 'SelectBatch' interface.
  ///       If status changed, it is necessary to update the available node information.
  virtual bool StatusChanged(uint64_t current_ms) = 0;

  /// @brief Add statistical data and switch node status based on invocation conditions.
  /// @note Call it in 'ReportInvokeResult' interface.
  virtual void AddRecordData(const CircuitBreakRecordKey& key, uint64_t current_ms, bool success) = 0;

  /// @brief Retrieve the circuit breaker status of all nodes.
  /// @note Used in conjunction with the 'StatusChanged' interface to update the available node information when their
  ///       status changes.
  virtual std::unordered_map<CircuitBreakRecordKey, CircuitBreakStatus, CircuitBreakRecordKeyHash> GetAllStatus() = 0;
};

using CircuitBreakerPtr = std::shared_ptr<CircuitBreaker>;

}  // namespace trpc::naming
+// +// + +#pragma once + +#include + +#include "yaml-cpp/yaml.h" + +#include "trpc/util/log/logging.h" + +namespace trpc::naming { + +struct CircuitBreakConfig { + std::string plugin_name{"default"}; // circuit break plugin name + bool enable{true}; // enable circuit break or not + YAML::Node plugin_config; // config of circuit break plugin + + void Display() const { + TRPC_LOG_DEBUG("----------CircuitBreakConfig----------"); + TRPC_LOG_DEBUG("plugin_name:" << plugin_name); + TRPC_LOG_DEBUG("enable:" << (enable ? "true" : "false")); + TRPC_LOG_DEBUG("plugin_config:" << plugin_config); + } +}; + +} // namespace trpc::naming + +namespace YAML { + +template <> +struct convert { + static YAML::Node encode(const trpc::naming::CircuitBreakConfig& config) { + YAML::Node node; + + node[config.plugin_name] = config.plugin_config; + + return node; + } + + static bool decode(const YAML::Node& node, trpc::naming::CircuitBreakConfig& config) { + auto iter = node.begin(); + if (iter != node.end()) { + config.plugin_name = iter->first.as(); + config.plugin_config = iter->second; + + if (iter->second) { + if (iter->second["enable"]) { + config.enable = iter->second["enable"].as(); + } + } + } + + return true; + } +}; + +} // namespace YAML diff --git a/trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h b/trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h new file mode 100644 index 00000000..e386bfa1 --- /dev/null +++ b/trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h @@ -0,0 +1,58 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. 
+// +// + +#pragma once + +#include +#include +#include + +#include "yaml-cpp/yaml.h" + +#include "trpc/naming/common/util/circuit_break/circuit_breaker.h" + +namespace trpc::naming { + +class CircuitBreakerCreatorFactory { + public: + using CreateFunction = + std::function; + + static CircuitBreakerCreatorFactory* GetInstance() { + static CircuitBreakerCreatorFactory factory; + return &factory; + } + + void Register(const std::string& name, CreateFunction&& creator) { + creators_[name] = std::move(creator); + } + + CircuitBreakerPtr Create(const std::string& name, const YAML::Node* plugin_config, const std::string& service_name) { + auto iter = creators_.find(name); + if (iter != creators_.end()) { + return iter->second(plugin_config, service_name); + } + + return nullptr; + } + + private: + CircuitBreakerCreatorFactory() = default; + CircuitBreakerCreatorFactory(const CircuitBreakerCreatorFactory&) = delete; + CircuitBreakerCreatorFactory& operator=(const CircuitBreakerCreatorFactory&) = delete; + + private: + std::unordered_map creators_; +}; + +} // namespace trpc::naming diff --git a/trpc/naming/common/util/circuit_break/default_circuit_beaker_config_test.cc b/trpc/naming/common/util/circuit_break/default_circuit_beaker_config_test.cc new file mode 100644 index 00000000..26318939 --- /dev/null +++ b/trpc/naming/common/util/circuit_break/default_circuit_beaker_config_test.cc @@ -0,0 +1,47 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. 
+// +// + +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h" + +#include "gtest/gtest.h" + +namespace trpc::naming::testing { + +TEST(DefaultCircuitBreakerConfig, Parse) { + std::string config_str = + "enable: false\r\n" + "statWindow: 30s\r\n" + "bucketsNum: 6\r\n" + "sleepWindow: 10s\r\n" + "requestVolumeThreshold: 5\r\n" + "errorRateThreshold: 0.4\r\n" + "continuousErrorThreshold: 5\r\n" + "requestCountAfterHalfOpen: 5\r\n" + "successCountAfterHalfOpen: 4"; + + YAML::Node plugin_config = YAML::Load(config_str); + + DefaultCircuitBreakerConfig circuit_breaker_config; + YAML::convert::decode(plugin_config, circuit_breaker_config); + ASSERT_EQ(circuit_breaker_config.enable, false); + ASSERT_EQ(circuit_breaker_config.stat_window_ms, 30000); + ASSERT_EQ(circuit_breaker_config.buckets_num, 6); + ASSERT_EQ(circuit_breaker_config.sleep_window_ms, 10000); + ASSERT_EQ(circuit_breaker_config.request_volume_threshold, 5); + ASSERT_EQ(circuit_breaker_config.error_rate_threshold, 0.4f); + ASSERT_EQ(circuit_breaker_config.continuous_error_threshold, 5); + ASSERT_EQ(circuit_breaker_config.request_count_after_half_open, 5); + ASSERT_EQ(circuit_breaker_config.success_count_after_half_open, 4); +} + +} // namespace trpc::naming::testing diff --git a/trpc/naming/common/util/circuit_break/default_circuit_breaker.cc b/trpc/naming/common/util/circuit_break/default_circuit_breaker.cc new file mode 100644 index 00000000..5f13b21e --- /dev/null +++ b/trpc/naming/common/util/circuit_break/default_circuit_breaker.cc @@ -0,0 +1,226 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. 
+// +// + +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker.h" + +#include "trpc/util/log/logging.h" + +namespace trpc::naming { + +namespace { +const char* StatusStr(CircuitBreakStatus status) { + switch (status) { + case CircuitBreakStatus::kClose: + return "Close"; + case CircuitBreakStatus::kHalfOpen: + return "HalfOpen"; + case CircuitBreakStatus::kOpen: + return "Open"; + default: + return "Unknown"; + } +} + +enum kReasonIndex { + kContinuousError = 0, + kExcessiveErrorRate, + kEnoughSuccessCount, + kNoEnoughSuccessCount, + kReachSleepWindow, +}; + +const char* kReasonStr[] = { + "continuous error", + "excessive error rate", + "enough success count after half-open", + "no enough success count after half-open", + "reach the sleep window point", +}; + +void PrintSwitchLog(const std::string& service_name, const CircuitBreakRecordKey& key, CircuitBreakStatus from_status, + CircuitBreakStatus to_status, int reason) { + TRPC_FMT_INFO("{} {}:{} switch from {} status to {} status due to {}", service_name, key.ip, key.port, + StatusStr(from_status), StatusStr(to_status), kReasonStr[reason]); +} +} // namespace + +DefaultCircuitBreaker::DefaultCircuitBreaker(const DefaultCircuitBreakerConfig& config, const std::string& service_name) + : config_(config), service_name_(service_name) { + config_.continuous_error_threshold -= 1; + stat_window_ms_per_bucket_ = config_.stat_window_ms / config_.buckets_num; +} + +void DefaultCircuitBreaker::Reserve(const std::unordered_set& keys) { + if (!config_.enable) { + return; + } + + std::unique_lock wl(mutex_); + for (auto& key : keys) { + if (record_info_.find(key) == record_info_.end()) { + record_info_.emplace(key, std::make_unique(config_.stat_window_ms, config_.buckets_num)); + } + } + + for (auto iter = record_info_.begin(); iter != record_info_.end();) { + if (keys.find(iter->first) == keys.end()) { + iter = record_info_.erase(iter); + } else { + iter++; + } + } +} + +bool 
DefaultCircuitBreaker::StatusChanged(uint64_t current_ms) { + if (!config_.enable) { + return false; + } + + auto status_change = true; + if (status_changed_.compare_exchange_weak(status_change, false, std::memory_order_relaxed)) { + return true; + } + + return ShouldChangeToHalfOpen(current_ms); +} + +void DefaultCircuitBreaker::SwitchStatus(const CircuitBreakRecordKey& key, CircuitBreakRecordData& record_info, + uint64_t current_ms, CircuitBreakStatus from_status, + CircuitBreakStatus to_status, int reason) { + if (to_status == CircuitBreakStatus::kOpen) { + if (record_info.status.compare_exchange_strong(from_status, to_status, std::memory_order_relaxed)) { + PrintSwitchLog(service_name_, key, from_status, to_status, reason); + record_info.status.store(CircuitBreakStatus::kOpen); + record_info.circuitbreak_open_time.store(current_ms); + record_info.continuous_error_count.store(0); + record_info.success_count_after_half_open.store(0); + record_info.request_count_after_half_open.store(0); + SetStatusChanged(true); + AddLastCircuitBreakOpenTime(current_ms); + return; + } + } else if (to_status == CircuitBreakStatus::kClose) { + PrintSwitchLog(service_name_, key, from_status, to_status, reason); + record_info.status.store(to_status); + // Reset the statistical data of the time window when switch to close state + record_info.metrics_data.ClearMetrics(); + SetStatusChanged(true); + return; + } +} + +bool DefaultCircuitBreaker::ShouldChangeToHalfOpen(uint64_t current_ms) { + std::unique_lock wl(switch_to_open_times_mutex_); + if (switch_to_open_times_.empty()) { + return false; + } + + if (switch_to_open_times_.front() + config_.sleep_window_ms > current_ms) { + return false; + } + + switch_to_open_times_.pop_front(); + wl.unlock(); + + // When the sleep time window is reached, switch the nodes in the open state to the half-open state. 
+ bool flag = false; + std::shared_lock rl(mutex_); + for (auto& item : record_info_) { + auto& record_info = *item.second; + auto status = record_info.status.load(std::memory_order_relaxed); + + if (status == CircuitBreakStatus::kOpen && + record_info.circuitbreak_open_time + config_.sleep_window_ms <= current_ms) { + if (record_info.status.compare_exchange_strong(status, CircuitBreakStatus::kHalfOpen, + std::memory_order_relaxed)) { + record_info.request_count_after_half_open.store(0); + record_info.success_count_after_half_open.store(0); + PrintSwitchLog(service_name_, item.first, status, CircuitBreakStatus::kHalfOpen, kReachSleepWindow); + flag = true; + } + } + } + + return flag; +} + +void DefaultCircuitBreaker::AddRecordData(const CircuitBreakRecordKey& key, uint64_t current_ms, bool success) { + if (!config_.enable) { + return; + } + + std::shared_lock rl(mutex_); + auto iter = record_info_.find(key); + if (iter != record_info_.end()) { + auto& record_info = *(iter->second); + record_info.metrics_data.AddMetrics(current_ms, success); + auto status = record_info.status.load(std::memory_order_relaxed); + if (status == CircuitBreakStatus::kClose) { + if (!success) { + auto continuous_error_count = record_info.continuous_error_count.fetch_add(1, std::memory_order_relaxed); + if (continuous_error_count >= config_.continuous_error_threshold) { + // When the consecutive failure count reaches the threshold, switch to open state. 
+ SwitchStatus(key, record_info, current_ms, status, CircuitBreakStatus::kOpen, kContinuousError); + return; + } + } else { + record_info.continuous_error_count.store(0, std::memory_order_relaxed); + } + + // Check if the failure rate meets the standard periodically + auto current_check_error_rate_time = record_info.current_check_error_rate_time.load(std::memory_order_relaxed); + if (current_check_error_rate_time == 0) { + record_info.current_check_error_rate_time.store(current_ms, std::memory_order_relaxed); + current_check_error_rate_time = current_ms; + } + if (current_ms - current_check_error_rate_time >= config_.stat_window_ms) { + // set the next statistical time to stat_window_ms_per_bucket_ later + record_info.current_check_error_rate_time.store( + current_ms - config_.stat_window_ms + stat_window_ms_per_bucket_, std::memory_order_relaxed); + if (record_info.metrics_data.GetErrorRate(current_ms, config_.request_volume_threshold) >= + config_.error_rate_threshold) { + SwitchStatus(key, record_info, current_ms, status, CircuitBreakStatus::kOpen, kExcessiveErrorRate); + return; + } + } + + } else if (status == CircuitBreakStatus::kHalfOpen) { + if (success) { + record_info.success_count_after_half_open.fetch_add(1, std::memory_order_acq_rel); + } + auto request_count_after_half_open = + record_info.request_count_after_half_open.fetch_add(1, std::memory_order_acq_rel); + if (request_count_after_half_open == config_.request_count_after_half_open - 1) { + if (record_info.success_count_after_half_open.load(std::memory_order_acquire) >= + config_.success_count_after_half_open) { + SwitchStatus(key, record_info, current_ms, status, CircuitBreakStatus::kClose, kEnoughSuccessCount); + } else { + SwitchStatus(key, record_info, current_ms, status, CircuitBreakStatus::kOpen, kNoEnoughSuccessCount); + } + } + } + } +} + +std::unordered_map +DefaultCircuitBreaker::GetAllStatus() { + // copy data + std::unordered_map status; + std::shared_lock rl(mutex_); + for (auto& 
record : record_info_) { + status.emplace(record.first, record.second->status.load(std::memory_order_relaxed)); + } + return status; +} + +} // namespace trpc::naming \ No newline at end of file diff --git a/trpc/naming/common/util/circuit_break/default_circuit_breaker.h b/trpc/naming/common/util/circuit_break/default_circuit_breaker.h new file mode 100644 index 00000000..e35a3f29 --- /dev/null +++ b/trpc/naming/common/util/circuit_break/default_circuit_breaker.h @@ -0,0 +1,86 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. +// +// + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "trpc/naming/common/util/circuit_break/bucket_circular_array.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker.h" +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h" + +namespace trpc::naming { + +/// @brief The implementation of the default circuit breaker. 
+class DefaultCircuitBreaker : public CircuitBreaker {
+ public:
+  // NOTE(review): several template argument lists in this declaration look truncated (`std::atomic status`,
+  // `std::unordered_set& keys`, `std::unordered_map GetAllStatus()`); presumably they were lost when this patch
+  // text was extracted - confirm against the upstream header before applying.
+
+  /// @brief Name of this circuit breaker implementation.
+  static constexpr char kName[] = "default";
+
+  /// @brief Creates a circuit breaker for service `service_name` using the settings in `config`.
+  DefaultCircuitBreaker(const DefaultCircuitBreakerConfig& config, const std::string& service_name);
+
+  /// @brief Declares the set of nodes to keep statistics for.
+  /// @note Per default_circuit_breaker_test.cc, GetAllStatus() afterwards reports exactly the nodes in `keys`,
+  ///       including after nodes were added to or erased from the set.
+  void Reserve(const std::unordered_set& keys) override;
+
+  /// @brief Tells whether a node status changed, evaluated at time `current_ms` (milliseconds).
+  /// @note Per the unit tests, open nodes are reported as half-open after a call made once the sleep window has
+  ///       elapsed, and a call that follows a reported change without new data returns false.
+  bool StatusChanged(uint64_t current_ms) override;
+
+  /// @brief Records the outcome (`success`) of one call to node `key` made at `current_ms` (milliseconds).
+  void AddRecordData(const CircuitBreakRecordKey& key, uint64_t current_ms, bool success) override;
+
+  /// @brief Returns the current circuit break status of every tracked node.
+  std::unordered_map GetAllStatus() override;
+
+ private:
+  /// Statistics and state kept for a single node.
+  struct CircuitBreakRecordData {
+    CircuitBreakRecordData(uint32_t stat_window_ms, uint32_t buckets_num) : metrics_data(stat_window_ms, buckets_num) {}
+    BucketCircularArray metrics_data;  // invoke statistics
+    std::atomic status{CircuitBreakStatus::kClose};  // circuit break state
+    std::atomic continuous_error_count{0};  // The number of consecutive failed times in closed state
+    std::atomic circuitbreak_open_time{0};  // The moment when switch to the open state
+    std::atomic success_count_after_half_open{0};  // The success count in half-open state
+    std::atomic request_count_after_half_open{0};  // The request count in half-open state
+    std::atomic current_check_error_rate_time{0};  // The recent moment of checking if the failure rate has
+                                                   // exceeded the threshold.
+  };
+
+  /// @brief Switches the record of node `key` from `from_status` to `to_status` at `current_ms`.
+  /// NOTE(review): the meaning of the `reason` values is not visible in this header - document them here.
+  void SwitchStatus(const CircuitBreakRecordKey& key, CircuitBreakRecordData& record_info, uint64_t current_ms,
+                    CircuitBreakStatus from_status, CircuitBreakStatus to_status, int reason);
+
+  /// @brief Sets the "some status changed" flag (relaxed atomic store).
+  void SetStatusChanged(bool value) { status_changed_.store(value, std::memory_order_relaxed); }
+
+  /// @brief Appends `current_ms` to `switch_to_open_times_` while holding `switch_to_open_times_mutex_`.
+  void AddLastCircuitBreakOpenTime(uint64_t current_ms) {
+    std::lock_guard lock(switch_to_open_times_mutex_);
+    switch_to_open_times_.push_back(current_ms);
+  }
+
+  /// @brief Presumably decides, from the recorded open times, whether an open node is due for half-open at
+  ///        `current_ms` - TODO confirm against the implementation file.
+  bool ShouldChangeToHalfOpen(uint64_t current_ms);
+
+ private:
+  DefaultCircuitBreakerConfig config_;
+  std::string service_name_;
+  uint32_t stat_window_ms_per_bucket_;
+  std::atomic status_changed_{false};
+
+  // The status of node circuit breaker, where the key is ip:port
+  std::unordered_map, CircuitBreakRecordKeyHash>
+      record_info_;
+  // Guards record_info_ (presumably shared for readers, exclusive for Reserve - confirm in the .cc file).
+  mutable std::shared_mutex mutex_;
+
+  // Moments at which nodes switched to the open state; guarded by switch_to_open_times_mutex_.
+  std::deque switch_to_open_times_;
+  std::mutex switch_to_open_times_mutex_;
+};
+
+}  // namespace trpc::naming
diff --git a/trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h b/trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h
new file mode 100644
index 00000000..db2e6388
--- /dev/null
+++ b/trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h
@@ -0,0 +1,118 @@
+//
+//
+// Tencent is pleased to support the open source community by making tRPC available.
+//
+// Copyright (C) 2023 THL A29 Limited, a Tencent company.
+// All rights reserved.
+//
+// If you have downloaded a copy of the tRPC source code from Tencent,
+// please note that tRPC source code is licensed under the Apache 2.0 License,
+// A copy of the Apache 2.0 License is included in this file.
+//
+//
+
+#pragma once
+
+#include <cstdint>
+#include <cstdlib>
+#include <limits>
+#include <string>
+
+#include "yaml-cpp/yaml.h"
+
+namespace trpc::naming {
+
+/// @brief Settings of the default circuit breaker. All durations are in milliseconds.
+struct DefaultCircuitBreakerConfig {
+  bool enable{true};                           // Whether to enable circuit breaker
+  uint32_t stat_window_ms{60000};              // Statistical time window period (in milliseconds)
+  uint32_t buckets_num{12};                    // Buckets number in statistical time window period
+  uint32_t sleep_window_ms{30000};             // The time it takes to switch to the half-open state from open state
+  uint32_t request_volume_threshold{10};       // The minimum number of requests required to trigger circuit breaker
+  float error_rate_threshold{0.5};             // The percentage of failure rate at which circuit breaker is triggered
+  uint32_t continuous_error_threshold{10};     // The number of consecutive failures before triggering circuit breaker
+  uint32_t request_count_after_half_open{10};  // The number of allowed requests in the half-open state
+  uint32_t success_count_after_half_open{8};   // The minimum number of successful attempts required to transition from
+                                               // the half-open state to the closed state.
+};
+
+}  // namespace trpc::naming
+
+/// @brief Converts a duration string into milliseconds.
+/// @param time_str A decimal number optionally followed by "ms" (milliseconds) or "s" (seconds); a number
+///                 without unit is taken as milliseconds. A suffix is only recognised when the string has at
+///                 least two characters.
+/// @return The duration in milliseconds. Text that does not start with a number, and zero or negative values,
+///         yield 0; values that do not fit into 32 bits saturate at UINT32_MAX instead of wrapping around.
+static uint32_t ConvertToMilliSeconds(const std::string& time_str) {
+  uint64_t unit_ms = 1;
+  std::string::size_type number_len = time_str.size();
+  if (number_len >= 2) {
+    if (time_str.compare(number_len - 2, 2, "ms") == 0) {
+      number_len -= 2;
+    } else if (time_str.back() == 's') {
+      number_len -= 1;
+      unit_ms = 1000;
+    }
+  }
+
+  // strtoll accepts the same syntax as atoi but has defined behaviour for out-of-range input.
+  const long long value = std::strtoll(time_str.substr(0, number_len).c_str(), nullptr, 10);
+  if (value <= 0) {
+    // A negative duration is meaningless; converting it to uint32_t used to produce a huge window.
+    return 0;
+  }
+
+  constexpr uint64_t kMaxMs = std::numeric_limits<uint32_t>::max();
+  const uint64_t magnitude = static_cast<uint64_t>(value);
+  if (magnitude > kMaxMs / unit_ms) {
+    return static_cast<uint32_t>(kMaxMs);
+  }
+  return static_cast<uint32_t>(magnitude * unit_ms);
+}
+
+namespace YAML {
+
+// NOTE(review): the template arguments below were missing in the extracted patch text and are reconstructed from
+// the field types of DefaultCircuitBreakerConfig - confirm against the upstream file.
+/// @brief yaml-cpp conversion between a YAML map and trpc::naming::DefaultCircuitBreakerConfig.
+template <>
+struct convert<trpc::naming::DefaultCircuitBreakerConfig> {
+  /// @brief Serialises `config`; the two time windows are written as strings with an "ms" suffix.
+  static YAML::Node encode(const trpc::naming::DefaultCircuitBreakerConfig& config) {
+    YAML::Node node;
+    node["enable"] = config.enable;
+    node["statWindow"] = std::to_string(config.stat_window_ms) + "ms";
+    node["bucketsNum"] = config.buckets_num;
+    node["sleepWindow"] = std::to_string(config.sleep_window_ms) + "ms";
+    node["requestVolumeThreshold"] = config.request_volume_threshold;
+    node["errorRateThreshold"] = config.error_rate_threshold;
+    node["continuousErrorThreshold"] = config.continuous_error_threshold;
+    node["requestCountAfterHalfOpen"] = config.request_count_after_half_open;
+    node["successCountAfterHalfOpen"] = config.success_count_after_half_open;
+    return node;
+  }
+
+  /// @brief Overrides the fields of `config` for which `node` has a key; absent keys keep their current value.
+  /// @return Always true.
+  static bool decode(const YAML::Node& node, trpc::naming::DefaultCircuitBreakerConfig& config) {
+    if (node["enable"]) {
+      config.enable = node["enable"].as<bool>();
+    }
+    if (node["statWindow"]) {
+      std::string stat_window = node["statWindow"].as<std::string>();
+      config.stat_window_ms = ConvertToMilliSeconds(stat_window);
+    }
+    if (node["bucketsNum"]) {
+      config.buckets_num = node["bucketsNum"].as<uint32_t>();
+    }
+
+    // `bucketsNum: 0` would divide by zero in the rounding below; clamp it the same way
+    // continuousErrorThreshold is clamped further down.
+    if (config.buckets_num == 0) {
+      config.buckets_num = 1;
+    }
+
+    // Round the window down to a multiple of the bucket count so that every bucket spans the same time.
+    if (config.stat_window_ms % config.buckets_num != 0) {
+      config.stat_window_ms = config.stat_window_ms / config.buckets_num * config.buckets_num;
+    }
+    // A window shorter than the bucket count rounds down to 0. Keep at least 1 ms per bucket.
+    // NOTE(review): presumably a zero-length bucket breaks the bucket array - confirm in BucketCircularArray.
+    if (config.stat_window_ms == 0) {
+      config.stat_window_ms = config.buckets_num;
+    }
+
+    if (node["sleepWindow"]) {
+      std::string sleep_window = node["sleepWindow"].as<std::string>();
+      config.sleep_window_ms = ConvertToMilliSeconds(sleep_window);
+    }
+    if (node["requestVolumeThreshold"]) {
+      config.request_volume_threshold = node["requestVolumeThreshold"].as<uint32_t>();
+    }
+    if (node["errorRateThreshold"]) {
+      config.error_rate_threshold = node["errorRateThreshold"].as<float>();
+    }
+    if (node["continuousErrorThreshold"]) {
+      config.continuous_error_threshold = node["continuousErrorThreshold"].as<uint32_t>();
+      if (config.continuous_error_threshold == 0) {
+        config.continuous_error_threshold = 1;
+      }
+    }
+    if (node["requestCountAfterHalfOpen"]) {
+      config.request_count_after_half_open = node["requestCountAfterHalfOpen"].as<uint32_t>();
+    }
+    if (node["successCountAfterHalfOpen"]) {
+      config.success_count_after_half_open = node["successCountAfterHalfOpen"].as<uint32_t>();
+    }
+    return true;
+  }
+};
+
+}  // namespace YAML
\ No newline at end of file
diff --git a/trpc/naming/common/util/circuit_break/default_circuit_breaker_test.cc b/trpc/naming/common/util/circuit_break/default_circuit_breaker_test.cc
new file mode 100644
index 00000000..2dfcd3be
--- /dev/null
+++ b/trpc/naming/common/util/circuit_break/default_circuit_breaker_test.cc
@@ -0,0 +1,259 @@
+//
+//
+// Tencent is pleased to support the open source community by making tRPC available.
+//
+// Copyright (C) 2023 THL A29 Limited, a Tencent company.
+// All rights reserved.
+//
+// If you have downloaded a copy of the tRPC source code from Tencent,
+// please note that tRPC source code is licensed under the Apache 2.0 License,
+// A copy of the Apache 2.0 License is included in this file.
+//
+//
+
+#include "trpc/naming/common/util/circuit_break/default_circuit_breaker.h"
+
+#include "gtest/gtest.h"
+
+#include "trpc/util/time.h"
+
+namespace trpc::naming::testing {
+
+// Fixture that builds a fresh circuit breaker with the default settings (and an empty service name) per test.
+// NOTE(review): template argument lists in this file look truncated (`std::make_shared(config_, "")`,
+// `std::unordered_set keys`); presumably lost when the patch text was extracted - confirm against upstream.
+class DefaultCircuitBreakerTest : public ::testing::Test {
+ protected:
+  static void SetUpTestCase() {}
+
+  static void TearDownTestCase() {}
+
+  virtual void SetUp() {
+    config_.enable = true;
+    circuit_breaker_ = std::make_shared(config_, "");
+  }
+
+  virtual void TearDown() { circuit_breaker_.reset(); }
+
+ protected:
+  CircuitBreakerPtr circuit_breaker_{nullptr};
+  // Kept as a member so that the tests can read the thresholds the breaker was built with.
+  DefaultCircuitBreakerConfig config_;
+};
+
+// Reserve() must make the set of tracked nodes equal to the set of keys passed in.
+TEST_F(DefaultCircuitBreakerTest, Reserve) {
+  // After calling Reserve during initialization, the record data is updated accordingly
+  std::unordered_set keys;
+  keys.emplace(CircuitBreakRecordKey("127.0.0.1", 10001));
+  keys.emplace(CircuitBreakRecordKey("127.0.0.1", 10002));
+  keys.emplace(CircuitBreakRecordKey("127.0.0.1", 10003));
+  circuit_breaker_->Reserve(keys);
+  auto all_status = circuit_breaker_->GetAllStatus();
+  ASSERT_TRUE(all_status.size() == keys.size());
+  for (auto& key : keys) {
+    ASSERT_TRUE(all_status.find(key) != all_status.end());
+  }
+
+  // Calling Reserve after adding or removing nodes, the record data also reflects the corresponding changes.
+  // Same size plus presence of every key implies that the erased node (port 10003) is no longer tracked.
+  keys.emplace(CircuitBreakRecordKey("127.0.0.1", 10004));
+  keys.erase(CircuitBreakRecordKey("127.0.0.1", 10003));
+  circuit_breaker_->Reserve(keys);
+  all_status = circuit_breaker_->GetAllStatus();
+  ASSERT_TRUE(all_status.size() == keys.size());
+  for (auto& key : keys) {
+    ASSERT_TRUE(all_status.find(key) != all_status.end());
+  }
+}
+
+// Test when the consecutive failure count meets the criteria, the node will trigger a circuit breaker.
+// It will switch from open state to the half-open state after the sleep_window time has elapsed.
+// If the success count meets the criteria during this period, it will switch to the closed state.
+// If the success count does not meet the criteria during this period, it will switch to the open state.
+TEST_F(DefaultCircuitBreakerTest, ContinuousFailAndResume) {
+  CircuitBreakRecordKey key1("127.0.0.1", 10001);
+  CircuitBreakRecordKey key2("127.0.0.1", 10002);
+  // Initialize
+  std::unordered_set keys;
+  keys.emplace(CircuitBreakRecordKey(key1));
+  keys.emplace(CircuitBreakRecordKey(key2));
+  circuit_breaker_->Reserve(keys);
+
+  // trigger a circuit break when the consecutive failure count meets the criteria
+  for (uint32_t i = 1; i <= config_.continuous_error_threshold; i++) {
+    for (auto& key : keys) {
+      uint64_t current_ms = GetMilliSeconds();
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key), current_ms, false);
+      // The snapshot is taken before StatusChanged() is called, so the open status asserted below has been set
+      // by AddRecordData() itself.
+      auto all_status = circuit_breaker_->GetAllStatus();
+      if (i < config_.continuous_error_threshold) {
+        ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+        ASSERT_TRUE(all_status.find(key)->second == CircuitBreakStatus::kClose);
+      } else {
+        ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+        ASSERT_TRUE(all_status.find(key)->second == CircuitBreakStatus::kOpen);
+      }
+    }
+  }
+
+  // No new data since the last reported change: nothing to report.
+  uint64_t current_ms = GetMilliSeconds();
+  ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+
+  // switch from open state to half-open state after the sleep_window time has elapsed
+  // (time is simulated by advancing the timestamp handed to the breaker, the test does not sleep)
+  current_ms += config_.sleep_window_ms;
+  ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+  auto all_status = circuit_breaker_->GetAllStatus();
+  for (auto& key : keys) {
+    ASSERT_TRUE(all_status.find(key)->second == CircuitBreakStatus::kHalfOpen);
+  }
+
+  // switch from half-open state to closed state when the success count meets the criteria during this period
+  // key1 only sees successes; the node is expected to stay half-open until the request_count_after_half_open-th
+  // request has been recorded, even though success_count_after_half_open is reached earlier with the defaults.
+  for (uint32_t i = 1; i <= config_.request_count_after_half_open; i++) {
+    circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key1), current_ms, true);
+    auto all_status = circuit_breaker_->GetAllStatus();
+    if (i < config_.request_count_after_half_open) {
+      ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kHalfOpen);
+    } else {
+      ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kClose);
+    }
+  }
+
+  // switch from half-open state to open state when the success count does not meet the criteria during this period
+  // key2 alternates failure and success, i.e. half of the requests succeed, which is below
+  // success_count_after_half_open with the default settings.
+  for (uint32_t i = 1; i <= config_.request_count_after_half_open; i++) {
+    if (i % 2 == 0) {
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key2), current_ms, true);
+    } else {
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key2), current_ms, false);
+    }
+
+    auto all_status = circuit_breaker_->GetAllStatus();
+    if (i < config_.request_count_after_half_open) {
+      ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key2)->second == CircuitBreakStatus::kHalfOpen);
+    } else {
+      ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key2)->second == CircuitBreakStatus::kOpen);
+    }
+  }
+}
+
+// Test when the failure rate meets the criteria, the node will trigger a circuit breaker.
+TEST_F(DefaultCircuitBreakerTest, ReachErrorRateAndResume) {
+  CircuitBreakRecordKey key1("127.0.0.1", 10001);
+  CircuitBreakRecordKey key2("127.0.0.1", 10002);
+  // Initialize
+  std::unordered_set keys;
+  keys.emplace(CircuitBreakRecordKey(key1));
+  keys.emplace(CircuitBreakRecordKey(key2));
+  circuit_breaker_->Reserve(keys);
+
+  // One call per bucket interval, starting one full stat window in the past. Two failures are followed by one
+  // success, so the consecutive-failure counter cannot reach the default threshold and only the error rate can
+  // open the breaker.
+  // NOTE(review): the open transition is expected exactly at i == buckets_num, i.e. one full window after the
+  // first record - presumably the error rate is evaluated once per window; confirm in the implementation.
+  uint64_t interval = config_.stat_window_ms / config_.buckets_num;
+  for (uint32_t i = 0; i <= config_.buckets_num; i++) {
+    for (auto& key : keys) {
+      uint64_t current_ms = GetMilliSeconds() - config_.stat_window_ms + i * interval;
+      if (i % 3 != 0) {
+        circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key), current_ms, false);
+      } else {
+        circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key), current_ms, true);
+      }
+      auto all_status = circuit_breaker_->GetAllStatus();
+      if (i < config_.buckets_num) {
+        ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+        ASSERT_TRUE(all_status.find(key)->second == CircuitBreakStatus::kClose);
+      } else {
+        ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+        ASSERT_TRUE(all_status.find(key)->second == CircuitBreakStatus::kOpen);
+      }
+    }
+  }
+
+  uint64_t current_ms = GetMilliSeconds();
+  ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+
+  // switch from open state to the half-open state after the sleep_window time has elapsed
+  current_ms += config_.sleep_window_ms;
+  ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+  auto all_status = circuit_breaker_->GetAllStatus();
+  for (auto& key : keys) {
+    ASSERT_TRUE(all_status.find(key)->second == CircuitBreakStatus::kHalfOpen);
+  }
+
+  // switch from half-open state to closed state when the success count meets the criteria during this period
+  for (uint32_t i = 1; i <= config_.request_count_after_half_open; i++) {
+    circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key1), current_ms, true);
+    auto all_status = circuit_breaker_->GetAllStatus();
+    if (i < config_.request_count_after_half_open) {
+      ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kHalfOpen);
+    } else {
+      ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kClose);
+    }
+  }
+
+  // switch from half-open state to open state when the success count does not meet the criteria during this period
+  for (uint32_t i = 1; i <= config_.request_count_after_half_open; i++) {
+    if (i % 2 == 0) {
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key2), current_ms, true);
+    } else {
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key2), current_ms, false);
+    }
+
+    auto all_status = circuit_breaker_->GetAllStatus();
+    if (i < config_.request_count_after_half_open) {
+      ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key2)->second == CircuitBreakStatus::kHalfOpen);
+    } else {
+      ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key2)->second == CircuitBreakStatus::kOpen);
+    }
+  }
+}
+
+// The circuit breaker will not be triggered when the node is called normally
+TEST_F(DefaultCircuitBreakerTest, NoTriggerCircuitBreak) {
+  CircuitBreakRecordKey key1("127.0.0.1", 10001);
+  // Initialize
+  std::unordered_set keys;
+  keys.emplace(CircuitBreakRecordKey(key1));
+  circuit_breaker_->Reserve(keys);
+
+  // Simulate 10 stat windows of calls, one call per bucket interval, in which every fifth call fails
+  // (a 20% error rate, with never more than one failure in a row)
+  uint64_t interval = config_.stat_window_ms / config_.buckets_num;
+  uint32_t loop_times = 10;
+  for (uint32_t i = 1; i <= config_.buckets_num * loop_times; i++) {
+    uint64_t current_ms = GetMilliSeconds() - config_.stat_window_ms * loop_times + i * interval;
+    if (i % 5 != 0) {
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key1), current_ms, true);
+    } else {
+      circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key1), current_ms, false);
+    }
+    auto all_status = circuit_breaker_->GetAllStatus();
+    ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+    ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kClose);
+  }
+}
+
+// If the failure rate meets the criteria within the statistical period but does not reach the threshold for the number
+// of requests, the circuit breaker will not be triggered.
+// And if the consecutive failure count is greater than or equal to the consecutive failure count threshold, the circuit
+// breaker will be triggered.
+TEST_F(DefaultCircuitBreakerTest, NoReachRequestVolumeThreshold) {
+  CircuitBreakRecordKey key1("127.0.0.1", 10001);
+  // Initialize
+  std::unordered_set keys;
+  keys.emplace(CircuitBreakRecordKey(key1));
+  circuit_breaker_->Reserve(keys);
+
+  // request_volume_threshold failing calls are spread evenly over two stat windows, so no single window can
+  // hold request_volume_threshold requests; only the consecutive-failure criterion is left to open the breaker.
+  uint64_t interval = config_.stat_window_ms * 2 / config_.request_volume_threshold;
+  for (uint32_t i = 1; i <= config_.request_volume_threshold; i++) {
+    uint64_t current_ms = GetMilliSeconds() - config_.stat_window_ms * 2 + i * interval;
+    circuit_breaker_->AddRecordData(CircuitBreakRecordKey(key1), current_ms, false);
+    auto all_status = circuit_breaker_->GetAllStatus();
+    if (i < config_.continuous_error_threshold) {
+      ASSERT_FALSE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kClose);
+    } else if (i == config_.continuous_error_threshold) {
+      ASSERT_TRUE(circuit_breaker_->StatusChanged(current_ms));
+      ASSERT_TRUE(all_status.find(key1)->second == CircuitBreakStatus::kOpen);
+    }
+  }
+}
+
+}  // namespace trpc::naming::testing
\ No newline at end of file
diff --git a/trpc/naming/common/util/loadbalance/polling/BUILD b/trpc/naming/common/util/loadbalance/polling/BUILD
index 09dded5a..4538299b 100644
--- a/trpc/naming/common/util/loadbalance/polling/BUILD
+++ b/trpc/naming/common/util/loadbalance/polling/BUILD
@@ -8,9 +8,6 @@ cc_library(
     name = "polling_load_balance",
     srcs = ["polling_load_balance.cc"],
     hdrs = ["polling_load_balance.h"],
-    visibility = [
-        "//visibility:public",
-    ],
     deps = [
"//trpc/naming:load_balance_factory",
         "//trpc/util/log:logging",
diff --git a/trpc/naming/direct/BUILD b/trpc/naming/direct/BUILD
index 2a49ba97..1c3cf0d7 100644
--- a/trpc/naming/direct/BUILD
+++ b/trpc/naming/direct/BUILD
@@ -4,16 +4,50 @@ licenses(["notice"])
 
 package(default_visibility = ["//visibility:public"])
 
+cc_library(
+    name = "direct_selector_conf",
+    hdrs = ["direct_selector_conf.h"],
+    deps = [
+        "//trpc/naming/common/util/circuit_break:circuit_breaker_config",
+    ],
+)
+
+cc_library(
+    name = "direct_selector_conf_parser",
+    hdrs = ["direct_selector_conf_parser.h"],
+    deps = [
+        ":direct_selector_conf",
+        "@com_github_jbeder_yaml_cpp//:yaml-cpp",
+    ],
+)
+
+cc_test(
+    name = "direct_selector_conf_test",
+    srcs = ["direct_selector_conf_test.cc"],
+    deps = [
+        ":direct_selector_conf",
+        ":direct_selector_conf_parser",
+        "@com_google_googletest//:gtest_main",
+    ],
+)
+
 cc_library(
     name = "selector_direct",
     srcs = ["selector_direct.cc"],
     hdrs = ["selector_direct.h"],
     deps = [
+        ":direct_selector_conf",
+        ":direct_selector_conf_parser",
+        "//trpc/common/config:trpc_config",
         "//trpc/naming:load_balance_factory",
         "//trpc/naming:selector_factory",
         "//trpc/naming/common/util:utils_help",
+        "//trpc/naming/common/util/circuit_break:circuit_break_whitelist",
+        "//trpc/naming/common/util/circuit_break:circuit_breaker",
+        "//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory",
         "//trpc/naming/common/util/loadbalance/polling:polling_load_balance",
         "//trpc/util:string_util",
+        "//trpc/util:time",
         "//trpc/util/log:logging",
     ],
 )
@@ -21,10 +55,11 @@ cc_library(
 cc_library(
     name = "direct_selector_filter",
+    srcs = ["direct_selector_filter.cc"],
     hdrs = ["direct_selector_filter.h"],
-    visibility = [
-        "//visibility:public",
-    ],
     deps = [
+        ":direct_selector_conf",
+        ":direct_selector_conf_parser",
+        "//trpc/common/config:trpc_config",
         "//trpc/filter",
         "//trpc/naming:selector_workflow",
     ],
@@ -33,10 +68,19 @@ cc_library(
 cc_test(
     name = "selector_direct_test",
     srcs = ["selector_direct_test.cc"],
+    data = [
+        "//trpc/naming/testing:direct_selector_test.yaml",
+    ],
     deps = [
+        ":direct_selector_conf",
         ":selector_direct",
         "//trpc/common:trpc_plugin",
+        "//trpc/common/config:trpc_config",
+        "//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory",
+        "//trpc/naming/common/util/circuit_break:default_circuit_breaker",
+        "//trpc/naming/common/util/circuit_break:default_circuit_breaker_config",
         "//trpc/naming/common/util/loadbalance/polling:polling_load_balance",
+        "//trpc/util:time",
         "@com_google_googletest//:gtest",
         "@com_google_googletest//:gtest_main",
     ],
diff --git a/trpc/naming/direct/direct_selector_conf.h b/trpc/naming/direct/direct_selector_conf.h
new file mode 100644
index 00000000..8851d805
--- /dev/null
+++ b/trpc/naming/direct/direct_selector_conf.h
@@ -0,0 +1,30 @@
+
+//
+//
+// Tencent is pleased to support the open source community by making tRPC available.
+//
+// Copyright (C) 2023 THL A29 Limited, a Tencent company.
+// All rights reserved.
+//
+// If you have downloaded a copy of the tRPC source code from Tencent,
+// please note that tRPC source code is licensed under the Apache 2.0 License,
+// A copy of the Apache 2.0 License is included in this file.
+//
+//
+
+#pragma once
+
+#include "trpc/naming/common/util/circuit_break/circuit_breaker_config.h"
+
+namespace trpc::naming {
+
+/// @brief Configuration of the "direct" selector plugin.
+struct DirectSelectorConfig {
+  /// Circuit breaker settings; read from the `circuitBreaker` key of the plugin configuration.
+  CircuitBreakConfig circuit_break_config;
+
+  /// @brief Displays the configuration by delegating to CircuitBreakConfig::Display().
+  void Display() const { circuit_break_config.Display(); }
+};
+
+}  // namespace trpc::naming
diff --git a/trpc/naming/direct/direct_selector_conf_parser.h b/trpc/naming/direct/direct_selector_conf_parser.h
new file mode 100644
index 00000000..ed03dc70
--- /dev/null
+++ b/trpc/naming/direct/direct_selector_conf_parser.h
@@ -0,0 +1,42 @@
+
+//
+//
+// Tencent is pleased to support the open source community by making tRPC available.
+//
+// Copyright (C) 2023 THL A29 Limited, a Tencent company.
+// All rights reserved.
+//
+// If you have downloaded a copy of the tRPC source code from Tencent,
+// please note that tRPC source code is licensed under the Apache 2.0 License,
+// A copy of the Apache 2.0 License is included in this file.
+//
+//
+
+#pragma once
+
+#include "yaml-cpp/yaml.h"
+
+#include "trpc/naming/direct/direct_selector_conf.h"
+
+namespace YAML {
+
+// yaml-cpp conversion between a YAML node and trpc::naming::DirectSelectorConfig.
+// NOTE(review): the template arguments of `convert` and `as()` look truncated here; presumably they were lost
+// when the patch text was extracted - confirm against the upstream header.
+template <>
+struct convert {
+  // Writes the circuit breaker settings under the "circuitBreaker" key.
+  static YAML::Node encode(const trpc::naming::DirectSelectorConfig& config) {
+    YAML::Node node;
+
+    node["circuitBreaker"] = config.circuit_break_config;
+
+    return node;
+  }
+
+  // Reads the "circuitBreaker" key when present; otherwise `config` is left untouched. Always returns true.
+  static bool decode(const YAML::Node& node, trpc::naming::DirectSelectorConfig& config) {
+    if (node["circuitBreaker"]) {
+      config.circuit_break_config = node["circuitBreaker"].as();
+    }
+
+    return true;
+  }
+};
+
+}  // namespace YAML
diff --git a/trpc/naming/direct/direct_selector_conf_test.cc b/trpc/naming/direct/direct_selector_conf_test.cc
new file mode 100644
index 00000000..6627ddbb
--- /dev/null
+++ b/trpc/naming/direct/direct_selector_conf_test.cc
@@ -0,0 +1,36 @@
+
+//
+//
+// Tencent is pleased to support the open source community by making tRPC available.
+//
+// Copyright (C) 2023 THL A29 Limited, a Tencent company.
+// All rights reserved.
+//
+// If you have downloaded a copy of the tRPC source code from Tencent,
+// please note that tRPC source code is licensed under the Apache 2.0 License,
+// A copy of the Apache 2.0 License is included in this file.
+//
+//
+
+#include "trpc/naming/direct/direct_selector_conf.h"
+
+#include "gtest/gtest.h"
+
+#include "trpc/naming/direct/direct_selector_conf_parser.h"
+
+namespace trpc::naming::testing {
+
+// Round trip: a configuration encoded to YAML and decoded again must keep the circuit breaker's plugin name and
+// enable flag (the plugin_config entry is set but not compared).
+// NOTE(review): the template arguments of `YAML::convert` look truncated here; presumably they were lost when
+// the patch text was extracted - confirm against the upstream file.
+TEST(DirectSelectorConfigTest, parse) {
+  DirectSelectorConfig direct_selector_config;
+  direct_selector_config.circuit_break_config.plugin_name = "self";
+  direct_selector_config.circuit_break_config.enable = false;
+  direct_selector_config.circuit_break_config.plugin_config["enable"] = "false";
+
+  YAML::Node root = YAML::convert::encode(direct_selector_config);
+  DirectSelectorConfig tmp;
+  YAML::convert::decode(root, tmp);
+  ASSERT_TRUE(tmp.circuit_break_config.plugin_name == direct_selector_config.circuit_break_config.plugin_name);
+  ASSERT_TRUE(tmp.circuit_break_config.enable == direct_selector_config.circuit_break_config.enable);
+}
+
+}  // namespace trpc::naming::testing
\ No newline at end of file
diff --git a/trpc/naming/direct/direct_selector_filter.cc b/trpc/naming/direct/direct_selector_filter.cc
new file mode 100644
index 00000000..0a36e33c
--- /dev/null
+++ b/trpc/naming/direct/direct_selector_filter.cc
@@ -0,0 +1,28 @@
+//
+//
+// Tencent is pleased to support the open source community by making tRPC available.
+//
+// Copyright (C) 2023 THL A29 Limited, a Tencent company.
+// All rights reserved.
+//
+// If you have downloaded a copy of the tRPC source code from Tencent,
+// please note that tRPC source code is licensed under the Apache 2.0 License,
+// A copy of the Apache 2.0 License is included in this file.
+// +// + +#include "trpc/naming/direct/direct_selector_filter.h" + +#include "trpc/common/config/trpc_config.h" +#include "trpc/naming/direct/direct_selector_conf.h" +#include "trpc/naming/direct/direct_selector_conf_parser.h" + +namespace trpc { +DirectSelectorFilter::DirectSelectorFilter() { + naming::DirectSelectorConfig config; + if (!TrpcConfig::GetInstance()->GetPluginConfig("selector", "direct", config)) { + TRPC_FMT_DEBUG("Get plugin config fail, use default config"); + } + selector_flow_ = std::make_unique("direct", config.circuit_break_config.enable, false); +} +} // namespace trpc diff --git a/trpc/naming/direct/direct_selector_filter.h b/trpc/naming/direct/direct_selector_filter.h index 9daa7343..ac18b573 100644 --- a/trpc/naming/direct/direct_selector_filter.h +++ b/trpc/naming/direct/direct_selector_filter.h @@ -26,9 +26,7 @@ namespace trpc { class DirectSelectorFilter : public MessageClientFilter { public: /// @brief Constructor that creates a SelectorWorkFlow object - DirectSelectorFilter() { selector_flow_ = std::make_unique("direct", false, false); } - - ~DirectSelectorFilter() override {} + DirectSelectorFilter(); /// @brief Initializes the SelectorWorkFlow object /// @return 0 if successful, -1 otherwise diff --git a/trpc/naming/direct/selector_direct.cc b/trpc/naming/direct/selector_direct.cc index 50dc38cc..d6bf29d4 100644 --- a/trpc/naming/direct/selector_direct.cc +++ b/trpc/naming/direct/selector_direct.cc @@ -15,13 +15,18 @@ #include #include +#include #include +#include "trpc/common/config/trpc_config.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h" #include "trpc/naming/common/util/loadbalance/polling/polling_load_balance.h" +#include "trpc/naming/direct/direct_selector_conf_parser.h" #include "trpc/naming/load_balance_factory.h" #include "trpc/naming/selector_factory.h" #include "trpc/util/log/logging.h" #include "trpc/util/string_util.h" +#include "trpc/util/time.h" namespace trpc { @@ -29,6 
+34,15 @@ SelectorDirect::SelectorDirect(const LoadBalancePtr& load_balance) : default_loa TRPC_ASSERT(default_load_balance_); } +int SelectorDirect::Init() noexcept { + if (!TrpcConfig::GetInstance()->GetPluginConfig("selector", "direct", config_)) { + TRPC_FMT_DEBUG("Get plugin config fail, use default config"); + } + + config_.Display(); + return 0; +} + LoadBalance* SelectorDirect::GetLoadBalance(const std::string& name) { if (!name.empty()) { auto load_balance = LoadBalanceFactory::GetInstance()->Get(name).get(); @@ -40,8 +54,80 @@ LoadBalance* SelectorDirect::GetLoadBalance(const std::string& name) { return default_load_balance_.get(); } +naming::CircuitBreakerPtr SelectorDirect::CreateCircuitBreaker(const std::string& service_name) { + if (config_.circuit_break_config.enable) { + auto& plugin_name = config_.circuit_break_config.plugin_name; + auto& plugin_config = config_.circuit_break_config.plugin_config; + return naming::CircuitBreakerCreatorFactory::GetInstance()->Create(plugin_name, &plugin_config, service_name); + } + + return nullptr; +} + +bool SelectorDirect::DoUpdate(const std::string& service_name, const std::string& load_balance_name) { + std::unique_lock wlock(mutex_); + auto it = targets_map_.find(service_name); + if (it == targets_map_.end()) { + return false; + } + auto& real_endpoints = it->second.endpoints; + auto& available_endpoints = it->second.available_endpoints; + + auto& circuit_breaker = it->second.circuit_breaker; + if (circuit_breaker != nullptr) { + available_endpoints.clear(); + available_endpoints.reserve(real_endpoints.size()); + auto endpoints_status = circuit_breaker->GetAllStatus(); + for (auto& endpoint : real_endpoints) { + naming::CircuitBreakRecordKey key(endpoint.host, endpoint.port); + auto iter = endpoints_status.find(key); + if (iter != endpoints_status.end()) { + if (iter->second != naming::CircuitBreakStatus::kOpen) { + available_endpoints.push_back(endpoint); + } + } + } + } + + // update route info + 
SelectorInfo select_info; + select_info.name = service_name; + LoadBalanceInfo load_balance_info; + load_balance_info.info = &select_info; + // no available nodes, then select from all nodes + if (!available_endpoints.empty()) { + load_balance_info.endpoints = &available_endpoints; + } else { + load_balance_info.endpoints = &real_endpoints; + } + GetLoadBalance(load_balance_name)->Update(&load_balance_info); + return true; +} + +bool SelectorDirect::CheckAndUpdate(const SelectorInfo* info) { + if (!config_.circuit_break_config.enable) { + return false; + } + + std::shared_lock rlock(mutex_); + auto it = targets_map_.find(info->name); + if (it == targets_map_.end()) { + return false; + } + auto& circuit_breaker = it->second.circuit_breaker; + if (circuit_breaker && circuit_breaker->StatusChanged(GetMilliSeconds())) { + // If the node status changes, update the nodes set in the load balancer + rlock.unlock(); + return DoUpdate(info->name, info->load_balance_name); + } + + return false; +} + // Get the routing interface of the node being called int SelectorDirect::Select(const SelectorInfo* info, TrpcEndpointInfo* endpoint) { + CheckAndUpdate(info); + LoadBalanceResult load_balance_result; load_balance_result.info = info; auto lb = GetLoadBalance(info->load_balance_name); @@ -57,17 +143,13 @@ int SelectorDirect::Select(const SelectorInfo* info, TrpcEndpointInfo* endpoint) // Get a throttling interface asynchronously Future SelectorDirect::AsyncSelect(const SelectorInfo* info) { - LoadBalanceResult load_balance_result; - load_balance_result.info = info; - auto lb = GetLoadBalance(info->load_balance_name); - if (lb->Next(load_balance_result)) { - std::string error_str = "Do load balance of " + info->name + " failed"; - TRPC_LOG_ERROR(error_str); - return MakeExceptionFuture(CommonException(error_str.c_str())); + TrpcEndpointInfo endpoint; + if (Select(info, &endpoint) == 0) { + return MakeReadyFuture(std::move(endpoint)); } - TrpcEndpointInfo endpoint = 
std::move(std::any_cast(load_balance_result.result)); - return MakeReadyFuture(std::move(endpoint)); + std::string error_str = "Do load balance of " + info->name + " failed"; + return MakeExceptionFuture(CommonException(error_str.c_str())); } // An interface for fetching node routing information in bulk by policy @@ -77,6 +159,8 @@ int SelectorDirect::SelectBatch(const SelectorInfo* info, std::vector lock(mutex_); auto it = targets_map_.find(info->name); if (it == targets_map_.end()) { @@ -87,7 +171,11 @@ int SelectorDirect::SelectBatch(const SelectorInfo* info, std::vectorpolicy == SelectorPolicy::MULTIPLE) { - SelectMultiple(it->second.endpoints, endpoints, info->select_num); + if (!it->second.available_endpoints.empty()) { + SelectMultiple(it->second.available_endpoints, endpoints, info->select_num); + } else { + SelectMultiple(it->second.endpoints, endpoints, info->select_num); + } } else { *endpoints = it->second.endpoints; } @@ -101,32 +189,40 @@ Future> SelectorDirect::AsyncSelectBatch(const Sel return MakeExceptionFuture>(CommonException("Selectorinfo is null")); } - std::shared_lock lock(mutex_); - auto it = targets_map_.find(info->name); - if (it == targets_map_.end()) { - std::stringstream error_str; - error_str << "router info of " << info->name << " no found"; - TRPC_LOG_ERROR(error_str.str()); - return MakeExceptionFuture>(CommonException(error_str.str().c_str())); + std::vector endpoints; + if (SelectBatch(info, &endpoints) == 0) { + return MakeReadyFuture>(std::move(endpoints)); } - std::vector endpoints; - if (info->policy == SelectorPolicy::MULTIPLE) { - SelectMultiple(it->second.endpoints, &endpoints, info->select_num); - } else { - endpoints = it->second.endpoints; + std::string error_str = "router info of " + info->name + " no found"; + return MakeExceptionFuture>(CommonException(error_str.c_str())); +} + +bool SelectorDirect::IsSuccess(int framework_result) { + if (framework_result == TrpcRetCode::TRPC_INVOKE_SUCCESS) { + return true; } - 
return MakeReadyFuture>(std::move(endpoints)); + return circuitbreak_whitelist_.Contains(framework_result); } // Call the result reporting interface int SelectorDirect::ReportInvokeResult(const InvokeResult* result) { - if (nullptr == result) { - TRPC_LOG_ERROR("Invalid parameter: invoke result is empty"); + if (nullptr == result || !config_.circuit_break_config.enable) { return -1; } + std::shared_lock lock(mutex_); + auto it = targets_map_.find(result->name); + if (it != targets_map_.end()) { + auto& circuit_breaker = it->second.circuit_breaker; + if (circuit_breaker != nullptr) { + naming::CircuitBreakRecordKey key(result->context->GetIp(), result->context->GetPort()); + bool success = IsSuccess(result->framework_result); + circuit_breaker->AddRecordData(key, GetMilliSeconds(), success); + } + } + return 0; } @@ -139,12 +235,24 @@ int SelectorDirect::SetEndpoints(const RouterInfo* info) { // Generate a unique id for each node, then put the node in the cache EndpointsInfo endpoints_info; endpoints_info.endpoints = info->info; + std::unordered_set keys; + for (auto& endpoint : endpoints_info.endpoints) { + keys.emplace(naming::CircuitBreakRecordKey(endpoint.host, endpoint.port)); + } std::unique_lock uniq_lock(mutex_); auto iter = targets_map_.find(info->name); if (iter != targets_map_.end()) { // If the service name is in the cache, use the original id generator endpoints_info.id_generator = std::move(iter->second.id_generator); + endpoints_info.circuit_breaker = std::move(iter->second.circuit_breaker); + } else { + endpoints_info.circuit_breaker = CreateCircuitBreaker(info->name); + } + + // add or remove the status statistics in the circuit breaker based on the node set information + if (endpoints_info.circuit_breaker != nullptr) { + endpoints_info.circuit_breaker->Reserve(keys); } for (auto& item : endpoints_info.endpoints) { @@ -156,14 +264,13 @@ int SelectorDirect::SetEndpoints(const RouterInfo* info) { uniq_lock.unlock(); // Update service routing 
information to default loadbalance - SelectorInfo select_info; - select_info.name = info->name; - select_info.context = nullptr; - LoadBalanceInfo load_balance_info; - load_balance_info.info = &select_info; - load_balance_info.endpoints = &endpoints_info.endpoints; - default_load_balance_->Update(&load_balance_info); + DoUpdate(info->name, ""); return 0; } +bool SelectorDirect::SetCircuitBreakWhiteList(const std::vector& framework_retcodes) { + circuitbreak_whitelist_.SetCircuitBreakWhiteList(framework_retcodes); + return true; +} + } // namespace trpc diff --git a/trpc/naming/direct/selector_direct.h b/trpc/naming/direct/selector_direct.h index a2747c42..6380a379 100644 --- a/trpc/naming/direct/selector_direct.h +++ b/trpc/naming/direct/selector_direct.h @@ -20,7 +20,10 @@ #include #include "trpc/common/plugin.h" +#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker.h" #include "trpc/naming/common/util/utils_help.h" +#include "trpc/naming/direct/direct_selector_conf.h" #include "trpc/naming/load_balance.h" #include "trpc/naming/selector.h" @@ -31,6 +34,10 @@ class SelectorDirect : public Selector { public: explicit SelectorDirect(const LoadBalancePtr& load_balance); + /// @brief Initialization + /// @return Returns 0 on success, -1 on failure + int Init() noexcept override; + /// @brief Return the name of the plugin. /// @return The name of the plugin. std::string Name() const override { return "direct"; } @@ -71,12 +78,21 @@ class SelectorDirect : public Selector { /// @return 0 on success, -1 on failure. int SetEndpoints(const RouterInfo* info) override; + /// @brief Sets the whitelist of framework error codes for circuit breaking reporting + bool SetCircuitBreakWhiteList(const std::vector& framework_retcodes) override; + private: - /// @brief Gets the load balancer plugin with the specified name. - /// @param name The name of the load balancer plugin. 
- /// @return A pointer to the load balancer plugin. + // Gets the load balancer plugin with the specified name. LoadBalance* GetLoadBalance(const std::string& name); + naming::CircuitBreakerPtr CreateCircuitBreaker(const std::string& service_name); + + bool DoUpdate(const std::string& service_name, const std::string& load_balance_name); + + bool CheckAndUpdate(const SelectorInfo* info); + + bool IsSuccess(int framework_result); + private: // The name of the default load balancer plugin. static const char default_load_balance_name_[]; @@ -84,14 +100,22 @@ class SelectorDirect : public Selector { struct EndpointsInfo { // The endpoints for the target service. std::vector endpoints; + // The available endpoints for the target service. + std::vector available_endpoints; // The endpoint ID generator. EndpointIdGenerator id_generator; + // circuit breaker + naming::CircuitBreakerPtr circuit_breaker{nullptr}; }; std::unordered_map targets_map_; // The default load balancer plugin. LoadBalancePtr default_load_balance_; mutable std::shared_mutex mutex_; + + naming::DirectSelectorConfig config_; + + naming::CircuitBreakWhiteList circuitbreak_whitelist_; }; using SelectorDirectPtr = RefPtr; diff --git a/trpc/naming/direct/selector_direct_test.cc b/trpc/naming/direct/selector_direct_test.cc index 5092b237..c18b2b45 100644 --- a/trpc/naming/direct/selector_direct_test.cc +++ b/trpc/naming/direct/selector_direct_test.cc @@ -18,44 +18,83 @@ #include "gtest/gtest.h" #include "trpc/codec/trpc/trpc_client_codec.h" +#include "trpc/common/config/trpc_config.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h" +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker.h" +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h" #include "trpc/naming/common/util/loadbalance/polling/polling_load_balance.h" +#include "trpc/naming/direct/direct_selector_conf_parser.h" namespace trpc { -TEST(SelectorDirect, 
select_test) { - LoadBalancePtr polling = MakeRefCounted(); - std::shared_ptr ptr = std::make_shared(polling); +class SelectorDirectTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + // register circuitbreak creator + naming::CircuitBreakerCreatorFactory::GetInstance()->Register( + "default", [](const YAML::Node* plugin_config, const std::string& service_name) { + naming::DefaultCircuitBreakerConfig config; + if (plugin_config) { + YAML::convert::decode(*plugin_config, config); + } + return std::make_shared(config, service_name); + }); + } - ptr->Init(); - ptr->Start(); - EXPECT_TRUE(ptr->Version() != ""); + static void TearDownTestCase() {} + + void SetUp() override { + int ret = TrpcConfig::GetInstance()->Init("./trpc/naming/testing/direct_selector_test.yaml"); + ASSERT_TRUE(ret == 0); + naming::DirectSelectorConfig config; + trpc::TrpcConfig::GetInstance()->GetPluginConfig("selector", "direct", config); + YAML::convert::decode(config.circuit_break_config.plugin_config, + circuit_break_config_); + + LoadBalancePtr polling = MakeRefCounted(); + direct_selector_ = std::make_shared(polling); + direct_selector_->Init(); + direct_selector_->Start(); + EXPECT_TRUE(direct_selector_->Version() != ""); + + // set endpoints of callee + RouterInfo info; + info.name = "test_service"; + std::vector& endpoints_info = info.info; + TrpcEndpointInfo endpoint1, endpoint2; + endpoint1.host = "127.0.0.1"; + endpoint1.port = 1001; + endpoint2.host = "127.0.0.1"; + endpoint2.port = 1002; + endpoints_info.push_back(endpoint1); + endpoints_info.push_back(endpoint2); + direct_selector_->SetEndpoints(&info); + } - // set endpoints of callee - RouterInfo info; - info.name = "test_service"; - std::vector& endpoints_info = info.info; - TrpcEndpointInfo endpoint1, endpoint2; - endpoint1.host = "127.0.0.1"; - endpoint1.port = 1001; - endpoint2.host = "192.168.0.2"; - endpoint2.port = 1002; - endpoints_info.push_back(endpoint1); - endpoints_info.push_back(endpoint2); 
- ptr->SetEndpoints(&info); + void TearDown() override { + direct_selector_->Stop(); + direct_selector_->Destroy(); + } + + protected: + std::shared_ptr direct_selector_; + naming::DefaultCircuitBreakerConfig circuit_break_config_; +}; +TEST_F(SelectorDirectTest, select_test) { auto context = trpc::MakeRefCounted(); SelectorInfo select_info; select_info.name = "test_service"; select_info.context = context; TrpcEndpointInfo endpoint; - ptr->Select(&select_info, &endpoint); + direct_selector_->Select(&select_info, &endpoint); EXPECT_TRUE(endpoint.host == "127.0.0.1") << (endpoint.host); EXPECT_TRUE(endpoint.port == 1001); EXPECT_TRUE(endpoint.id != kInvalidEndpointId); - ptr->Select(&select_info, &endpoint); - EXPECT_TRUE(endpoint.host == "192.168.0.2"); + direct_selector_->Select(&select_info, &endpoint); + EXPECT_TRUE(endpoint.host == "127.0.0.1"); EXPECT_TRUE(endpoint.port == 1002); EXPECT_TRUE(endpoint.id != kInvalidEndpointId); @@ -66,97 +105,59 @@ TEST(SelectorDirect, select_test) { auto trpc_codec = std::make_shared(); result.context = MakeRefCounted(trpc_codec); result.context->SetCallerName("test_service"); - result.context->SetAddr("192.168.0.1", 1001); - int ret = ptr->ReportInvokeResult(&result); + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = 1001; + result.context->SetRequestAddrByNaming(std::move(addr)); + + int ret = direct_selector_->ReportInvokeResult(&result); EXPECT_EQ(0, ret); - EXPECT_TRUE(ptr->ReportInvokeResult(nullptr) != 0); + EXPECT_TRUE(direct_selector_->ReportInvokeResult(nullptr) != 0); std::vector endpoints; - ptr->SelectBatch(&select_info, &endpoints); + direct_selector_->SelectBatch(&select_info, &endpoints); EXPECT_EQ(2, endpoints.size()); // test select multiple endpoints.clear(); select_info.policy = SelectorPolicy::MULTIPLE; - ret = ptr->SelectBatch(&select_info, &endpoints); + ret = direct_selector_->SelectBatch(&select_info, &endpoints); EXPECT_EQ(0, ret); select_info.name = "test_service1"; - ret = 
ptr->Select(&select_info, &endpoint); + ret = direct_selector_->Select(&select_info, &endpoint); EXPECT_NE(0, ret); endpoints.clear(); - ret = ptr->SelectBatch(&select_info, &endpoints); + ret = direct_selector_->SelectBatch(&select_info, &endpoints); EXPECT_NE(0, ret); - - ptr->Stop(); - ptr->Destroy(); } -TEST(SelectorDirect, select_exception_test) { - LoadBalancePtr polling = MakeRefCounted(); - std::shared_ptr ptr = std::make_shared(polling); - - ptr->Init(); - EXPECT_EQ(-1, ptr->SetEndpoints(nullptr)); - - // set endpoints of callee - RouterInfo info; - info.name = "test_service"; - std::vector& endpoints_info = info.info; - TrpcEndpointInfo endpoint1, endpoint2; - endpoint1.host = "127.0.0.1"; - endpoint1.port = 1001; - endpoints_info.push_back(endpoint1); - int ret = ptr->SetEndpoints(&info); - EXPECT_EQ(0, ret); - +TEST_F(SelectorDirectTest, select_exception_test) { auto context = trpc::MakeRefCounted(); SelectorInfo select_info; select_info.name = "test_service"; select_info.context = context; TrpcEndpointInfo endpoint; - ret = ptr->SelectBatch(nullptr, nullptr); + int ret = direct_selector_->SelectBatch(nullptr, nullptr); EXPECT_NE(0, ret); - ptr->AsyncSelectBatch(nullptr).Then([](Future>&& fut) { + direct_selector_->AsyncSelectBatch(nullptr).Then([](Future>&& fut) { EXPECT_TRUE(fut.IsFailed()); return MakeReadyFuture<>(); }); - - ptr->Stop(); - ptr->Destroy(); } -TEST(SelectorDirect, asyncselect_test) { - LoadBalancePtr polling = MakeRefCounted(); - std::shared_ptr ptr = std::make_shared(polling); - - ptr->Init(); - ptr->Start(); - - // set endpoints of callee - RouterInfo info; - info.name = "test_service"; - std::vector& endpoints_info = info.info; - TrpcEndpointInfo endpoint1, endpoint2; - endpoint1.host = "127.0.0.1"; - endpoint1.port = 1001; - endpoint2.host = "192.168.0.2"; - endpoint2.port = 1002; - endpoints_info.push_back(endpoint1); - endpoints_info.push_back(endpoint2); - ptr->SetEndpoints(&info); - +TEST_F(SelectorDirectTest, 
asyncselect_test) { auto context = trpc::MakeRefCounted(); SelectorInfo select_info; select_info.name = "test_service"; select_info.context = context; TrpcEndpointInfo endpoint; - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + direct_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { EXPECT_TRUE(fut.IsReady()); TrpcEndpointInfo endpoint = fut.GetValue0(); EXPECT_TRUE(endpoint.host == "127.0.0.1"); @@ -164,79 +165,70 @@ TEST(SelectorDirect, asyncselect_test) { return trpc::MakeReadyFuture<>(); }); - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + direct_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { EXPECT_TRUE(fut.IsReady()); TrpcEndpointInfo endpoint = fut.GetValue0(); - EXPECT_TRUE(endpoint.host == "192.168.0.2"); + EXPECT_TRUE(endpoint.host == "127.0.0.1"); EXPECT_TRUE(endpoint.port == 1002); return trpc::MakeReadyFuture<>(); }); - ptr->AsyncSelectBatch(&select_info).Then([this](trpc::Future>&& fut) { - EXPECT_TRUE(fut.IsReady()); - std::vector endpoints = fut.GetValue0(); - EXPECT_EQ(2, endpoints.size()); - return trpc::MakeReadyFuture<>(); - }); + direct_selector_->AsyncSelectBatch(&select_info) + .Then([this](trpc::Future>&& fut) { + EXPECT_TRUE(fut.IsReady()); + std::vector endpoints = fut.GetValue0(); + EXPECT_EQ(2, endpoints.size()); + return trpc::MakeReadyFuture<>(); + }); // test select multiple select_info.policy = SelectorPolicy::MULTIPLE; - ptr->AsyncSelectBatch(&select_info).Then([this](trpc::Future>&& fut) { - EXPECT_TRUE(fut.IsReady()); - std::vector endpoints = fut.GetValue0(); - EXPECT_EQ(2, endpoints.size()); - return trpc::MakeReadyFuture<>(); - }); + direct_selector_->AsyncSelectBatch(&select_info) + .Then([this](trpc::Future>&& fut) { + EXPECT_TRUE(fut.IsReady()); + std::vector endpoints = fut.GetValue0(); + EXPECT_EQ(2, endpoints.size()); + return trpc::MakeReadyFuture<>(); + }); select_info.name = "test_service1"; - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + 
direct_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { EXPECT_TRUE(fut.IsFailed()); return trpc::MakeReadyFuture<>(); }); - ptr->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { + direct_selector_->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { EXPECT_TRUE(fut.IsFailed()); return trpc::MakeReadyFuture<>(); }); - - ptr->Stop(); - ptr->Destroy(); } -TEST(SelectorDirect, endpoint_unique_id_test) { - LoadBalancePtr polling = MakeRefCounted(); - std::shared_ptr ptr = std::make_shared(polling); - - ptr->Init(); - - // set endpoints of callee - RouterInfo info; - info.name = "test_service"; - std::vector& endpoints_info = info.info; - TrpcEndpointInfo endpoint1, endpoint2; - endpoint1.host = "127.0.0.1"; - endpoint1.port = 1001; - endpoint2.host = "192.168.0.2"; - endpoint2.port = 1002; - endpoints_info.push_back(endpoint1); - endpoints_info.push_back(endpoint2); - ptr->SetEndpoints(&info); +TEST_F(SelectorDirectTest, endpoint_unique_id_test) { SelectorInfo select_info1; select_info1.name = "test_service"; auto context1 = trpc::MakeRefCounted(); select_info1.context = context1; select_info1.policy = SelectorPolicy::ALL; std::vector result1; - ptr->SelectBatch(&select_info1, &result1); + direct_selector_->SelectBatch(&select_info1, &result1); // Reset endpoints info: put endpoint2 after - TrpcEndpointInfo endpoint3; + TrpcEndpointInfo endpoint1; endpoint1.host = "127.0.0.1"; - endpoint1.port = 1003; - endpoints_info.pop_back(); + endpoint1.port = 1001; + TrpcEndpointInfo endpoint3; + endpoint3.host = "127.0.0.1"; + endpoint3.port = 1003; + TrpcEndpointInfo endpoint2; + endpoint2.host = "127.0.0.1"; + endpoint2.port = 1002; + RouterInfo info; + info.name = "test_service"; + std::vector& endpoints_info = info.info; + endpoints_info.push_back(endpoint1); endpoints_info.push_back(endpoint3); endpoints_info.push_back(endpoint2); - ptr->SetEndpoints(&info); + direct_selector_->SetEndpoints(&info); SelectorInfo select_info2; select_info2.name = 
"test_service"; @@ -244,7 +236,7 @@ TEST(SelectorDirect, endpoint_unique_id_test) { select_info2.context = context2; select_info2.policy = SelectorPolicy::ALL; std::vector result2; - ptr->SelectBatch(&select_info2, &result2); + direct_selector_->SelectBatch(&select_info2, &result2); // Verify that the id of the same node in the original server is the same ASSERT_EQ(result1.size() + 1, result2.size()); for (auto i : result1) { @@ -256,4 +248,190 @@ TEST(SelectorDirect, endpoint_unique_id_test) { } } +// Report failure for 10 consecutive times, expecting switch to open state, and not accessed within the +// 'sleep_window_ms' and recovery occurs when the success criteria are met after reaching the half-open state. +TEST_F(SelectorDirectTest, circuitbreak_when_continuous_fail) { + InvokeResult result; + result.name = "test_service"; + result.framework_result = ::trpc::TrpcRetCode::TRPC_CLIENT_NETWORK_ERR; + result.cost_time = 100; + result.context = MakeRefCounted(); + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = 1001; + result.context->SetRequestAddrByNaming(std::move(addr)); + for (uint32_t i = 0; i < circuit_break_config_.continuous_error_threshold; i++) { + direct_selector_->ReportInvokeResult(&result); + } + + uint32_t call_times = 100; + auto context = trpc::MakeRefCounted(); + SelectorInfo select_info; + select_info.name = "test_service"; + select_info.context = context; + TrpcEndpointInfo endpoint; + for (uint32_t i = 0; i < call_times; i++) { + direct_selector_->Select(&select_info, &endpoint); + ASSERT_NE(endpoint.port, 1001); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(circuit_break_config_.sleep_window_ms)); + // After the node switch to half-open state, if the number of successful calls meets the requirements, it will be + // restored. 
+ for (uint32_t i = 0, j = 0; i < circuit_break_config_.request_count_after_half_open && j < call_times; j++) { + direct_selector_->Select(&select_info, &endpoint); + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = endpoint.port; + result.context->SetRequestAddrByNaming(std::move(addr)); + result.framework_result = ::trpc::TrpcRetCode::TRPC_INVOKE_SUCCESS; + direct_selector_->ReportInvokeResult(&result); + if (endpoint.port == 1001) { + i++; + } + } + + // check the probability of the node being called + int call_count = 0; + for (uint32_t i = 0; i < call_times; i++) { + int ret = direct_selector_->Select(&select_info, &endpoint); + ASSERT_EQ(ret, 0); + if (endpoint.port == 1001) { + call_count++; + } + } + + float call_count_rate = call_count * 1.0 / call_times; + ASSERT_TRUE(call_count_rate > 0.2 && call_count_rate < 0.8) << call_count_rate << std::endl; +} + +// When the error rate exceeds the threshold within the statistical period, expecting switch to open state, and not accessed within the +// 'sleep_window_ms' and re-enter open state when the success criteria aren't met after reaching the half-open state. +TEST_F(SelectorDirectTest, circuitbreak_when_exceed_errorrate) { + InvokeResult result; + result.name = "test_service"; + result.cost_time = 100; + result.context = MakeRefCounted(); + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = 1001; + result.context->SetRequestAddrByNaming(std::move(addr)); + auto interval = circuit_break_config_.stat_window_ms / circuit_break_config_.request_volume_threshold; + for (uint32_t i = 0; i <= circuit_break_config_.request_volume_threshold; i++) { + result.framework_result = + (i % 2 == 0 ? 
::trpc::TrpcRetCode::TRPC_CLIENT_NETWORK_ERR : ::trpc::TrpcRetCode::TRPC_INVOKE_SUCCESS); + direct_selector_->ReportInvokeResult(&result); + + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + uint32_t call_times = 100; + auto context = trpc::MakeRefCounted(); + SelectorInfo select_info; + select_info.name = "test_service"; + select_info.context = context; + TrpcEndpointInfo endpoint; + for (uint32_t i = 0; i < call_times; i++) { + direct_selector_->Select(&select_info, &endpoint); + ASSERT_NE(endpoint.port, 1001); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(circuit_break_config_.sleep_window_ms)); + // After the node is switched from open state to half-open state, if the number of successful calls does not meet the + // requirements, it will re-enter the open state. + for (uint32_t i = 0, j = 0; i < circuit_break_config_.request_count_after_half_open && j < call_times; j++) { + direct_selector_->Select(&select_info, &endpoint); + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = endpoint.port; + result.context->SetRequestAddrByNaming(std::move(addr)); + result.framework_result = + (j % 2 == 0 ? 
::trpc::TrpcRetCode::TRPC_CLIENT_NETWORK_ERR : ::trpc::TrpcRetCode::TRPC_INVOKE_SUCCESS); + direct_selector_->ReportInvokeResult(&result); + if (endpoint.port == 1001) { + i++; + } + } + + // check the probability of the node being called + for (uint32_t i = 0; i < call_times; i++) { + direct_selector_->Select(&select_info, &endpoint); + ASSERT_NE(endpoint.port, 1001); + } +} + +// Recover all when all nodes are circuitbreak open state +TEST_F(SelectorDirectTest, all_endpoints_circuitbreak) { + InvokeResult result; + result.name = "test_service"; + result.framework_result = ::trpc::TrpcRetCode::TRPC_CLIENT_NETWORK_ERR; + result.cost_time = 100; + result.context = MakeRefCounted(); + for (uint32_t i = 0; i < circuit_break_config_.continuous_error_threshold; i++) { + ExtendNodeAddr addr1; + addr1.addr.ip = "127.0.0.1"; + addr1.addr.port = 1001; + result.context->SetRequestAddrByNaming(std::move(addr1)); + direct_selector_->ReportInvokeResult(&result); + + ExtendNodeAddr addr2; + addr2.addr.ip = "127.0.0.1"; + addr2.addr.port = 1002; + result.context->SetRequestAddrByNaming(std::move(addr2)); + direct_selector_->ReportInvokeResult(&result); + } + + // check the probability of the node being called + uint32_t call_times = 100; + auto context = trpc::MakeRefCounted(); + SelectorInfo select_info; + select_info.name = "test_service"; + select_info.context = context; + TrpcEndpointInfo endpoint; + uint32_t call_port1_count = 0; + uint32_t call_port2_count = 0; + for (uint32_t i = 0; i < call_times; i++) { + int ret = direct_selector_->Select(&select_info, &endpoint); + ASSERT_EQ(ret, 0); + if (endpoint.port == 1001) { + call_port1_count++; + } else if (endpoint.port == 1002) { + call_port2_count++; + } + } + + ASSERT_TRUE(call_port1_count + call_port2_count == call_times); + ASSERT_NEAR(call_port1_count, call_port2_count, call_times * 0.1); +} + +TEST_F(SelectorDirectTest, select_multi_with_circuitbreak) { + auto context = trpc::MakeRefCounted(); + SelectorInfo 
select_info; + select_info.name = "test_service"; + select_info.context = context; + select_info.policy = SelectorPolicy::MULTIPLE; + select_info.select_num = 2; + std::vector endpoints; + direct_selector_->SelectBatch(&select_info, &endpoints); + ASSERT_TRUE(endpoints.size() == static_cast(select_info.select_num)); + + InvokeResult result; + result.name = "test_service"; + result.framework_result = ::trpc::TrpcRetCode::TRPC_CLIENT_NETWORK_ERR; + result.cost_time = 100; + result.context = MakeRefCounted(); + for (uint32_t i = 0; i < circuit_break_config_.continuous_error_threshold; i++) { + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = 1001; + result.context->SetRequestAddrByNaming(std::move(addr)); + direct_selector_->ReportInvokeResult(&result); + } + + endpoints.clear(); + direct_selector_->SelectBatch(&select_info, &endpoints); + ASSERT_TRUE(endpoints.size() == 1); +} + } // namespace trpc diff --git a/trpc/naming/domain/BUILD b/trpc/naming/domain/BUILD index 53afda7c..d0708c80 100644 --- a/trpc/naming/domain/BUILD +++ b/trpc/naming/domain/BUILD @@ -6,19 +6,43 @@ package( default_visibility = ["//visibility:public"], ) +cc_library( + name = "domain_selector_conf", + hdrs = [ + "domain_selector_conf.h", + "domain_selector_conf_parser.h", + ], + srcs = ["domain_selector_conf.cc"], + deps = [ + "//trpc/util/log:logging", + "//trpc/naming/common/util/circuit_break:circuit_breaker_config", + "@com_github_jbeder_yaml_cpp//:yaml-cpp", + ], +) + +cc_test( + name = "domain_selector_conf_test", + srcs = ["domain_selector_conf_test.cc"], + deps = [ + ":domain_selector_conf", + "@com_google_googletest//:gtest_main", + ], +) + cc_library( name = "selector_domain", srcs = ["selector_domain.cc"], hdrs = ["selector_domain.h"], deps = [ - #"//trpc/common:plugin_class_registry", - "//trpc/common/config:domain_naming_conf", - "//trpc/common/config:domain_naming_conf_parser", + ":domain_selector_conf", "//trpc/common/config:trpc_config", 
"//trpc/util/log:logging", "//trpc/naming:load_balance_factory", "//trpc/naming:selector_factory", "//trpc/naming/common/util:utils_help", + "//trpc/naming/common/util/circuit_break:circuit_break_whitelist", + "//trpc/naming/common/util/circuit_break:circuit_breaker", + "//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory", "//trpc/naming/common/util/loadbalance/polling:polling_load_balance", "//trpc/runtime/common:periphery_task_scheduler", "//trpc/util/string:string_util", @@ -29,26 +53,33 @@ cc_library( cc_library( name = "domain_selector_filter", + srcs = ["domain_selector_filter.cc"], hdrs = ["domain_selector_filter.h"], - visibility = [ - "//visibility:public", - ], deps = [ + "//trpc/common/config:trpc_config", "//trpc/filter", "//trpc/naming:selector_workflow", + "//trpc/naming/domain:domain_selector_conf", ], ) cc_test( name = "selector_domain_test", srcs = ["selector_domain_test.cc"], - data = ["//trpc/naming/testing:domain_test.yaml"], + data = [ + "//trpc/naming/testing:domain_test.yaml", + "//trpc/naming/testing:domain_selector_test.yaml", + ], deps = [ ":selector_domain", "//trpc/codec/trpc:trpc_client_codec", "//trpc/common:trpc_plugin", "//trpc/common/config:trpc_config", + "//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory", + "//trpc/naming/common/util/circuit_break:default_circuit_breaker", + "//trpc/naming/common/util/circuit_break:default_circuit_breaker_config", "//trpc/naming/common/util/loadbalance/polling:polling_load_balance", + "//trpc/util:domain_util", "@com_google_googletest//:gtest", "@com_google_googletest//:gtest_main", ], diff --git a/trpc/common/config/domain_naming_conf.cc b/trpc/naming/domain/domain_selector_conf.cc similarity index 89% rename from trpc/common/config/domain_naming_conf.cc rename to trpc/naming/domain/domain_selector_conf.cc index bb4e84fd..317ca89a 100644 --- a/trpc/common/config/domain_naming_conf.cc +++ b/trpc/naming/domain/domain_selector_conf.cc @@ -11,7 +11,7 @@ // 
// -#include "trpc/common/config/domain_naming_conf.h" +#include "trpc/naming/domain/domain_selector_conf.h" #include "trpc/util/log/logging.h" @@ -22,6 +22,8 @@ void DomainSelectorConfig::Display() const { TRPC_FMT_DEBUG("exclude_ipv6:{}", exclude_ipv6); + circuit_break_config.Display(); + TRPC_FMT_DEBUG("--------------------------------------"); } diff --git a/trpc/common/config/domain_naming_conf.h b/trpc/naming/domain/domain_selector_conf.h similarity index 82% rename from trpc/common/config/domain_naming_conf.h rename to trpc/naming/domain/domain_selector_conf.h index 5047546c..12c5b814 100644 --- a/trpc/common/config/domain_naming_conf.h +++ b/trpc/naming/domain/domain_selector_conf.h @@ -13,6 +13,8 @@ #pragma once +#include "trpc/naming/common/util/circuit_break/circuit_breaker_config.h" + namespace trpc::naming { /// @brief domain select plugin configuration @@ -20,6 +22,9 @@ struct DomainSelectorConfig { /// @brief Is ipv6 excluded (if the domain is ipv6 only, the exclusion will not apply) bool exclude_ipv6{false}; + /// @brief Ciruit break config + CircuitBreakConfig circuit_break_config; + /// @brief Print out the logger configuration. 
void Display() const; }; diff --git a/trpc/common/config/domain_naming_conf_parser.h b/trpc/naming/domain/domain_selector_conf_parser.h similarity index 78% rename from trpc/common/config/domain_naming_conf_parser.h rename to trpc/naming/domain/domain_selector_conf_parser.h index 2e305098..eaace823 100644 --- a/trpc/common/config/domain_naming_conf_parser.h +++ b/trpc/naming/domain/domain_selector_conf_parser.h @@ -13,7 +13,7 @@ #include "yaml-cpp/yaml.h" -#include "trpc/common/config/domain_naming_conf.h" +#include "trpc/naming/domain/domain_selector_conf.h" namespace YAML { @@ -21,7 +21,11 @@ template <> struct convert { static YAML::Node encode(const trpc::naming::DomainSelectorConfig& config) { YAML::Node node; + node["exclude_ipv6"] = config.exclude_ipv6; + + node["circuitBreaker"] = config.circuit_break_config; + return node; } @@ -29,6 +33,11 @@ struct convert { if (node["exclude_ipv6"]) { config.exclude_ipv6 = node["exclude_ipv6"].as(); } + + if (node["circuitBreaker"]) { + config.circuit_break_config = node["circuitBreaker"].as(); + } + return true; } }; diff --git a/trpc/common/config/domain_naming_conf_test.cc b/trpc/naming/domain/domain_selector_conf_test.cc similarity index 59% rename from trpc/common/config/domain_naming_conf_test.cc rename to trpc/naming/domain/domain_selector_conf_test.cc index 41ff6585..f2dd1639 100644 --- a/trpc/common/config/domain_naming_conf_test.cc +++ b/trpc/naming/domain/domain_selector_conf_test.cc @@ -11,15 +11,18 @@ // // -#include "trpc/common/config/domain_naming_conf.h" -#include "trpc/common/config/domain_naming_conf_parser.h" +#include "trpc/naming/domain/domain_selector_conf.h" #include "gtest/gtest.h" -#include "yaml-cpp/yaml.h" -TEST(LoadbalancerConfig, load_test) { +#include "trpc/naming/domain/domain_selector_conf_parser.h" + +TEST(LoadbalancerConfig, EncodeAndDecode) { trpc::naming::DomainSelectorConfig domain_selector_config; domain_selector_config.exclude_ipv6 = true; + 
domain_selector_config.circuit_break_config.plugin_name = "self"; + domain_selector_config.circuit_break_config.enable = false; + domain_selector_config.circuit_break_config.plugin_config["enable"] = "false"; domain_selector_config.Display(); YAML::convert c; @@ -30,4 +33,6 @@ TEST(LoadbalancerConfig, load_test) { tmp.Display(); ASSERT_EQ(domain_selector_config.exclude_ipv6, tmp.exclude_ipv6); + ASSERT_TRUE(tmp.circuit_break_config.plugin_name == domain_selector_config.circuit_break_config.plugin_name); + ASSERT_TRUE(tmp.circuit_break_config.enable == domain_selector_config.circuit_break_config.enable); } diff --git a/trpc/naming/domain/domain_selector_filter.cc b/trpc/naming/domain/domain_selector_filter.cc new file mode 100644 index 00000000..9103ddef --- /dev/null +++ b/trpc/naming/domain/domain_selector_filter.cc @@ -0,0 +1,28 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2023 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. 
+// +// + +#include "trpc/naming/domain/domain_selector_filter.h" + +#include "trpc/common/config/trpc_config.h" +#include "trpc/naming/domain/domain_selector_conf.h" +#include "trpc/naming/domain/domain_selector_conf_parser.h" + +namespace trpc { +DomainSelectorFilter::DomainSelectorFilter() { + naming::DomainSelectorConfig config; + if (!TrpcConfig::GetInstance()->GetPluginConfig("selector", "domain", config)) { + TRPC_FMT_DEBUG("Get plugin config fail, use default config"); + } + selector_flow_ = std::make_unique("domain", config.circuit_break_config.enable, false); +} +} // namespace trpc \ No newline at end of file diff --git a/trpc/naming/domain/domain_selector_filter.h b/trpc/naming/domain/domain_selector_filter.h index a24acedd..5ad81213 100644 --- a/trpc/naming/domain/domain_selector_filter.h +++ b/trpc/naming/domain/domain_selector_filter.h @@ -25,9 +25,7 @@ namespace trpc { /// @brief DNS discovery filter class DomainSelectorFilter : public MessageClientFilter { public: - DomainSelectorFilter() { selector_flow_ = std::make_unique("domain", false, false); } - - ~DomainSelectorFilter() override {} + DomainSelectorFilter(); /// @brief initialization int Init() override { diff --git a/trpc/naming/domain/selector_domain.cc b/trpc/naming/domain/selector_domain.cc index 3d05a90c..782f2304 100644 --- a/trpc/naming/domain/selector_domain.cc +++ b/trpc/naming/domain/selector_domain.cc @@ -19,7 +19,9 @@ #include #include "trpc/common/config/trpc_config.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h" #include "trpc/naming/common/util/loadbalance/polling/polling_load_balance.h" +#include "trpc/naming/domain/domain_selector_conf_parser.h" #include "trpc/naming/load_balance_factory.h" #include "trpc/naming/selector_factory.h" #include "trpc/runtime/common/periphery_task_scheduler.h" @@ -32,13 +34,13 @@ namespace trpc { SelectorDomain::SelectorDomain(const LoadBalancePtr& load_balance) : default_load_balance_(load_balance) { 
TRPC_ASSERT(default_load_balance_); - dn_update_interval_ = 3600; } int SelectorDomain::Init() noexcept { - if (!trpc::TrpcConfig::GetInstance()->GetPluginConfig("selector", "domain", select_config_)) { + if (!trpc::TrpcConfig::GetInstance()->GetPluginConfig("selector", "domain", config_)) { TRPC_FMT_DEBUG("get selector domain config failed, use default value"); } + config_.Display(); // domain update duration is 30s dn_update_interval_ = 30 * 1000; @@ -77,7 +79,7 @@ int SelectorDomain::RefreshEndpointInfoByName(std::string dn_name, int dn_port, } // exclude or not ipv6 - if (select_config_.exclude_ipv6 && endpoints_exclude_ipv6.size() != 0) { + if (config_.exclude_ipv6 && endpoints_exclude_ipv6.size() != 0) { endpointInfo.endpoints.swap(endpoints_exclude_ipv6); } @@ -85,34 +87,39 @@ int SelectorDomain::RefreshEndpointInfoByName(std::string dn_name, int dn_port, } // Used to update EndpointInof to targets_map and load_balance caches -int SelectorDomain::RefreshDomainInfo(const SelectorInfo* info, SelectorDomain::DomainEndpointInfo& dn_endpointInfo) { - if (nullptr == info) { - TRPC_LOG_ERROR("Invalid parameter"); - return -1; +int SelectorDomain::RefreshDomainInfo(const std::string& service_name, + SelectorDomain::DomainEndpointInfo& dn_endpointInfo) { + std::unordered_set keys; + for (auto& endpoint : dn_endpointInfo.endpoints) { + keys.emplace(naming::CircuitBreakRecordKey(endpoint.host, endpoint.port)); } std::unique_lock uniq_lock(mutex_); // Generate a unique id for the node - auto iter = targets_map_.find(info->name); + auto iter = targets_map_.find(service_name); if (iter != targets_map_.end()) { // If the service name is in the cache, use the original id generator dn_endpointInfo.id_generator = std::move(iter->second.id_generator); + dn_endpointInfo.circuit_breaker = std::move(iter->second.circuit_breaker); + } else { + dn_endpointInfo.circuit_breaker = CreateCircuitBreaker(service_name); } + // add or remove the status statistics in the circuit breaker 
based on the node set information + if (dn_endpointInfo.circuit_breaker != nullptr) { + dn_endpointInfo.circuit_breaker->Reserve(keys); + } for (auto& item : dn_endpointInfo.endpoints) { std::string endpoint = item.host + ":" + std::to_string(item.port); item.id = dn_endpointInfo.id_generator.GetEndpointId(endpoint); } - targets_map_[info->name] = dn_endpointInfo; + targets_map_[service_name] = dn_endpointInfo; uniq_lock.unlock(); // update loadbalance cache - LoadBalanceInfo lb_info; - lb_info.info = info; - lb_info.endpoints = &dn_endpointInfo.endpoints; - default_load_balance_->Update(&lb_info); + DoUpdate(service_name, ""); return 0; } @@ -127,6 +134,76 @@ LoadBalance* SelectorDomain::GetLoadBalance(const std::string& name) { return default_load_balance_.get(); } +naming::CircuitBreakerPtr SelectorDomain::CreateCircuitBreaker(const std::string& service_name) { + if (config_.circuit_break_config.enable) { + auto& plugin_name = config_.circuit_break_config.plugin_name; + auto& plugin_config = config_.circuit_break_config.plugin_config; + return naming::CircuitBreakerCreatorFactory::GetInstance()->Create(plugin_name, &plugin_config, service_name); + } + + return nullptr; +} + +bool SelectorDomain::DoUpdate(const std::string& service_name, const std::string& load_balance_name) { + std::unique_lock wlock(mutex_); + auto it = targets_map_.find(service_name); + if (it == targets_map_.end()) { + return false; + } + auto& real_endpoints = it->second.endpoints; + auto& available_endpoints = it->second.available_endpoints; + + auto& circuit_breaker = it->second.circuit_breaker; + if (circuit_breaker != nullptr) { + available_endpoints.clear(); + available_endpoints.reserve(real_endpoints.size()); + auto endpoints_status = circuit_breaker->GetAllStatus(); + for (auto& endpoint : real_endpoints) { + naming::CircuitBreakRecordKey key(endpoint.host, endpoint.port); + auto iter = endpoints_status.find(key); + if (iter != endpoints_status.end()) { + if (iter->second != 
naming::CircuitBreakStatus::kOpen) { + available_endpoints.push_back(endpoint); + } + } + } + } + + // update route info + SelectorInfo select_info; + select_info.name = service_name; + LoadBalanceInfo load_balance_info; + load_balance_info.info = &select_info; + // no available nodes, then select from all nodes + if (!available_endpoints.empty()) { + load_balance_info.endpoints = &available_endpoints; + } else { + load_balance_info.endpoints = &real_endpoints; + } + GetLoadBalance(load_balance_name)->Update(&load_balance_info); + return true; +} + +bool SelectorDomain::CheckAndUpdate(const SelectorInfo* info) { + if (!config_.circuit_break_config.enable) { + return false; + } + + std::shared_lock rlock(mutex_); + auto it = targets_map_.find(info->name); + if (it == targets_map_.end()) { + return false; + } + auto& circuit_breaker = it->second.circuit_breaker; + if (circuit_breaker && circuit_breaker->StatusChanged(GetMilliSeconds())) { + // If the node status changes, update the nodes set in the load balancer + rlock.unlock(); + return DoUpdate(info->name, info->load_balance_name); + } + + return false; +} + // Get the routing interface of the node being called int SelectorDomain::Select(const SelectorInfo* info, TrpcEndpointInfo* endpoint) { if (nullptr == info || nullptr == endpoint) { @@ -134,6 +211,8 @@ int SelectorDomain::Select(const SelectorInfo* info, TrpcEndpointInfo* endpoint) return -1; } + CheckAndUpdate(info); + LoadBalanceResult load_balance_result; load_balance_result.info = info; auto lb = GetLoadBalance(info->load_balance_name); @@ -148,22 +227,12 @@ int SelectorDomain::Select(const SelectorInfo* info, TrpcEndpointInfo* endpoint) // Get a throttling interface asynchronously Future SelectorDomain::AsyncSelect(const SelectorInfo* info) { - if (nullptr == info) { - TRPC_LOG_ERROR("Selector info is null"); - return MakeExceptionFuture(CommonException("Selector info is null")); - } - - LoadBalanceResult load_balance_result; - load_balance_result.info = 
info; - auto lb = GetLoadBalance(info->load_balance_name); - if (lb->Next(load_balance_result)) { - std::string error_str = "Do load balance of " + info->name + " failed"; - TRPC_LOG_ERROR(error_str); - return MakeExceptionFuture(CommonException(error_str.c_str())); + TrpcEndpointInfo endpoint; + if (Select(info, &endpoint) == 0) { + return MakeReadyFuture(std::move(endpoint)); } - TrpcEndpointInfo endpoint = std::any_cast(load_balance_result.result); - return MakeReadyFuture(std::move(endpoint)); + return MakeExceptionFuture(CommonException("AsyncSelect failed")); } // An interface for fetching node routing information in bulk by policy @@ -173,6 +242,8 @@ int SelectorDomain::SelectBatch(const SelectorInfo* info, std::vectorname; std::shared_lock lock(mutex_); auto iter = targets_map_.find(callee); @@ -182,7 +253,11 @@ int SelectorDomain::SelectBatch(const SelectorInfo* info, std::vectorpolicy == SelectorPolicy::MULTIPLE) { - SelectMultiple(iter->second.endpoints, endpoints, info->select_num); + if (!iter->second.available_endpoints.empty()) { + SelectMultiple(iter->second.available_endpoints, endpoints, info->select_num); + } else { + SelectMultiple(iter->second.endpoints, endpoints, info->select_num); + } } else { *endpoints = iter->second.endpoints; } @@ -192,37 +267,39 @@ int SelectorDomain::SelectBatch(const SelectorInfo* info, std::vector> SelectorDomain::AsyncSelectBatch(const SelectorInfo* info) { - if (nullptr == info) { - TRPC_LOG_ERROR("Selector info is empty"); - return MakeExceptionFuture>(CommonException("Selector info is empty")); + std::vector endpoints; + if (SelectBatch(info, &endpoints) == 0) { + return MakeReadyFuture>(std::move(endpoints)); } - std::string callee = info->name; - std::shared_lock lock(mutex_); - auto iter = targets_map_.find(callee); - if (iter == targets_map_.end()) { - std::string error_str = "router info of " + callee + " no found"; - TRPC_LOG_ERROR(error_str); - return 
MakeExceptionFuture>(CommonException(error_str.c_str())); - } + return MakeExceptionFuture>(CommonException("AsyncSelectBatch fail")); +} - std::vector endpoints; - if (info->policy == SelectorPolicy::MULTIPLE) { - SelectMultiple(iter->second.endpoints, &endpoints, info->select_num); - } else { - endpoints = iter->second.endpoints; +bool SelectorDomain::IsSuccess(int framework_result) { + if (framework_result == TrpcRetCode::TRPC_INVOKE_SUCCESS) { + return true; } - return MakeReadyFuture>(std::move(endpoints)); + return circuitbreak_whitelist_.Contains(framework_result); } // Call the result reporting interface int SelectorDomain::ReportInvokeResult(const InvokeResult* result) { - if (nullptr == result) { - TRPC_LOG_ERROR("Invalid parameter: invoke result is empty"); + if (nullptr == result || !config_.circuit_break_config.enable) { return -1; } + std::shared_lock lock(mutex_); + auto it = targets_map_.find(result->name); + if (it != targets_map_.end()) { + auto& circuit_breaker = it->second.circuit_breaker; + if (circuit_breaker != nullptr) { + naming::CircuitBreakRecordKey key(result->context->GetIp(), result->context->GetPort()); + bool success = IsSuccess(result->framework_result); + circuit_breaker->AddRecordData(key, GetMilliSeconds(), success); + } + } + return 0; } @@ -232,7 +309,6 @@ int SelectorDomain::SetEndpoints(const RouterInfo* info) { return -1; } - std::string callee_name = info->name; // Initialize hostname information, only support passing one hostname if (info->info.size() != 1) { TRPC_LOG_ERROR("Router info is invalid"); @@ -248,9 +324,7 @@ int SelectorDomain::SetEndpoints(const RouterInfo* info) { return -1; } - SelectorInfo selector_info; - selector_info.name = info->name; - RefreshDomainInfo(&selector_info, endpointInfo); + RefreshDomainInfo(info->name, endpointInfo); return 0; } @@ -274,15 +348,13 @@ int SelectorDomain::UpdateEndpointInfo() { int targets_count = targets_map_.size(); int success_count = 0; - for (auto item : targets_map) { 
+ for (auto& item : targets_map) { // Update node information SelectorDomain::DomainEndpointInfo endpointInfo; if (!RefreshEndpointInfoByName(item.second.domain_name, item.second.port, endpointInfo)) { TRPC_LOG_DEBUG("Update endpointInfo of " << item.first << ":" << item.second.domain_name << " success"); // Update node info to cache - SelectorInfo selector_info; - selector_info.name = item.first; - RefreshDomainInfo(&selector_info, endpointInfo); + RefreshDomainInfo(item.first, endpointInfo); success_count++; } } @@ -311,4 +383,9 @@ void SelectorDomain::Stop() noexcept { } } +bool SelectorDomain::SetCircuitBreakWhiteList(const std::vector& framework_retcodes) { + circuitbreak_whitelist_.SetCircuitBreakWhiteList(framework_retcodes); + return true; +} + } // namespace trpc diff --git a/trpc/naming/domain/selector_domain.h b/trpc/naming/domain/selector_domain.h index 6509bf5d..109a398a 100644 --- a/trpc/naming/domain/selector_domain.h +++ b/trpc/naming/domain/selector_domain.h @@ -18,10 +18,11 @@ #include #include -#include "trpc/common/config/domain_naming_conf.h" -#include "trpc/common/config/domain_naming_conf_parser.h" #include "trpc/common/plugin.h" +#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker.h" #include "trpc/naming/common/util/utils_help.h" +#include "trpc/naming/domain/domain_selector_conf.h" #include "trpc/naming/load_balance.h" #include "trpc/naming/selector.h" @@ -66,6 +67,9 @@ class SelectorDomain : public Selector { /// @brief Interface for setting the routing information of the called service int SetEndpoints(const RouterInfo* info) override; + /// @brief Sets the whitelist of framework error codes for circuit breaking reporting + bool SetCircuitBreakWhiteList(const std::vector& framework_retcodes) override; + private: struct DomainEndpointInfo { // Domain name of the called service @@ -74,15 +78,19 @@ class SelectorDomain : public Selector { int port; // 
IP/port information of the called service std::vector endpoints; + // available endpoints + std::vector available_endpoints; // Node ID generator EndpointIdGenerator id_generator; + // circuit breaker + naming::CircuitBreakerPtr circuit_breaker{nullptr}; }; // Update the IP information corresponding to the domain name int RefreshEndpointInfoByName(std::string dn_name, int dn_port, SelectorDomain::DomainEndpointInfo& endpointInfo); // Update EndpointInfo to targets_map and load_balance cache - int RefreshDomainInfo(const SelectorInfo* info, DomainEndpointInfo& dn_endpointInfo); + int RefreshDomainInfo(const std::string& service_name, DomainEndpointInfo& dn_endpointInfo); // Determine whether to refresh routing information bool NeedUpdate(); @@ -93,6 +101,17 @@ class SelectorDomain : public Selector { // Get the loadbalance plugin by name LoadBalance* GetLoadBalance(const std::string& name); + // Create circuit breaker by service name + naming::CircuitBreakerPtr CreateCircuitBreaker(const std::string& service_name); + + // Update endpoints when circuit break state changes + bool DoUpdate(const std::string& service_name, const std::string& load_balance_name); + + // Check if there is any change in the circuit breaker status of the node, and if there is, update the list of available nodes + bool CheckAndUpdate(const SelectorInfo* info); + + bool IsSuccess(int framework_result); + private: // Default load balancer name static const char default_load_balance_name_[]; @@ -109,10 +128,12 @@ class SelectorDomain : public Selector { uint64_t last_update_time_; // Business configuration items specified in the yaml file - naming::DomainSelectorConfig select_config_; + naming::DomainSelectorConfig config_; /// Task id of periodically updating node tasks uint64_t task_id_{0}; + + naming::CircuitBreakWhiteList circuitbreak_whitelist_; }; using SelectorDomainPtr = RefPtr; diff --git a/trpc/naming/domain/selector_domain_test.cc b/trpc/naming/domain/selector_domain_test.cc index 
434e161a..eb3b4168 100644 --- a/trpc/naming/domain/selector_domain_test.cc +++ b/trpc/naming/domain/selector_domain_test.cc @@ -21,21 +21,63 @@ #include "trpc/codec/trpc/trpc_client_codec.h" #include "trpc/common/config/trpc_config.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h" +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker.h" +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker_config.h" #include "trpc/naming/common/util/loadbalance/polling/polling_load_balance.h" +#include "trpc/naming/domain/domain_selector_conf_parser.h" #include "trpc/runtime/common/periphery_task_scheduler.h" +#include "trpc/util/domain_util.h" namespace trpc { -TEST(SelectorDomainTest, select_test) { - PeripheryTaskScheduler::GetInstance()->Init(); - PeripheryTaskScheduler::GetInstance()->Start(); +class SelectorDomainTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + // register circuitbreak creator + naming::CircuitBreakerCreatorFactory::GetInstance()->Register( + "default", [](const YAML::Node* plugin_config, const std::string& service_name) { + naming::DefaultCircuitBreakerConfig config; + if (plugin_config) { + YAML::convert::decode(*plugin_config, config); + } + return std::make_shared(config, service_name); + }); + + PeripheryTaskScheduler::GetInstance()->Init(); + PeripheryTaskScheduler::GetInstance()->Start(); + } - LoadBalancePtr polling = MakeRefCounted(); - SelectorDomainPtr ptr = MakeRefCounted(polling); - ptr->Init(); - ptr->Start(); - EXPECT_TRUE(ptr->Version() != ""); + static void TearDownTestCase() { + PeripheryTaskScheduler::GetInstance()->Stop(); + PeripheryTaskScheduler::GetInstance()->Join(); + } + + void SetUp() override { + int ret = TrpcConfig::GetInstance()->Init("./trpc/naming/testing/domain_selector_test.yaml"); + ASSERT_TRUE(ret == 0); + naming::DomainSelectorConfig config; + trpc::TrpcConfig::GetInstance()->GetPluginConfig("selector", "domain", 
config); + YAML::convert::decode(config.circuit_break_config.plugin_config, + circuit_break_config_); + + LoadBalancePtr polling = MakeRefCounted(); + domain_selector_ = std::make_shared(polling); + domain_selector_->Init(); + domain_selector_->Start(); + } + + void TearDown() override { + domain_selector_->Stop(); + domain_selector_->Destroy(); + } + protected: + std::shared_ptr domain_selector_; + naming::DefaultCircuitBreakerConfig circuit_break_config_; +}; + +TEST_F(SelectorDomainTest, select_test) { // set endpoints of callee RouterInfo info; info.name = "test_service"; @@ -44,7 +86,7 @@ TEST(SelectorDomainTest, select_test) { endpoint1.host = "localhost"; endpoint1.port = 1001; endpoints_info.push_back(endpoint1); - ptr->SetEndpoints(&info); + domain_selector_->SetEndpoints(&info); auto context = trpc::MakeRefCounted(); SelectorInfo select_info; @@ -52,7 +94,7 @@ TEST(SelectorDomainTest, select_test) { select_info.context = context; TrpcEndpointInfo endpoint; - ptr->Select(&select_info, &endpoint); + domain_selector_->Select(&select_info, &endpoint); if (endpoint.is_ipv6) { EXPECT_TRUE(endpoint.host == "::1"); } else { @@ -61,7 +103,7 @@ TEST(SelectorDomainTest, select_test) { EXPECT_TRUE(endpoint.port == 1001); EXPECT_TRUE(endpoint.id != kInvalidEndpointId); - ptr->Select(&select_info, &endpoint); + domain_selector_->Select(&select_info, &endpoint); if (endpoint.is_ipv6) { EXPECT_TRUE(endpoint.host == "::1"); } else { @@ -77,14 +119,16 @@ TEST(SelectorDomainTest, select_test) { auto trpc_codec = std::make_shared(); result.context = MakeRefCounted(trpc_codec); result.context->SetCallerName("test_service"); - - result.context->SetAddr("192.168.0.1", 1001); - int ret = ptr->ReportInvokeResult(&result); + ExtendNodeAddr addr; + addr.addr.ip = "127.0.0.1"; + addr.addr.port = 1001; + result.context->SetRequestAddrByNaming(std::move(addr)); + int ret = domain_selector_->ReportInvokeResult(&result); EXPECT_EQ(0, ret); - EXPECT_TRUE(ptr->ReportInvokeResult(nullptr) 
!= 0); + EXPECT_TRUE(domain_selector_->ReportInvokeResult(nullptr) != 0); - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + domain_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { TrpcEndpointInfo endpoint = fut.GetValue0(); if (endpoint.is_ipv6) { EXPECT_TRUE(endpoint.host == "::1"); @@ -97,7 +141,7 @@ TEST(SelectorDomainTest, select_test) { }); std::vector endpoints; - ptr->SelectBatch(&select_info, &endpoints); + domain_selector_->SelectBatch(&select_info, &endpoints); for (const auto& ref : endpoints) { if (ref.is_ipv6) { EXPECT_TRUE(ref.host == "::1"); @@ -110,33 +154,20 @@ TEST(SelectorDomainTest, select_test) { endpoints.clear(); select_info.policy = SelectorPolicy::MULTIPLE; - ret = ptr->SelectBatch(&select_info, &endpoints); + ret = domain_selector_->SelectBatch(&select_info, &endpoints); EXPECT_EQ(0, ret); EXPECT_TRUE(endpoints.size() > 0); select_info.name = "test_service1"; - ret = ptr->Select(&select_info, &endpoint); + ret = domain_selector_->Select(&select_info, &endpoint); EXPECT_NE(0, ret); - ptr->SelectBatch(&select_info, &endpoints); + domain_selector_->SelectBatch(&select_info, &endpoints); EXPECT_NE(0, ret); - - ptr->Stop(); - ptr->Destroy(); - - PeripheryTaskScheduler::GetInstance()->Stop(); - PeripheryTaskScheduler::GetInstance()->Join(); } -TEST(SelectorDomainTest, select_exception_test) { - PeripheryTaskScheduler::GetInstance()->Init(); - PeripheryTaskScheduler::GetInstance()->Start(); - - LoadBalancePtr polling = MakeRefCounted(); - SelectorDomainPtr ptr = MakeRefCounted(polling); - ptr->Init(); - ptr->Start(); - EXPECT_EQ(-1, ptr->SetEndpoints(nullptr)); +TEST_F(SelectorDomainTest, select_exception_test) { + EXPECT_EQ(-1, domain_selector_->SetEndpoints(nullptr)); // set endpoints of callee RouterInfo info; @@ -150,7 +181,7 @@ TEST(SelectorDomainTest, select_exception_test) { endpoints_info.push_back(endpoint1); endpoints_info.push_back(endpoint2); - int ret = ptr->SetEndpoints(&info); + int ret = 
domain_selector_->SetEndpoints(&info); EXPECT_NE(0, ret); auto context = trpc::MakeRefCounted(); @@ -159,41 +190,27 @@ TEST(SelectorDomainTest, select_exception_test) { select_info.context = context; TrpcEndpointInfo endpoint; - ret = ptr->Select(&select_info, &endpoint); + ret = domain_selector_->Select(&select_info, &endpoint); EXPECT_NE(0, ret); - ret = ptr->Select(nullptr, &endpoint); + ret = domain_selector_->Select(nullptr, &endpoint); EXPECT_NE(0, ret); - ptr->AsyncSelect(nullptr).Then([](Future&& fut) { + domain_selector_->AsyncSelect(nullptr).Then([](Future&& fut) { EXPECT_TRUE(fut.IsFailed()); return MakeReadyFuture<>(); }); - ret = ptr->SelectBatch(nullptr, nullptr); + ret = domain_selector_->SelectBatch(nullptr, nullptr); EXPECT_NE(0, ret); - ptr->AsyncSelectBatch(nullptr).Then([](Future>&& fut) { + domain_selector_->AsyncSelectBatch(nullptr).Then([](Future>&& fut) { EXPECT_TRUE(fut.IsFailed()); return MakeReadyFuture<>(); }); - - ptr->Stop(); - ptr->Destroy(); - - PeripheryTaskScheduler::GetInstance()->Stop(); - PeripheryTaskScheduler::GetInstance()->Join(); } -TEST(SelectorDomainTest, ayncselect_test) { - PeripheryTaskScheduler::GetInstance()->Init(); - PeripheryTaskScheduler::GetInstance()->Start(); - - LoadBalancePtr polling = MakeRefCounted(); - SelectorDomainPtr ptr = MakeRefCounted(polling); - ptr->Init(); - ptr->Start(); - +TEST_F(SelectorDomainTest, ayncselect_test) { // set endpoints of callee RouterInfo info; info.name = "test_service"; @@ -202,14 +219,14 @@ TEST(SelectorDomainTest, ayncselect_test) { endpoint1.host = "localhost"; endpoint1.port = 1001; endpoints_info.push_back(endpoint1); - ptr->SetEndpoints(&info); + domain_selector_->SetEndpoints(&info); auto context = trpc::MakeRefCounted(); SelectorInfo select_info; select_info.name = "test_service"; select_info.context = context; - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + domain_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { TrpcEndpointInfo endpoint = 
fut.GetValue0(); if (endpoint.is_ipv6) { EXPECT_TRUE(endpoint.host == "::1"); @@ -220,7 +237,7 @@ TEST(SelectorDomainTest, ayncselect_test) { return MakeReadyFuture<>(); }); - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + domain_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { TrpcEndpointInfo endpoint = fut.GetValue0(); if (endpoint.is_ipv6) { EXPECT_TRUE(endpoint.host == "::1"); @@ -231,7 +248,7 @@ TEST(SelectorDomainTest, ayncselect_test) { return MakeReadyFuture<>(); }); - ptr->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { + domain_selector_->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { EXPECT_TRUE(fut.IsReady()); std::vector endpoints = fut.GetValue0(); for (const auto& ref : endpoints) { @@ -246,7 +263,7 @@ TEST(SelectorDomainTest, ayncselect_test) { }); select_info.policy = SelectorPolicy::MULTIPLE; - ptr->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { + domain_selector_->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { EXPECT_TRUE(fut.IsReady()); std::vector endpoints = fut.GetValue0(); EXPECT_TRUE(endpoints.size() > 0); @@ -254,33 +271,18 @@ TEST(SelectorDomainTest, ayncselect_test) { }); select_info.name = "test_service1"; - ptr->AsyncSelect(&select_info).Then([](Future&& fut) { + domain_selector_->AsyncSelect(&select_info).Then([](Future&& fut) { EXPECT_TRUE(fut.IsFailed()); return trpc::MakeReadyFuture<>(); }); - ptr->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { + domain_selector_->AsyncSelectBatch(&select_info).Then([](Future>&& fut) { EXPECT_TRUE(fut.IsFailed()); return trpc::MakeReadyFuture<>(); }); - - ptr->Stop(); - ptr->Destroy(); - - PeripheryTaskScheduler::GetInstance()->Stop(); - PeripheryTaskScheduler::GetInstance()->Join(); } -TEST(SelectorDomainTest, UpdateEndpointInfo) { - PeripheryTaskScheduler::GetInstance()->Init(); - PeripheryTaskScheduler::GetInstance()->Start(); - - LoadBalancePtr polling = MakeRefCounted(); - SelectorDomainPtr ptr = MakeRefCounted(polling); - 
ptr->Init(); - ptr->Start(); - EXPECT_TRUE(ptr->Version() != ""); - +TEST_F(SelectorDomainTest, UpdateEndpointInfo) { // set endpoints of callee RouterInfo info; info.name = "test_service"; @@ -289,7 +291,7 @@ TEST(SelectorDomainTest, UpdateEndpointInfo) { endpoint1.host = "localhost"; endpoint1.port = 1001; endpoints_info.push_back(endpoint1); - ptr->SetEndpoints(&info); + domain_selector_->SetEndpoints(&info); // Internal tasks perform node updates once in 200ms std::this_thread::sleep_for(std::chrono::milliseconds(500)); @@ -299,20 +301,11 @@ TEST(SelectorDomainTest, UpdateEndpointInfo) { select_info.context = context; TrpcEndpointInfo endpoint; - ptr->Select(&select_info, &endpoint); + domain_selector_->Select(&select_info, &endpoint); EXPECT_TRUE(endpoint.port == 1001); - - ptr->Stop(); - ptr->Destroy(); - - PeripheryTaskScheduler::GetInstance()->Stop(); - PeripheryTaskScheduler::GetInstance()->Join(); } -TEST(SelectorDomainTest, exclude_ipv6_test) { - PeripheryTaskScheduler::GetInstance()->Init(); - PeripheryTaskScheduler::GetInstance()->Start(); - +TEST_F(SelectorDomainTest, exclude_ipv6_test) { auto ret = trpc::TrpcConfig::GetInstance()->Init("./trpc/naming/testing/domain_test.yaml"); ASSERT_EQ(0, ret); @@ -358,9 +351,79 @@ TEST(SelectorDomainTest, exclude_ipv6_test) { ptr->Stop(); ptr->Destroy(); +} + +// Report failure for 10 consecutive times, expecting switch to open state, and not accessed within the +// 'sleep_window_ms' and recovery occurs when the success criteria are met after reaching the half-open state. +TEST_F(SelectorDomainTest, circuitbreak_when_continuous_fail) { + std::string domain = "www.qq.com"; + std::vector addrs; + trpc::util::GetAddrFromDomain(domain, addrs); + // sort ip, Duplicate removal + std::set ip_list_set(addrs.begin(), addrs.end()); + if (ip_list_set.size() < 2) { + // If the number of domain nodes is less than 2, do not execute this test case. 
+ return; + } + + // set endpoints of callee + RouterInfo info; + info.name = "test_service"; + std::vector& endpoints_info = info.info; + TrpcEndpointInfo endpoint1; + endpoint1.host = "www.qq.com"; + endpoint1.port = 1001; + endpoints_info.push_back(endpoint1); + domain_selector_->SetEndpoints(&info); + + SelectorInfo select_info; + select_info.name = "test_service"; + TrpcEndpointInfo endpoint; + ASSERT_EQ(domain_selector_->Select(&select_info, &endpoint), 0); + auto error_endpoint_ip = endpoint.host; + + InvokeResult result; + result.name = "test_service"; + result.framework_result = ::trpc::TrpcRetCode::TRPC_CLIENT_NETWORK_ERR; + result.cost_time = 100; + result.context = MakeRefCounted(); + ExtendNodeAddr addr; + addr.addr.ip = endpoint.host; + addr.addr.port = endpoint.port; + result.context->SetRequestAddrByNaming(std::move(addr)); + for (uint32_t i = 0; i < circuit_break_config_.continuous_error_threshold; i++) { + domain_selector_->ReportInvokeResult(&result); + } + + uint32_t call_times = 100; + for (uint32_t i = 0; i < call_times; i++) { + domain_selector_->Select(&select_info, &endpoint); + ASSERT_NE(endpoint.host, error_endpoint_ip); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(circuit_break_config_.sleep_window_ms)); + // If the number of successful calls meets the requirements, it will be switch from half-open state to close state. 
+ for (uint32_t i = 0, j = 0; i < circuit_break_config_.request_count_after_half_open && j < call_times; j++) { + domain_selector_->Select(&select_info, &endpoint); + result.framework_result = ::trpc::TrpcRetCode::TRPC_INVOKE_SUCCESS; + domain_selector_->ReportInvokeResult(&result); + if (endpoint.host == error_endpoint_ip) { + i++; + } + } + + // check the probability of the node being called + int call_count = 0; + for (uint32_t i = 0; i < call_times; i++) { + int ret = domain_selector_->Select(&select_info, &endpoint); + ASSERT_EQ(ret, 0); + if (endpoint.host == error_endpoint_ip) { + call_count++; + } + } - PeripheryTaskScheduler::GetInstance()->Stop(); - PeripheryTaskScheduler::GetInstance()->Join(); + float call_count_rate = call_count * 1.0 / call_times; + ASSERT_TRUE(call_count_rate > 0.2 && call_count_rate < 0.8) << call_count_rate << std::endl; } } // namespace trpc diff --git a/trpc/naming/testing/BUILD b/trpc/naming/testing/BUILD index 78602b06..ac75b703 100644 --- a/trpc/naming/testing/BUILD +++ b/trpc/naming/testing/BUILD @@ -7,8 +7,10 @@ package( ) exports_files([ - "test.yaml", + "direct_selector_test.yaml", "domain_test.yaml", + "domain_selector_test.yaml", + "test.yaml", "test_load.yaml", "test_load.toml", ]) diff --git a/trpc/naming/testing/direct_selector_test.yaml b/trpc/naming/testing/direct_selector_test.yaml new file mode 100644 index 00000000..12319105 --- /dev/null +++ b/trpc/naming/testing/direct_selector_test.yaml @@ -0,0 +1,14 @@ +plugins: + selector: + direct: + circuitBreaker: + default: + enable: true + statWindow: 10s + bucketsNum: 5 + sleepWindow: 5s + requestVolumeThreshold: 5 + errorRateThreshold: 0.4 + continuousErrorThreshold: 5 + requestCountAfterHalfOpen: 5 + successCountAfterHalfOpen: 4 \ No newline at end of file diff --git a/trpc/naming/testing/domain_selector_test.yaml b/trpc/naming/testing/domain_selector_test.yaml new file mode 100644 index 00000000..2fc7691d --- /dev/null +++ 
b/trpc/naming/testing/domain_selector_test.yaml @@ -0,0 +1,14 @@ +plugins: + selector: + domain: + circuitBreaker: + default: + enable: true + statWindow: 10s + bucketsNum: 5 + sleepWindow: 5s + requestVolumeThreshold: 5 + errorRateThreshold: 0.4 + continuousErrorThreshold: 5 + requestCountAfterHalfOpen: 5 + successCountAfterHalfOpen: 4 diff --git a/trpc/naming/trpc_naming_registry.cc b/trpc/naming/trpc_naming_registry.cc index 725686d9..c768e195 100644 --- a/trpc/naming/trpc_naming_registry.cc +++ b/trpc/naming/trpc_naming_registry.cc @@ -16,6 +16,8 @@ #include "trpc/common/config/trpc_config.h" #include "trpc/filter/filter.h" #include "trpc/filter/filter_manager.h" +#include "trpc/naming/common/util/circuit_break/circuit_breaker_creator_factory.h" +#include "trpc/naming/common/util/circuit_break/default_circuit_breaker.h" #include "trpc/naming/common/util/loadbalance/polling/polling_load_balance.h" #include "trpc/naming/direct/direct_selector_filter.h" #include "trpc/naming/direct/selector_direct.h" @@ -33,6 +35,16 @@ namespace trpc { // Initialize the Selector inside trpc: currently domain and direct plugins. void RegisterInnerSelector() { + // register circuitbreak creator + naming::CircuitBreakerCreatorFactory::GetInstance()->Register( + naming::DefaultCircuitBreaker::kName, [](const YAML::Node* plugin_config, const std::string& service_name) { + naming::DefaultCircuitBreakerConfig config; + if (plugin_config) { + YAML::convert::decode(*plugin_config, config); + } + return std::make_shared(config, service_name); + }); + LoadBalancePtr polling_load_balance = trpc::LoadBalanceFactory::GetInstance()->Get(kPollingLoadBalance); if (polling_load_balance == nullptr) { // Register the default load balancer