From 1f0d47fb6962e85ae16a8c0fccbbcc9c5ea5e939 Mon Sep 17 00:00:00 2001 From: cawright-rh Date: Wed, 14 Aug 2024 10:23:04 -0700 Subject: [PATCH] Allow property security-inter-broker-protocol (#85) * adding the ability to use security-inter-broker-protocol in koperator * updating util.go to remove _ for generated names * adding replace all for external listener port name * fixing other places where externallistener name is used to not have _ * adding an alternative way to identify which port to use for kafka administration and cc connection * taking out comments for pr push * fixing kafka crd * setting omitempty so it will not be required * adding generated crds * adding comments with context for new flag UsedForKafkaAdminCommunication * Use getBrokerReadOnlyConfig function to get properties and update unit test - security_inter_broker_protocol_Set * Update crds to match generated manifest --------- Co-authored-by: Cameron Wright Co-authored-by: Ha Van --- api/v1beta1/kafkacluster_types.go | 3 + charts/kafka-operator/crds/kafkaclusters.yaml | 10 +++ .../kafka.banzaicloud.io_kafkaclusters.yaml | 10 +++ pkg/resources/kafka/configmap.go | 23 +++-- pkg/resources/kafka/configmap_test.go | 20 +++++ pkg/resources/kafka/kafka.go | 10 ++- pkg/util/client/common.go | 7 ++ pkg/util/kafka/common.go | 9 +- pkg/util/kafka/const.go | 1 + pkg/util/util.go | 10 ++- pkg/util/util_test.go | 83 +++++++++++++++++++ 11 files changed, 172 insertions(+), 14 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 9a9c3b645..31aea4956 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -748,6 +748,9 @@ type CommonListenerSpec struct { // At least one of the listeners should have this flag enabled // +optional UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"` + // UsedForKafkaAdminCommunication allows for a different port to be returned when the koperator is checking for the port to use to check if kafka is operating. + // +optional + UsedForKafkaAdminCommunication bool `json:"usedForKafkaAdminCommunication,omitempty"` } func (c *CommonListenerSpec) GetServerSSLCertSecretName() string { diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index e20593bc2..c55ba0c7f 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -21677,6 +21677,11 @@ spec: description: At least one of the listeners should have this flag enabled type: boolean + usedForKafkaAdminCommunication: + description: UsedForKafkaAdminCommunication allows for a + different port to be returned when the koperator is checking + for the port to use to check if kafka is operating. + type: boolean required: - containerPort - externalStartingPort @@ -21753,6 +21758,11 @@ spec: description: At least one of the listeners should have this flag enabled type: boolean + usedForKafkaAdminCommunication: + description: UsedForKafkaAdminCommunication allows for a + different port to be returned when the koperator is checking + for the port to use to check if kafka is operating. + type: boolean required: - containerPort - name diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index e20593bc2..c55ba0c7f 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -21677,6 +21677,11 @@ spec: description: At least one of the listeners should have this flag enabled type: boolean + usedForKafkaAdminCommunication: + description: UsedForKafkaAdminCommunication allows for a + different port to be returned when the koperator is checking + for the port to use to check if kafka is operating. + type: boolean required: - containerPort - externalStartingPort @@ -21753,6 +21758,11 @@ spec: description: At least one of the listeners should have this flag enabled type: boolean + usedForKafkaAdminCommunication: + description: UsedForKafkaAdminCommunication allows for a + different port to be returned when the koperator is checking + for the port to use to check if kafka is operating. + type: boolean required: - containerPort - name diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 5c4645c4a..e74b1cf4e 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -40,7 +40,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 config := properties.NewProperties() // Add listener configuration - listenerConf := generateListenerSpecificConfig(&r.KafkaCluster.Spec.ListenersConfig, serverPasses, log) + listenerConf := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log) config.Merge(listenerConf) // Add listener configuration @@ -87,9 +87,13 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 } } - // Add Cruise Control Metrics Reporter configuration - if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil { - log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters)) + // Add Cruise Control Metrics Reporter configuration. + // When "security.inter.broker.protocol" (e.g. inter broker communication is secure) is configured, the operator disables the reporter. + _, isSecurityInterBrokerProtocolConfigured := getBrokerReadOnlyConfig(id, r.KafkaCluster, log).Get(kafkautils.KafkaConfigSecurityInterBrokerProtocol) + if !isSecurityInterBrokerProtocolConfigured { + if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters)) + } } bootstrapServers, err := kafkautils.GetBootstrapServersService(r.KafkaCluster) if err != nil { @@ -244,12 +248,14 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s return controlPlaneListener } -func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) *properties.Properties { +func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) *properties.Properties { var ( interBrokerListenerName string securityProtocolMapConfig []string listenerConfig []string ) + l := kcs.ListenersConfig + r := kcs.ReadOnlyConfig config := properties.NewProperties() @@ -292,9 +298,12 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map if err := config.Set(kafkautils.KafkaConfigListenerSecurityProtocolMap, securityProtocolMapConfig); err != nil { log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap)) } - if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { - log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName)) + if !strings.Contains(r, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") { + if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName)) + } } + if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil { log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners)) } diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index cd3405b82..f10cc116e 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -605,6 +605,26 @@ metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlM super.users=User:CN=kafka-headless.kafka.svc.cluster.local zookeeper.connect=example.zk:2181/`, }, + { + testName: "security_inter_broker_protocol_Set", + readOnlyConfig: `security.inter.broker.protocol=SASL_SSL`, + zkAddresses: []string{"example.zk:2181"}, + zkPath: ``, + kubernetesClusterDomain: ``, + clusterWideConfig: ``, + perBrokerConfig: ``, + perBrokerReadOnlyConfig: ``, + advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`, + listenerType: "plaintext", + expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +broker.id=0 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +listener.security.protocol.map=INTERNAL:PLAINTEXT +listeners=INTERNAL://:9092 +zookeeper.connect=example.zk:2181/ +security.inter.broker.protocol=SASL_SSL`, + }, } t.Parallel() diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 03f2ea011..a77c0dfff 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1513,20 +1513,26 @@ func getServiceFromExternalListener(client client.Client, cluster *v1beta1.Kafka case istioingressutils.IngressControllerName: if ingressConfigName == util.IngressConfigGlobalName { iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplate, eListenerName, cluster.GetName()) + iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-") } else { iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplateWithScope, eListenerName, ingressConfigName, cluster.GetName()) + iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-") } case envoyutils.IngressControllerName: if ingressConfigName == util.IngressConfigGlobalName { iControllerServiceName = fmt.Sprintf(envoyutils.EnvoyServiceName, eListenerName, cluster.GetName()) + iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-") } else { iControllerServiceName = fmt.Sprintf(envoyutils.EnvoyServiceNameWithScope, eListenerName, ingressConfigName, cluster.GetName()) + iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-") } case contourutils.IngressControllerName: if ingressConfigName == util.IngressConfigGlobalName { iControllerServiceName = fmt.Sprintf(contourutils.ContourServiceName, eListenerName, cluster.GetName()) + iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-") } else { iControllerServiceName = fmt.Sprintf(contourutils.ContourServiceNameWithScope, eListenerName, ingressConfigName, cluster.GetName()) + iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-") } } @@ -1602,7 +1608,7 @@ func generateServicePortForIListeners(listeners []v1beta1.InternalListenerConfig var usedPorts []corev1.ServicePort for _, iListener := range listeners { usedPorts = append(usedPorts, corev1.ServicePort{ - Name: strings.ReplaceAll(iListener.GetListenerServiceName(), "_", ""), + Name: strings.ReplaceAll(iListener.GetListenerServiceName(), "_", "-"), Port: iListener.ContainerPort, TargetPort: intstr.FromInt(int(iListener.ContainerPort)), Protocol: corev1.ProtocolTCP, @@ -1615,7 +1621,7 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig var usedPorts []corev1.ServicePort for _, eListener := range listeners { usedPorts = append(usedPorts, corev1.ServicePort{ - Name: eListener.GetListenerServiceName(), + Name: strings.ReplaceAll(eListener.GetListenerServiceName(), "_", "-"), Protocol: corev1.ProtocolTCP, Port: eListener.ContainerPort, TargetPort: intstr.FromInt(int(eListener.ContainerPort)), diff --git a/pkg/util/client/common.go b/pkg/util/client/common.go index 360091f83..76057bff8 100644 --- a/pkg/util/client/common.go +++ b/pkg/util/client/common.go @@ -37,11 +37,18 @@ func UseSSL(cluster *v1beta1.KafkaCluster) bool { func getContainerPortForInnerCom(internalListeners []v1beta1.InternalListenerConfig, extListeners []v1beta1.ExternalListenerConfig) int32 { for _, val := range internalListeners { + if val.UsedForKafkaAdminCommunication { // Optional override to return a port from a different listener. Needed if b2b communication is on an external listener and and you want the koperator to interact with kafka over a different port. + return val.ContainerPort + } if val.UsedForInnerBrokerCommunication { return val.ContainerPort } } + for _, val := range extListeners { + if val.UsedForKafkaAdminCommunication { + return val.ContainerPort + } if val.UsedForInnerBrokerCommunication { return val.ContainerPort } diff --git a/pkg/util/kafka/common.go b/pkg/util/kafka/common.go index 5656d1c57..c9f58362a 100644 --- a/pkg/util/kafka/common.go +++ b/pkg/util/kafka/common.go @@ -192,8 +192,11 @@ func GetBootstrapServersService(cluster *v1beta1.KafkaCluster) (string, error) { // GetBrokerContainerPort return broker container port func GetBrokerContainerPort(cluster *v1beta1.KafkaCluster) (int32, error) { containerPort := int32(0) - for _, lc := range cluster.Spec.ListenersConfig.InternalListeners { + if lc.UsedForKafkaAdminCommunication { // Optional override to return a port from a different listener. Needed if b2b communication is on an external listener and and you want the koperator to interact with kafka over a different port. + containerPort = lc.ContainerPort + break + } if lc.UsedForInnerBrokerCommunication && !lc.UsedForControllerCommunication { containerPort = lc.ContainerPort break @@ -201,6 +204,10 @@ func GetBrokerContainerPort(cluster *v1beta1.KafkaCluster) (int32, error) { } for _, lc := range cluster.Spec.ListenersConfig.ExternalListeners { + if lc.UsedForKafkaAdminCommunication { + containerPort = lc.ContainerPort + break + } if lc.UsedForInnerBrokerCommunication { containerPort = lc.ContainerPort break diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go index e286db10e..ffabfaace 100644 --- a/pkg/util/kafka/const.go +++ b/pkg/util/kafka/const.go @@ -30,6 +30,7 @@ const ( KafkaConfigListenerName = "listener.name" KafkaConfigListenerSecurityProtocolMap = "listener.security.protocol.map" KafkaConfigInterBrokerListenerName = "inter.broker.listener.name" + KafkaConfigSecurityInterBrokerProtocol = "security.inter.broker.protocol" KafkaConfigAdvertisedListeners = "advertised.listeners" KafkaConfigControlPlaneListener = "control.plane.listener.name" diff --git a/pkg/util/util.go b/pkg/util/util.go index e2ee62501..48c36cfd5 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -236,11 +236,12 @@ func IsIngressConfigInUse(iConfigName, defaultConfigName string, cluster *v1beta // ConstructEListenerLabelName construct an eListener label name based on ingress config name and listener name func ConstructEListenerLabelName(ingressConfigName, eListenerName string) string { + externalListenerName := strings.ReplaceAll(eListenerName, "_", "-") if ingressConfigName == IngressConfigGlobalName { - return eListenerName + return externalListenerName } - return fmt.Sprintf(ExternalListenerLabelNameTemplate, eListenerName, ingressConfigName) + return fmt.Sprintf(ExternalListenerLabelNameTemplate, externalListenerName, ingressConfigName) } // ShouldIncludeBroker returns true if the broker should be included as a resource on external listener resources @@ -437,10 +438,11 @@ func ConvertConfigEntryListToProperties(config []sarama.ConfigEntry) (*propertie func GenerateEnvoyResourceName(resourceNameFormat string, resourceNameWithScopeFormat string, extListener v1beta1.ExternalListenerConfig, ingressConfig v1beta1.IngressConfig, ingressConfigName, clusterName string) string { var resourceName string + externalListenerName := strings.ReplaceAll(extListener.Name, "_", "-") if ingressConfigName == IngressConfigGlobalName { - resourceName = fmt.Sprintf(resourceNameFormat, extListener.Name, clusterName) + resourceName = fmt.Sprintf(resourceNameFormat, externalListenerName, clusterName) } else { - resourceName = fmt.Sprintf(resourceNameWithScopeFormat, extListener.Name, ingressConfigName, clusterName) + resourceName = fmt.Sprintf(resourceNameWithScopeFormat, externalListenerName, ingressConfigName, clusterName) } return resourceName diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 3fd64f9f4..7182f267f 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -691,6 +691,89 @@ cruise.control.metrics.reporter.kubernetes.mode=true`, } } +func TestConstructEListenerLabelName(t *testing.T) { + tests := []struct { + ingressConfigName string + eListenerName string + expected string + }{ + {"globalConfig", "example_listener_name", "example-listener-name"}, + {"globalConfig", "no_underscores", "no-underscores"}, + {"globalConfig", "multiple___underscores", "multiple---underscores"}, + {"globalConfig", "noUnderscoresHere", "noUnderscoresHere"}, + {"nonGlobalConfig", "example_listener_name", "example-listener-name-nonGlobalConfig"}, + } + + for _, test := range tests { + result := ConstructEListenerLabelName(test.ingressConfigName, test.eListenerName) + if result != test.expected { + t.Errorf("ConstructEListenerLabelName(%q, %q) = %q; want %q", test.ingressConfigName, test.eListenerName, result, test.expected) + } + } +} + +func TestGenerateEnvoyResourceName(t *testing.T) { + testCases := []struct { + resourceNameFormat string + resourceNameWithScopeFormat string + extListener v1beta1.ExternalListenerConfig + ingressConfig v1beta1.IngressConfig + ingressConfigName, clusterName, expected string + }{ + { + resourceNameFormat: "%s-%s", + resourceNameWithScopeFormat: "%s-%s-%s", + extListener: v1beta1.ExternalListenerConfig{ + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "noUnderscores"}, + }, + ingressConfig: v1beta1.IngressConfig{}, + ingressConfigName: "globalConfig", + clusterName: "clusterName", + expected: "noUnderscores-clusterName", + }, + { + resourceNameFormat: "%s-%s", + resourceNameWithScopeFormat: "%s-%s-%s", + extListener: v1beta1.ExternalListenerConfig{ + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "under_scores"}, + }, + ingressConfig: v1beta1.IngressConfig{}, + ingressConfigName: "globalConfig", + clusterName: "clusterName", + expected: "under-scores-clusterName", + }, + { + resourceNameFormat: "%s-%s", + resourceNameWithScopeFormat: "%s-%s-%s", + extListener: v1beta1.ExternalListenerConfig{ + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "noUnderscores"}, + }, + ingressConfig: v1beta1.IngressConfig{}, + ingressConfigName: "nonGlobalConfig", + clusterName: "clusterName", + expected: "noUnderscores-nonGlobalConfig-clusterName", + }, + { + resourceNameFormat: "%s-%s", + resourceNameWithScopeFormat: "%s-%s-%s", + extListener: v1beta1.ExternalListenerConfig{ + CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "under_scores"}, + }, + ingressConfig: v1beta1.IngressConfig{}, + ingressConfigName: "nonGlobalConfig", + clusterName: "clusterName", + expected: "under-scores-nonGlobalConfig-clusterName", + }, + } + for _, test := range testCases { + hash := GenerateEnvoyResourceName(test.resourceNameFormat, test.resourceNameWithScopeFormat, test.extListener, test.ingressConfig, + test.ingressConfigName, test.clusterName) + if hash != test.expected { + t.Errorf("Expected: %s Got: %s", test.expected, hash) + } + } +} + func TestGetMD5Hash(t *testing.T) { testCases := []struct { testName string