Skip to content

Commit

Permalink
Update listeners order - external first/default (#35)
Browse files Browse the repository at this point in the history
* Update listeners order - external first/default

* Update tests to reflect new listeners order
  • Loading branch information
alungu authored Sep 2, 2021
1 parent c93245f commit 4deff34
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1
Expect(err).NotTo(HaveOccurred())
advertisedListener, found := brokerConfig.Get("advertised.listeners")
Expect(found).To(BeTrue())
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("CONTROLLER://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29092,TEST://external.az1.host.com:%d",
randomGenTestNumber, broker.Id, randomGenTestNumber, randomGenTestNumber, broker.Id, randomGenTestNumber, 19090+broker.Id)))
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("TEST://external.az1.host.com:%d,CONTROLLER://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29092",
19090+broker.Id, randomGenTestNumber, broker.Id, randomGenTestNumber, randomGenTestNumber, broker.Id, randomGenTestNumber)))
listeners, found := brokerConfig.Get("listeners")
Expect(found).To(BeTrue())
Expect(listeners.Value()).To(Equal("INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"))
Expect(listeners.Value()).To(Equal("TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"))
listenerSecMap, found := brokerConfig.Get("listener.security.protocol.map")
Expect(found).To(BeTrue())
Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT"))
Expect(listenerSecMap.Value()).To(Equal("TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"))
// check service
service := corev1.Service{}
Eventually(func() error {
Expand Down Expand Up @@ -111,8 +111,8 @@ func expectBrokerConfigmapForAz1ExternalListener(kafkaCluster *v1beta1.KafkaClus
Expect(err).NotTo(HaveOccurred())
advertisedListener, found := brokerConfig.Get("advertised.listeners")
Expect(found).To(BeTrue())
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("CONTROLLER://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29092,TEST://external.az1.host.com:%d",
randomGenTestNumber, 0, randomGenTestNumber, randomGenTestNumber, 0, randomGenTestNumber, 19090)))
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("TEST://external.az1.host.com:%d,CONTROLLER://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29092",
19090, randomGenTestNumber, 0, randomGenTestNumber, randomGenTestNumber, 0, randomGenTestNumber)))
}

func expectBrokerConfigmapForAz2ExternalListener(kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) {
Expand All @@ -128,8 +128,8 @@ func expectBrokerConfigmapForAz2ExternalListener(kafkaCluster *v1beta1.KafkaClus
Expect(err).NotTo(HaveOccurred())
advertisedListener, found := brokerConfig.Get("advertised.listeners")
Expect(found).To(BeTrue())
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("CONTROLLER://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29092,TEST://external.az2.host.com:%d",
randomGenTestNumber, 1, randomGenTestNumber, randomGenTestNumber, 1, randomGenTestNumber, 19091)))
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("TEST://external.az2.host.com:%d,CONTROLLER://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29092",
19091, randomGenTestNumber, 1, randomGenTestNumber, randomGenTestNumber, 1, randomGenTestNumber)))

