Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operator): KafkaSQL with TLS support #5444

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,36 @@

import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static io.apicurio.registry.operator.resource.ResourceFactory.APP_CONTAINER_NAME;
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
import static io.apicurio.registry.operator.utils.Utils.isBlank;

public class KafkaSql {
public class KafkaSQL {

private static final Logger log = LoggerFactory.getLogger(KafkaSQL.class);

public static String ENV_STORAGE_KIND = "APICURIO_STORAGE_KIND";
public static String ENV_KAFKASQL_BOOTSTRAP_SERVERS = "APICURIO_KAFKASQL_BOOTSTRAP_SERVERS";

public static boolean configureKafkaSQL(ApicurioRegistry3 primary, Map<String, EnvVar> env) {
public static boolean configureKafkaSQL(ApicurioRegistry3 primary, Deployment deployment,
Map<String, EnvVar> env) {
if (primary.getSpec().getApp().getKafkasql() != null
&& !isBlank(primary.getSpec().getApp().getKafkasql().getBootstrapServers())) {
addEnvVar(env, new EnvVarBuilder().withName(ENV_STORAGE_KIND).withValue("kafkasql").build());
addEnvVar(env, new EnvVarBuilder().withName(ENV_KAFKASQL_BOOTSTRAP_SERVERS)
.withValue(primary.getSpec().getApp().getKafkasql().getBootstrapServers()).build());

addEnvVar(env, ENV_STORAGE_KIND, "kafkasql");
addEnvVar(env, ENV_KAFKASQL_BOOTSTRAP_SERVERS,
primary.getSpec().getApp().getKafkasql().getBootstrapServers());

if (KafkaSQLTLS.configureKafkaSQLTLS(primary, deployment, APP_CONTAINER_NAME, env)) {
log.info("KafkaSQL storage with TLS security configured.");
}

return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package io.apicurio.registry.operator.feat;

import io.apicurio.registry.operator.OperatorException;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3SpecKafkaSqlSecurity;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3SpecKafkaSqlTLS;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;

import java.util.Map;

import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.getContainerFromDeployment;
import static io.apicurio.registry.operator.utils.Utils.isBlank;

public class KafkaSQLTLS {

public static final String ENV_KAFKASQL_SECURITY_PROTOCOL = "APICURIO_KAFKA_COMMON_SECURITY_PROTOCOL";

public static final String ENV_KAFKASQL_SSL_KEYSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_TYPE";
public static final String ENV_KAFKASQL_SSL_KEYSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_LOCATION";
public static final String ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_PASSWORD";

public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_TYPE";
public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_LOCATION";
public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_PASSWORD";

public static final String KEYSTORE_SECRET_VOLUME_NAME = "registry-kafkasql-tls-keystore";
public static final String TRUSTSTORE_SECRET_VOLUME_NAME = "registry-kafkasql-tls-truststore";

/**
* Plain KafkaSQL must be already configured.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment shows that we are not choosing the right level of encapsulation here.
Please, remove the new KafkaSQLTLS class and squash the logic in KafkaSQL.

*/
public static boolean configureKafkaSQLTLS(ApicurioRegistry3 primary, Deployment deployment,
String containerName, Map<String, EnvVar> env) {

if (primary.getSpec().getApp().getKafkasql() == null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after squashing the logic this check goes away

|| isBlank(primary.getSpec().getApp().getKafkasql().getBootstrapServers())) {
throw new OperatorException("Plain KafkaSQL must be already configured.");
}

if (primary.getSpec().getApp().getKafkasql().getSecurity() == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following 3 if statements can be collapsed into one, something similar to:

var kafkasql = primary.getSpec().getApp().getKafkasql();
if (kafkasql.getSecurity() != null && kafkasql.getSecurity().getTls() != null && 
    !isBlank(kafkasql.getSecurity().getTls().getKeystoreSecretName()) && 
    !isBlank(kafkasql.getSecurity().getTls().getTruststoreSecretName())) {

assigning empty placeholder for null values to go to the next check is confusing.

primary.getSpec().getApp().getKafkasql().setSecurity(new ApicurioRegistry3SpecKafkaSqlSecurity());
}

if (primary.getSpec().getApp().getKafkasql().getSecurity().getTls() == null) {
primary.getSpec().getApp().getKafkasql().getSecurity()
.setTls(new ApicurioRegistry3SpecKafkaSqlTLS());
}

if (!isBlank(primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getKeystoreSecretName())
&& !isBlank(primary.getSpec().getApp().getKafkasql().getSecurity().getTls()
.getTruststoreSecretName())) {

addEnvVar(env, ENV_KAFKASQL_SECURITY_PROTOCOL, "SSL");

// ===== Keystore

addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12");
addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_LOCATION,
"/etc/" + KEYSTORE_SECRET_VOLUME_NAME + "/user.p12");
// spotless:off
// @formatter:off
addEnvVar(env, new EnvVarBuilder()
.withName(ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD)
.withNewValueFrom()
.withNewSecretKeyRef()
.withName(primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getKeystoreSecretName())
.withKey("user.password")
.endSecretKeyRef()
.endValueFrom()
.build()
);
// @formatter:on
// spotless:on

addSecretVolume(deployment,
primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getKeystoreSecretName(),
KEYSTORE_SECRET_VOLUME_NAME);
addSecretVolumeMount(deployment, containerName, KEYSTORE_SECRET_VOLUME_NAME,
"etc/" + KEYSTORE_SECRET_VOLUME_NAME);

// ===== Truststore

addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12");
addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION,
"/etc/" + TRUSTSTORE_SECRET_VOLUME_NAME + "/ca.p12");
// spotless:off
// @formatter:off
addEnvVar(env, new EnvVarBuilder()
.withName(ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD)
.withNewValueFrom()
.withNewSecretKeyRef()
.withName(primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getTruststoreSecretName())
.withKey("ca.password")
.endSecretKeyRef()
.endValueFrom()
.build()
);
// @formatter:on
// spotless:on

addSecretVolume(deployment,
primary.getSpec().getApp().getKafkasql().getSecurity().getTls().getTruststoreSecretName(),
TRUSTSTORE_SECRET_VOLUME_NAME);
addSecretVolumeMount(deployment, containerName, TRUSTSTORE_SECRET_VOLUME_NAME,
"etc/" + TRUSTSTORE_SECRET_VOLUME_NAME);

return true;
}
return false;
}

/**
 * Adds a volume backed by the given Secret to the Deployment's pod template spec.
 *
 * @param deployment Deployment whose pod spec receives the volume
 * @param secretName name of the Secret backing the volume
 * @param volumeName name the volume is registered under (referenced by mounts)
 */
public static void addSecretVolume(Deployment deployment, String secretName, String volumeName) {
    var secretVolume = new VolumeBuilder()
            .withName(volumeName)
            .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
            .build();
    deployment.getSpec().getTemplate().getSpec().getVolumes().add(secretVolume);
}

/**
 * Mounts a previously registered volume (read-only) into the named container of the Deployment.
 *
 * @param deployment    Deployment containing the target container
 * @param containerName name of the container to mount into
 * @param volumeName    name of the volume to mount (must match a registered volume)
 * @param mountPath     path inside the container where the volume is mounted
 */
public static void addSecretVolumeMount(Deployment deployment, String containerName, String volumeName,
        String mountPath) {
    var mount = new VolumeMountBuilder()
            .withName(volumeName)
            .withReadOnly(true)
            .withMountPath(mountPath)
            .build();
    getContainerFromDeployment(deployment, containerName).getVolumeMounts().add(mount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.apicurio.registry.operator.OperatorException;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.apicurio.registry.operator.feat.KafkaSql;
import io.apicurio.registry.operator.feat.KafkaSQL;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason for this rename?
if we rename KafkaSql to KafkaSQL, it would be best to rename also PostgresSql for consistency.
Not mandatory, but I encourage you to keep this change in separate PR.

import io.apicurio.registry.operator.feat.PostgresSql;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
Expand Down Expand Up @@ -62,7 +62,7 @@ protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry
// spotless:on

if (!PostgresSql.configureDatasource(primary, envVars)) {
KafkaSql.configureKafkaSQL(primary, envVars);
KafkaSQL.configureKafkaSQL(primary, d, envVars);
}

var container = getContainerFromDeployment(d, APP_CONTAINER_NAME);
Expand All @@ -78,6 +78,10 @@ public static void addEnvVar(Map<String, EnvVar> map, EnvVar envVar) {
}
}

/**
 * Convenience overload: builds a simple name/value {@link EnvVar} and delegates to
 * {@link #addEnvVar(Map, EnvVar)}. NOTE(review): the base overload's body is not fully
 * visible here — presumably it adds only if the name is absent; confirm before relying
 * on override semantics.
 */
public static void addEnvVar(Map<String, EnvVar> map, String name, String value) {
addEnvVar(map, new EnvVarBuilder().withName(name).withValue(value).build());
}

/**
* Get container with a given name from the given Deployment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@ public static void beforeAll() throws Exception {

@Test
void testKafkaSQLPlain() {
client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/plain/ephemeral.kafka.yaml"))
.create();
final var clusterNAme = "my-cluster";
client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml"))
.createOrReplace();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test should start on an empty namespace, this change will mask issues.
If you happen to be in the situation where this is needed, we should probably first merge #5364 and debug it.

final var clusterName = "example-cluster";

await().ignoreExceptions().untilAsserted(() ->
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterNAme + "-kafka-0").get().getStatus()
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));

// We're guessing the value here to avoid using Strimzi Java model, and relying on retries below.
var bootstrapServers = clusterNAme + "-kafka-bootstrap." + namespace + ".svc:9092";
var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9092";

var registry = deserialize("k8s/examples/kafkasql/plain/kafka-plain.apicurioregistry3.yaml",
var registry = deserialize("k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml",
ApicurioRegistry3.class);
registry.getMetadata().setNamespace(namespace);
registry.getSpec().getApp().getKafkasql().setBootstrapServers(bootstrapServers);
Expand All @@ -63,12 +63,11 @@ void testKafkaSQLPlain() {
.findFirst().get();
assertThat(client.pods().inNamespace(namespace).withName(podName).getLog())
.contains("Using Kafka-SQL artifactStore");

return true;
});
}

private static void applyStrimziResources() throws IOException {
static void applyStrimziResources() throws IOException {
try (BufferedInputStream in = new BufferedInputStream(
new URL("https://strimzi.io/install/latest").openStream())) {
List<HasMetadata> resources = Serialization.unmarshal(in);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.apicurio.registry.operator.it;

import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.fabric8.kubernetes.api.model.PodCondition;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.apicurio.registry.operator.it.KafkaSQLITTest.applyStrimziResources;
import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@QuarkusTest
public class KafkaSQLTLSITTest extends ITBase {

private static final Logger log = LoggerFactory.getLogger(KafkaSQLTLSITTest.class);

@BeforeAll
public static void beforeAll() throws Exception {
applyStrimziResources();
}

@Test
void testKafkaSQLTLS() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep this test in the KafkaSQLITTest, no need to have separate classes with a single test inside.

client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/tls/example-cluster.kafka.yaml"))
.create();
final var clusterName = "example-cluster";

await().ignoreExceptions().untilAsserted(() ->
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));

client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml"))
.inNamespace(namespace).create();

final var userName = "apicurio";

await().untilAsserted(
() -> assertThat(client.secrets().inNamespace(namespace).withName(userName).get())
.isNotNull());

// We're guessing the value here to avoid using Strimzi Java model, and relying on retries below.
var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9093";

var registry = deserialize("k8s/examples/kafkasql/tls/kafkasql-tls.apicurioregistry3.yaml",
ApicurioRegistry3.class);
registry.getMetadata().setNamespace(namespace);
registry.getSpec().getApp().getKafkasql().setBootstrapServers(bootstrapServers);

client.resource(registry).create();

await().ignoreExceptions().until(() -> {
assertThat(client.apps().deployments().inNamespace(namespace)
.withName(registry.getMetadata().getName() + "-app-deployment").get().getStatus()
.getReadyReplicas().intValue()).isEqualTo(1);
var podName = client.pods().inNamespace(namespace).list().getItems().stream()
.map(pod -> pod.getMetadata().getName())
.filter(podN -> podN.startsWith(registry.getMetadata().getName() + "-app-deployment"))
.findFirst().get();
assertThat(client.pods().inNamespace(namespace).withName(podName).getLog())
.contains("Using Kafka-SQL artifactStore");
return true;
});
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: example-cluster
spec:
kafka:
version: 3.8.0
replicas: 1
listeners:
- name: plain
type: internal
port: 9092
tls: false
config:
inter.broker.protocol.version: "3.8"
offsets.topic.replication.factor: 1
storage:
type: ephemeral
zookeeper:
replicas: 1
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
Loading
Loading