Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved bridge configuration setup within the operator #11032

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .azure/templates/steps/system_test_general.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ jobs:
env:
DOCKER_TAG: $(docker_tag)
BRIDGE_IMAGE: "latest-released"
BRIDGE_IMAGE: "quay.io/ppatierno/kafka-bridge:bridge-config"
STRIMZI_RBAC_SCOPE: '${{ parameters.strimzi_rbac_scope }}'
DOCKER_REGISTRY: registry.minikube
CLUSTER_OPERATOR_INSTALL_TYPE: '${{ parameters.cluster_operator_install_type }}'
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Support for MirrorMaker 1 has been removed
* Added support to configure `dnsPolicy` and `dnsConfig` using the `template` sections.
* Store Kafka node certificates in separate Secrets, one Secret per pod.
* Moved HTTP bridge configuration to the ConfigMap setup by the operator.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a note in "Major changes, deprecations and removals" section to say that this release is only compatible with Bridge 0.32+? The alternative would be to support both the old and new configuration method where the new one has precedence. I think either way is fine.


### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,18 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
// Kafka Bridge configuration keys (EnvVariables)
protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS = "KAFKA_BRIDGE_BOOTSTRAP_SERVERS";
protected static final String ENV_VAR_KAFKA_BRIDGE_TLS = "KAFKA_BRIDGE_TLS";
protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
protected static final String ENV_VAR_STRIMZI_TRACING = "STRIMZI_TRACING";

protected static final String ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG = "KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG = "KAFKA_BRIDGE_PRODUCER_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG = "KAFKA_BRIDGE_CONSUMER_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_ID = "KAFKA_BRIDGE_ID";

protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_HOST = "KAFKA_BRIDGE_HTTP_HOST";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PORT = "KAFKA_BRIDGE_HTTP_PORT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT = "KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED = "KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED = "KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED = "KAFKA_BRIDGE_CORS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS = "KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS = "KAFKA_BRIDGE_CORS_ALLOWED_METHODS";

protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS";
protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init";

/**
* Key under which the bridge configuration is stored in ConfigMap
*/
public static final String BRIDGE_CONFIGURATION_FILENAME = "application.properties";

private int replicas;
private ClientTls tls;
private KafkaClientAuthentication authentication;
Expand Down Expand Up @@ -411,45 +400,6 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS, bootstrapServers));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG, kafkaBridgeAdminClient == null ? "" : new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ID, cluster));

if (kafkaBridgeConsumer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, String.valueOf(kafkaBridgeConsumer.isEnabled())));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(kafkaBridgeConsumer.getTimeoutSeconds())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, "true"));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT)));
}

if (kafkaBridgeProducer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, String.valueOf(kafkaBridgeProducer.isEnabled())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, "true"));
}

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_HOST, KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PORT, String.valueOf(http != null ? http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)));

if (http != null && http.getCors() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "true"));

if (http.getCors().getAllowedOrigins() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS, String.join(",", http.getCors().getAllowedOrigins())));
}

if (http.getCors().getAllowedMethods() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS, String.join(",", http.getCors().getAllowedMethods())));
}
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "false"));
}

if (tls != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TLS, "true"));

Expand All @@ -460,10 +410,6 @@ protected List<EnvVar> getEnvVars() {

AuthenticationUtils.configureClientAuthenticationEnvVars(authentication, varList, name -> ENV_VAR_PREFIX + name);

if (tracing != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_TRACING, tracing.getType()));
}

// Add shared environment variables used for all containers
varList.addAll(sharedEnvironmentProvider.variables());

Expand Down Expand Up @@ -600,21 +546,38 @@ protected List<EnvVar> getInitContainerEnvVars() {
}

/**
* Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging
* or metrics, they will nto be set.
* Generates a ConfigMap containing the bridge configuration related to HTTP and Kafka clients.
* It also generates the metrics and logging configuration. If this operand doesn't support logging
* or metrics, they will not be set.
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
*
* @return The generated ConfigMap
*/
public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) {
public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
// generate the ConfigMap data entries for the metrics and logging configuration
Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging);
// add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration
data.put(
BRIDGE_CONFIGURATION_FILENAME,
new KafkaBridgeConfigurationBuilder(cluster, bootstrapServers)
.withTracing(tracing)
.withTls(tls)
.withAuthentication(authentication)
.withKafkaAdminClient(kafkaBridgeAdminClient)
.withKafkaProducer(kafkaBridgeProducer)
.withKafkaConsumer(kafkaBridgeConsumer)
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer)
.build()
);

return ConfigMapUtils
.createConfigMap(
KafkaBridgeResources.metricsAndLogConfigMapName(cluster),
namespace,
labels,
ownerReference,
ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)
data
);
}

Expand Down
Loading
Loading