configMap = corev1.ConfigMap{}
Eventually(func() error {
Expand All @@ -143,6 +143,6 @@ func expectBrokerConfigmapForAz2ExternalListener(kafkaCluster *v1beta1.KafkaClus
Expect(err).NotTo(HaveOccurred())
advertisedListener, found = brokerConfig.Get("advertised.listeners")
Expect(found).To(BeTrue())
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("CONTROLLER://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29092,TEST://external.az2.host.com:%d",
randomGenTestNumber, 2, randomGenTestNumber, randomGenTestNumber, 2, randomGenTestNumber, 19092)))
Expect(advertisedListener.Value()).To(Equal(fmt.Sprintf("TEST://external.az2.host.com:%d,CONTROLLER://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafkaconfigtest-%d.svc.cluster.local:29092",
19092, randomGenTestNumber, 2, randomGenTestNumber, randomGenTestNumber, 2, randomGenTestNumber)))
}
8 changes: 4 additions & 4 deletions controllers/tests/kafkacluster_controller_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,18 @@ func expectKafkaBrokerConfigmap(kafkaCluster *v1beta1.KafkaCluster, broker v1bet
Expect(configMap.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(configMap.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))

Expect(configMap.Data).To(HaveKeyWithValue("broker-config", fmt.Sprintf(`advertised.listeners=CONTROLLER://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29092,TEST://test.host.com:%d
Expect(configMap.Data).To(HaveKeyWithValue("broker-config", fmt.Sprintf(`advertised.listeners=TEST://test.host.com:%d,CONTROLLER://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29092
broker.id=%d
control.plane.listener.name=CONTROLLER
cruise.control.metrics.reporter.bootstrap.servers=kafkacluster-1-all-broker.kafka-1.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
zookeeper.connect=/
`, randomGenTestNumber, broker.Id, randomGenTestNumber, randomGenTestNumber, broker.Id, randomGenTestNumber, 19090+broker.Id, broker.Id)))
`, 19090+broker.Id, randomGenTestNumber, broker.Id, randomGenTestNumber, randomGenTestNumber, broker.Id, randomGenTestNumber, broker.Id)))

// assert log4j?
}
Expand Down
36 changes: 19 additions & 17 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,14 @@ func (r *Reconciler) configMap(id int32, brokerConfig *v1beta1.BrokerConfig, ext

func generateAdvertisedListenerConfig(id int32, l v1beta1.ListenersConfig,
extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList) []string {
advertisedListenerConfig := make([]string, 0, len(l.ExternalListeners)+len(l.InternalListeners))
externalListenerConfig := make([]string, 0, len(l.ExternalListeners))
internalListenerConfig := make([]string, 0, len(l.InternalListeners))

advertisedListenerConfig = appendListenerConfigs(advertisedListenerConfig, id, extListenerStatuses)
advertisedListenerConfig = appendListenerConfigs(advertisedListenerConfig, id, intListenerStatuses)
advertisedListenerConfig = appendListenerConfigs(advertisedListenerConfig, id, controllerIntListenerStatuses)
externalListenerConfig = appendListenerConfigs(externalListenerConfig, id, extListenerStatuses)
internalListenerConfig = appendListenerConfigs(internalListenerConfig, id, intListenerStatuses)
internalListenerConfig = appendListenerConfigs(internalListenerConfig, id, controllerIntListenerStatuses)

return advertisedListenerConfig
return append(externalListenerConfig, internalListenerConfig...)
}

func appendListenerConfigs(advertisedListenerConfig []string, id int32,
Expand Down Expand Up @@ -217,31 +218,32 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, log logr.Logger)
var securityProtocolMapConfig []string
var listenerConfig []string

for _, iListener := range l.InternalListeners {
if iListener.UsedForInnerBrokerCommunication {
for _, eListener := range l.ExternalListeners {
if eListener.UsedForInnerBrokerCommunication {
if interBrokerListenerName == "" {
interBrokerListenerName = strings.ToUpper(iListener.Name)
interBrokerListenerName = strings.ToUpper(eListener.Name)
} else {
log.Error(errors.New("inter broker listener name already set"), "config error")
}
}
UpperedListenerType := iListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(iListener.Name)
UpperedListenerType := eListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, iListener.ContainerPort))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, eListener.ContainerPort))
}
for _, eListener := range l.ExternalListeners {
if eListener.UsedForInnerBrokerCommunication {

for _, iListener := range l.InternalListeners {
if iListener.UsedForInnerBrokerCommunication {
if interBrokerListenerName == "" {
interBrokerListenerName = strings.ToUpper(eListener.Name)
interBrokerListenerName = strings.ToUpper(iListener.Name)
} else {
log.Error(errors.New("inter broker listener name already set"), "config error")
}
}
UpperedListenerType := eListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(eListener.Name)
UpperedListenerType := iListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(iListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, eListener.ContainerPort))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, iListener.ContainerPort))
}

config := properties.NewProperties()
Expand Down

0 comments on commit 4deff34

Please sign in to comment.