From 507a4710e6f07ea7ca2767eb415a6ba860637a42 Mon Sep 17 00:00:00 2001 From: Adi Muraru Date: Fri, 8 Oct 2021 00:05:44 +0300 Subject: [PATCH] Enable envoy idleTimeout and TCP keep-alive for connections to kafka and clients 1/ The Kafka broker defines connections.max.idle.ms=600s. To ensure that envoy, as a client of the Kafka broker, closes idle connections first (avoiding unexpected network disconnects), this patch sets envoy's idleTimeout to a slightly lower value (560s). 2/ Enable TCP keep-alive for all TCP connections envoy holds to Kafka brokers and to clients (or a fronting load balancer). --- .../kafkacluster_controller_envoy_test.go | 210 ++++++++++++++++++ pkg/resources/envoy/configmap.go | 71 +++++- 2 files changed, 276 insertions(+), 5 deletions(-) diff --git a/controllers/tests/kafkacluster_controller_envoy_test.go b/controllers/tests/kafkacluster_controller_envoy_test.go index 62aa936a0..b81322f8a 100644 --- a/controllers/tests/kafkacluster_controller_envoy_test.go +++ b/controllers/tests/kafkacluster_controller_envoy_test.go @@ -120,6 +120,11 @@ staticResources: portValue: 9094 name: broker-0 type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 - circuitBreakers: thresholds: - maxConnections: 1000000000 @@ -143,6 +148,11 @@ staticResources: portValue: 9094 name: broker-1 type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 - circuitBreakers: thresholds: - maxConnections: 1000000000 @@ -166,6 +176,11 @@ staticResources: portValue: 9094 name: broker-2 type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 - circuitBreakers: thresholds: - maxConnections: 1000000000 @@ -203,6 +218,11 @@ staticResources: portValue: 9020 name: all-brokers type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 
listeners: - address: socketAddress: @@ -214,8 +234,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: broker-0 + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: broker_tcp-0 + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -226,8 +260,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: broker-1 + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: broker_tcp-1 + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -238,8 +286,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: broker-2 + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: broker_tcp-2 + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -250,8 +312,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: all-brokers + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: all-brokers + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -289,6 +365,19 @@ staticResources: redirect: pathRedirect: /healthcheck statPrefix: all-brokers-healthcheck + socketOptions: + - intValue: 
"1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" `, fmt.Sprintf(svcTemplate, "0"), fmt.Sprintf(svcTemplate, "1"), fmt.Sprintf(svcTemplate, "2"), fmt.Sprintf(svcTemplate, "all-broker")) Expect(configMap.Data["envoy.yaml"]).To(Equal(expected)) } @@ -471,6 +560,11 @@ staticResources: portValue: 9094 name: broker-0 type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 - circuitBreakers: thresholds: - maxConnections: 1000000000 @@ -508,6 +602,11 @@ staticResources: portValue: 9020 name: all-brokers type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 listeners: - address: socketAddress: @@ -519,8 +618,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: broker-0 + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: broker_tcp-0 + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -531,8 +644,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: all-brokers + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: all-brokers + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -570,6 +697,19 @@ staticResources: redirect: pathRedirect: /healthcheck statPrefix: all-brokers-healthcheck + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: 
"6" + name: "5" + - intValue: "3" + level: "6" + name: "6" `, fmt.Sprintf(svcTemplate, "0"), fmt.Sprintf(svcTemplate, "all-broker")) Expect(configMap.Data["envoy.yaml"]).To(Equal(expected)) } @@ -682,6 +822,11 @@ staticResources: portValue: 9094 name: broker-1 type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 - circuitBreakers: thresholds: - maxConnections: 1000000000 @@ -705,6 +850,11 @@ staticResources: portValue: 9094 name: broker-2 type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 - circuitBreakers: thresholds: - maxConnections: 1000000000 @@ -742,6 +892,11 @@ staticResources: portValue: 9020 name: all-brokers type: STRICT_DNS + upstreamConnectionOptions: + tcpKeepalive: + keepaliveInterval: 30 + keepaliveProbes: 3 + keepaliveTime: 30 listeners: - address: socketAddress: @@ -753,8 +908,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: broker-1 + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: broker_tcp-1 + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -765,8 +934,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: broker-2 + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: broker_tcp-2 + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -777,8 +960,22 @@ staticResources: typedConfig: '@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy cluster: 
all-brokers + idleTimeout: 560s maxConnectAttempts: 2 statPrefix: all-brokers + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" - address: socketAddress: address: 0.0.0.0 @@ -816,6 +1013,19 @@ staticResources: redirect: pathRedirect: /healthcheck statPrefix: all-brokers-healthcheck + socketOptions: + - intValue: "1" + level: "1" + name: "9" + - intValue: "30" + level: "6" + name: "4" + - intValue: "30" + level: "6" + name: "5" + - intValue: "3" + level: "6" + name: "6" `, fmt.Sprintf(svcTemplate, "1"), fmt.Sprintf(svcTemplate, "2"), fmt.Sprintf(svcTemplate, "all-broker")) Expect(configMap.Data["envoy.yaml"]).To(Equal(expected)) } diff --git a/pkg/resources/envoy/configmap.go b/pkg/resources/envoy/configmap.go index 871b8d447..5e8028709 100644 --- a/pkg/resources/envoy/configmap.go +++ b/pkg/resources/envoy/configmap.go @@ -187,6 +187,7 @@ func generateEnvoyHealthCheckListener(ingressConfig v1beta1.IngressConfig, log l }, }, }, + SocketOptions: getKeepAliveSocketOptions(), } } @@ -219,6 +220,8 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis // TCP_Proxy filter configuration tcpProxy := &envoytcpproxy.TcpProxy{ StatPrefix: fmt.Sprintf("broker_tcp-%d", brokerId), + MaxConnectAttempts: &wrapperspb.UInt32Value{Value: 2}, + IdleTimeout: &durationpb.Duration{Seconds: 560}, ClusterSpecifier: &envoytcpproxy.TcpProxy_Cluster{ Cluster: fmt.Sprintf("broker-%d", brokerId), }, @@ -229,6 +232,7 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis return "" } listeners = append(listeners, &envoylistener.Listener{ + Address: &envoycore.Address{ Address: &envoycore.Address_SocketAddress{ SocketAddress: &envoycore.SocketAddress{ @@ -251,14 +255,18 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis }, }, }, + SocketOptions: 
getKeepAliveSocketOptions(), }) clusters = append(clusters, &envoycluster.Cluster{ - Name: fmt.Sprintf("broker-%d", brokerId), - ConnectTimeout: &durationpb.Duration{Seconds: 1}, + Name: fmt.Sprintf("broker-%d", brokerId), + ConnectTimeout: &durationpb.Duration{Seconds: 1}, + UpstreamConnectionOptions: &envoycluster.UpstreamConnectionOptions{ + TcpKeepalive: getTcpKeepalive(), + }, ClusterDiscoveryType: &envoycluster.Cluster_Type{Type: envoycluster.Cluster_STRICT_DNS}, LbPolicy: envoycluster.Cluster_ROUND_ROBIN, - // disable circuit breakingL: + // disable circuit breaking: // https://www.envoyproxy.io/docs/envoy/latest/faq/load_balancing/disable_circuit_breaking CircuitBreakers: &envoycluster.CircuitBreakers{ Thresholds: []*envoycluster.CircuitBreakers_Thresholds{ @@ -308,6 +316,8 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis // TCP_Proxy filter configuration tcpProxy := &envoytcpproxy.TcpProxy{ StatPrefix: envoyutils.AllBrokerEnvoyConfigName, + IdleTimeout: &durationpb.Duration{Seconds: 560}, + MaxConnectAttempts: &wrapperspb.UInt32Value{Value: 2}, ClusterSpecifier: &envoytcpproxy.TcpProxy_Cluster{ Cluster: envoyutils.AllBrokerEnvoyConfigName, }, @@ -340,6 +350,7 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis }, }, }, + SocketOptions: getKeepAliveSocketOptions(), }) // health-check http listener @@ -350,8 +361,11 @@ func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis listeners = append(listeners, healthCheckListener) clusters = append(clusters, &envoycluster.Cluster{ - Name: envoyutils.AllBrokerEnvoyConfigName, - ConnectTimeout: &durationpb.Duration{Seconds: 1}, + Name: envoyutils.AllBrokerEnvoyConfigName, + ConnectTimeout: &durationpb.Duration{Seconds: 1}, + UpstreamConnectionOptions: &envoycluster.UpstreamConnectionOptions{ + TcpKeepalive: getTcpKeepalive(), + }, IgnoreHealthOnHostRemoval: true, HealthChecks: []*envoycore.HealthCheck{ { @@ -441,3 +455,50 @@ 
func GenerateEnvoyConfig(kc *v1beta1.KafkaCluster, elistener v1beta1.ExternalLis
 	}
 	return string(marshalledConfig)
 }
+
+// getTcpKeepalive returns the TCP keep-alive settings that the generated envoy
+// clusters use for upstream connections: 3 probes, keep-alive time 30, interval 30.
+// NOTE(review): time/interval are presumably in seconds — confirm with the envoy API.
+func getTcpKeepalive() *envoycore.TcpKeepalive {
+	return &envoycore.TcpKeepalive{
+		KeepaliveProbes:   wrapperspb.UInt32(3),
+		KeepaliveTime:     wrapperspb.UInt32(30),
+		KeepaliveInterval: wrapperspb.UInt32(30),
+	}
+}
+
+// getKeepAliveSocketOptions returns the raw socket options set on the generated
+// envoy listeners: keep-alive switched on, with the same 30/30/3 idle/interval/count
+// values as getTcpKeepalive. Every option is applied in the STATE_PREBIND state.
+// NOTE(review): the numeric level/name constants below look like the Linux values;
+// confirm envoy only runs on Linux here, since other platforms number them differently.
+func getKeepAliveSocketOptions() []*envoycore.SocketOption {
+	return []*envoycore.SocketOption{
+		// enable socket keep-alive
+		{
+			Level: 1, // SOL_SOCKET
+			Name:  9, // SO_KEEPALIVE
+			Value: &envoycore.SocketOption_IntValue{IntValue: 1},
+			State: envoycore.SocketOption_STATE_PREBIND,
+		},
+		// configure keep-alive idle time, probe interval and probe count
+		{
+			Level: 6, // IPPROTO_TCP
+			Name:  4, // TCP_KEEPIDLE
+			Value: &envoycore.SocketOption_IntValue{IntValue: 30},
+			State: envoycore.SocketOption_STATE_PREBIND,
+		},
+		{
+			Level: 6, // IPPROTO_TCP
+			Name:  5, // TCP_KEEPINTVL
+			Value: &envoycore.SocketOption_IntValue{IntValue: 30},
+			State: envoycore.SocketOption_STATE_PREBIND,
+		},
+		{
+			Level: 6, // IPPROTO_TCP
+			Name:  6, // TCP_KEEPCNT
+			Value: &envoycore.SocketOption_IntValue{IntValue: 3},
+			State: envoycore.SocketOption_STATE_PREBIND,
+		},
+	}
+}