diff --git a/controllers/tests/kafkacluster_controller_envoy_test.go b/controllers/tests/kafkacluster_controller_envoy_test.go index 3cbc4068a..417f6973b 100644 --- a/controllers/tests/kafkacluster_controller_envoy_test.go +++ b/controllers/tests/kafkacluster_controller_envoy_test.go @@ -92,7 +92,18 @@ func expectEnvoyConfigMap(kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate portValue: 9901 staticResources: clusters: - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: broker-0 endpoints: @@ -104,7 +115,18 @@ staticResources: portValue: 9094 name: broker-0 type: STRICT_DNS - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: broker-1 endpoints: @@ -116,7 +138,18 @@ staticResources: portValue: 9094 name: broker-1 type: STRICT_DNS - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: broker-2 endpoints: @@ -128,7 +161,18 @@ staticResources: portValue: 9094 name: broker-2 type: STRICT_DNS - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: all-brokers endpoints: @@ -328,7 +372,18 @@ func expectEnvoyWithConfigAz1(kafkaCluster *v1beta1.KafkaCluster) { portValue: 9901 staticResources: clusters: - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: broker-0 endpoints: @@ -340,7 +395,18 @@ staticResources: portValue: 9094 name: broker-0 type: STRICT_DNS - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: all-brokers endpoints: @@ -453,7 +519,18 @@ func expectEnvoyWithConfigAz2(kafkaCluster *v1beta1.KafkaCluster) { portValue: 9901 staticResources: clusters: - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: broker-1 endpoints: @@ -465,7 +542,18 @@ staticResources: portValue: 9094 name: broker-1 type: STRICT_DNS - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: broker-2 endpoints: @@ -477,7 +565,18 @@ staticResources: portValue: 9094 name: broker-2 type: STRICT_DNS - - connectTimeout: 1s + - circuitBreakers: + thresholds: + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + - maxConnections: 1000000000 + maxPendingRequests: 1000000000 + maxRequests: 1000000000 + maxRetries: 1000000000 + priority: HIGH + connectTimeout: 1s loadAssignment: clusterName: all-brokers endpoints: diff --git a/pkg/resources/envoy/configmap.go b/pkg/resources/envoy/configmap.go index a6280045c..900d20a91 100644 --- a/pkg/resources/envoy/configmap.go +++ b/pkg/resources/envoy/configmap.go @@ -29,6 +29,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/duration" + "github.com/golang/protobuf/ptypes/wrappers" "google.golang.org/protobuf/encoding/protojson" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -144,6 +145,26 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis ConnectTimeout: &duration.Duration{Seconds: 1}, ClusterDiscoveryType: &envoycluster.Cluster_Type{Type: envoycluster.Cluster_STRICT_DNS}, LbPolicy: envoycluster.Cluster_ROUND_ROBIN, + // disable circuit breakingL: + // https://www.envoyproxy.io/docs/envoy/latest/faq/load_balancing/disable_circuit_breaking + CircuitBreakers: &envoycluster.CircuitBreakers{ + Thresholds: []*envoycluster.CircuitBreakers_Thresholds{ + { + Priority: envoycore.RoutingPriority_DEFAULT, + MaxConnections: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxPendingRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRetries: &wrappers.UInt32Value{Value: 1_000_000_000}, + }, + { + Priority: envoycore.RoutingPriority_HIGH, + MaxConnections: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxPendingRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRetries: &wrappers.UInt32Value{Value: 1_000_000_000}, + }, + }, + }, LoadAssignment: &envoyendpoint.ClusterLoadAssignment{ ClusterName: fmt.Sprintf("broker-%d", brokerId), Endpoints: []*envoyendpoint.LocalityLbEndpoints{{ @@ -213,6 +234,26 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis ConnectTimeout: &duration.Duration{Seconds: 1}, ClusterDiscoveryType: &envoycluster.Cluster_Type{Type: envoycluster.Cluster_STRICT_DNS}, LbPolicy: envoycluster.Cluster_ROUND_ROBIN, + // disable circuit breakingL: + // https://www.envoyproxy.io/docs/envoy/latest/faq/load_balancing/disable_circuit_breaking + CircuitBreakers: &envoycluster.CircuitBreakers{ + Thresholds: []*envoycluster.CircuitBreakers_Thresholds{ + { + Priority: envoycore.RoutingPriority_DEFAULT, + MaxConnections: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxPendingRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRetries: &wrappers.UInt32Value{Value: 1_000_000_000}, + }, + { + Priority: envoycore.RoutingPriority_HIGH, + MaxConnections: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxPendingRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRequests: &wrappers.UInt32Value{Value: 1_000_000_000}, + MaxRetries: &wrappers.UInt32Value{Value: 1_000_000_000}, + }, + }, + }, LoadAssignment: &envoyendpoint.ClusterLoadAssignment{ ClusterName: allBrokerEnvoyConfigName, Endpoints: []*envoyendpoint.LocalityLbEndpoints{{