diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index cdcf74995..382f1c8e0 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -19,6 +19,8 @@ import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; import ai.langstream.impl.parser.ModelBuilder; +import ai.langstream.tests.util.codestorage.LocalMinioCodeStorageProvider; +import ai.langstream.tests.util.codestorage.RemoteCodeStorageProvider; import ai.langstream.tests.util.k8s.LocalK3sContainer; import ai.langstream.tests.util.k8s.RunningHostCluster; import ai.langstream.tests.util.kafka.LocalRedPandaClusterProvider; @@ -90,6 +92,8 @@ public class BaseEndToEndTest implements TestWatcher { private static final String LANGSTREAM_K8S = System.getProperty("langstream.tests.k8s", "host"); private static final String LANGSTREAM_STREAMING = System.getProperty("langstream.tests.streaming", "local-redpanda"); + private static final String LANGSTREAM_CODESTORAGE = + System.getProperty("langstream.tests.codestorage", "local-minio"); public static final File TEST_LOGS_DIR = new File("target", "e2e-test-logs"); protected static final String TENANT_NAMESPACE_PREFIX = "ls-tenant-"; @@ -98,6 +102,8 @@ public class BaseEndToEndTest implements TestWatcher { protected static KubeCluster kubeCluster; protected static StreamingClusterProvider streamingClusterProvider; protected static File instanceFile; + protected static CodeStorageProvider codeStorageProvider; + protected static CodeStorageProvider.CodeStorageConfig codeStorageConfig; protected static KubernetesClient client; protected static String namespace; @@ -136,7 +142,7 @@ protected static void deleteManifest(String manifest, String namespace) { .delete(); } - protected static void applyManifestNoNamespace(String manifest) { + public static void applyManifestNoNamespace(String manifest) { client.load(new ByteArrayInputStream(manifest.getBytes(StandardCharsets.UTF_8))) .serverSideApply(); } @@ -347,6 +353,10 @@ public static void destroy() { streamingClusterProvider.stop(); streamingClusterProvider = null; } + if (codeStorageProvider != null) { + codeStorageProvider.stop(); + codeStorageProvider = null; + } if (client != null) { client.close(); client = null; @@ -377,6 +387,10 @@ public static void setup() { streamingClusterProvider = getStreamingClusterProvider(); } + if (codeStorageProvider == null) { + codeStorageProvider = getCodeStorageProvider(); + } + try { final Path tempFile = Files.createTempFile("ls-test-kube", ".yaml"); @@ -388,8 +402,8 @@ public static void setup() { final CompletableFuture streamingClusterFuture = CompletableFuture.supplyAsync(() -> streamingClusterProvider.start()); - final CompletableFuture minioFuture = - CompletableFuture.runAsync(BaseEndToEndTest::installMinio); + final CompletableFuture minioFuture = + CompletableFuture.supplyAsync(() -> codeStorageProvider.start()); List> imagesFutures = new ArrayList<>(); imagesFutures.add( @@ -414,7 +428,6 @@ public static void setup() { "langstream/langstream-api-gateway:latest-dev"))); CompletableFuture.allOf( - minioFuture, imagesFutures.get(0), imagesFutures.get(1), imagesFutures.get(2), @@ -435,6 +448,8 @@ public static void setup() { instanceFile = Files.createTempFile("ls-test", ".yaml").toFile(); YAML_MAPPER.writeValue(instanceFile, instanceContent); + codeStorageConfig = minioFuture.join(); + } catch (Throwable ee) { dumpTest("BeforeAll"); throw ee; @@ -453,6 +468,18 @@ private static StreamingClusterProvider getStreamingClusterProvider() { } } + private static CodeStorageProvider getCodeStorageProvider() { + switch (LANGSTREAM_CODESTORAGE) { + case "local-minio": + return new LocalMinioCodeStorageProvider(); + case "remote": + return new RemoteCodeStorageProvider(); + default: + throw new IllegalArgumentException( + "Unknown LANGSTREAM_CODESTORAGE: " + LANGSTREAM_CODESTORAGE); + } + } + private static KubeCluster getKubeCluster() { switch (LANGSTREAM_K8S) { case "k3s": @@ -618,11 +645,8 @@ private static void installLangStream(boolean authentication) { create: false namespacePrefix: %s codeStorage: - type: s3 - configuration: - endpoint: http://minio.minio-dev.svc.cluster.local:9000 - access-key: minioadmin - secret-key: minioadmin + type: %s + configuration: %s """ .formatted( LANGSTREAM_TAG, @@ -636,7 +660,11 @@ private static void installLangStream(boolean authentication) { imagePullPolicy, baseImageRepository, imagePullPolicy, - TENANT_NAMESPACE_PREFIX); + TENANT_NAMESPACE_PREFIX, + codeStorageConfig.type(), + JSON_MAPPER.writeValueAsString(codeStorageConfig.configuration())); + + log.info("Applying values: {}", values); final Path tempFile = Files.createTempFile("langstream-test", ".yaml"); Files.writeString(tempFile, values); @@ -683,81 +711,6 @@ private static void awaitApiGatewayReady() { log.info("api gateway ready"); } - static void installMinio() { - applyManifestNoNamespace( - """ - # Deploys a new Namespace for the MinIO Pod - apiVersion: v1 - kind: Namespace - metadata: - name: minio-dev # Change this value if you want a different namespace name - labels: - name: minio-dev # Change this value to match metadata.name - --- - # Deploys a new MinIO Pod into the metadata.namespace Kubernetes namespace - # - # The `spec.containers[0].args` contains the command run on the pod - # The `/data` directory corresponds to the `spec.containers[0].volumeMounts[0].mountPath` - # That mount path corresponds to a Kubernetes HostPath which binds `/data` to a local drive or volume on the worker node where the pod runs - #\s - apiVersion: v1 - kind: Pod - metadata: - labels: - app: minio - name: minio - namespace: minio-dev # Change this value to match the namespace metadata.name - spec: - containers: - - name: minio - image: quay.io/minio/minio:latest - command: - - /bin/bash - - -c - args:\s - - minio server /data --console-address :9090 - volumeMounts: - - mountPath: /data - name: localvolume # Corresponds to the `spec.volumes` Persistent Volume - ports: - - containerPort: 9090 - protocol: TCP - name: console - - containerPort: 9000 - protocol: TCP - name: s3 - resources: - requests: - cpu: 50m - memory: 512Mi - volumes: - - name: localvolume - hostPath: # MinIO generally recommends using locally-attached volumes - path: /mnt/disk1/data # Specify a path to a local drive or volume on the Kubernetes worker node - type: DirectoryOrCreate # The path to the last directory must exist - --- - apiVersion: v1 - kind: Service - metadata: - labels: - app: minio - name: minio - namespace: minio-dev # Change this value to match the namespace metadata.name - spec: - ports: - - port: 9090 - protocol: TCP - targetPort: 9090 - name: console - - port: 9000 - protocol: TCP - targetPort: 9000 - name: s3 - selector: - app: minio - """); - } - protected static void withPodLogs( String podName, String namespace, diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/CodeStorageProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/CodeStorageProvider.java new file mode 100644 index 000000000..ddf53f1a1 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/CodeStorageProvider.java @@ -0,0 +1,29 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util; + +import java.util.Map; + +public interface CodeStorageProvider { + + record CodeStorageConfig(String type, Map configuration) {} + + CodeStorageConfig start(); + + void cleanup(); + + void stop(); +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/codestorage/LocalMinioCodeStorageProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/codestorage/LocalMinioCodeStorageProvider.java new file mode 100644 index 000000000..ddc039ac8 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/codestorage/LocalMinioCodeStorageProvider.java @@ -0,0 +1,112 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util.codestorage; + +import ai.langstream.tests.util.BaseEndToEndTest; +import ai.langstream.tests.util.CodeStorageProvider; +import java.util.Map; + +public class LocalMinioCodeStorageProvider implements CodeStorageProvider { + + @Override + public CodeStorageConfig start() { + BaseEndToEndTest.applyManifestNoNamespace( + """ + # Deploys a new Namespace for the MinIO Pod + apiVersion: v1 + kind: Namespace + metadata: + name: minio-dev # Change this value if you want a different namespace name + labels: + name: minio-dev # Change this value to match metadata.name + --- + # Deploys a new MinIO Pod into the metadata.namespace Kubernetes namespace + # + # The `spec.containers[0].args` contains the command run on the pod + # The `/data` directory corresponds to the `spec.containers[0].volumeMounts[0].mountPath` + # That mount path corresponds to a Kubernetes HostPath which binds `/data` to a local drive or volume on the worker node where the pod runs + #\s + apiVersion: v1 + kind: Pod + metadata: + labels: + app: minio + name: minio + namespace: minio-dev # Change this value to match the namespace metadata.name + spec: + containers: + - name: minio + image: quay.io/minio/minio:latest + command: + - /bin/bash + - -c + args:\s + - minio server /data --console-address :9090 + volumeMounts: + - mountPath: /data + name: localvolume # Corresponds to the `spec.volumes` Persistent Volume + ports: + - containerPort: 9090 + protocol: TCP + name: console + - containerPort: 9000 + protocol: TCP + name: s3 + resources: + requests: + cpu: 50m + memory: 512Mi + volumes: + - name: localvolume + hostPath: # MinIO generally recommends using locally-attached volumes + path: /mnt/disk1/data # Specify a path to a local drive or volume on the Kubernetes worker node + type: DirectoryOrCreate # The path to the last directory must exist + --- + apiVersion: v1 + kind: Service + metadata: + labels: + app: minio + name: minio + namespace: minio-dev # Change this value to match the namespace metadata.name + spec: + ports: + - port: 9090 + protocol: TCP + targetPort: 9090 + name: console + - port: 9000 + protocol: TCP + targetPort: 9000 + name: s3 + selector: + app: minio + """); + + return new CodeStorageConfig( + "s3", + Map.of( + "endpoint", "http://minio.minio-dev.svc.cluster.local:9000", + "access-key", "minioadmin", + "secret-key", "minioadmin")); + } + + @Override + public void cleanup() {} + + @Override + public void stop() {} +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/codestorage/RemoteCodeStorageProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/codestorage/RemoteCodeStorageProvider.java new file mode 100644 index 000000000..abf046607 --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/codestorage/RemoteCodeStorageProvider.java @@ -0,0 +1,77 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests.util.codestorage; + +import ai.langstream.tests.util.CodeStorageProvider; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RemoteCodeStorageProvider implements CodeStorageProvider { + + private static final String SYS_PROPERTIES_PREFIX = "langstream.tests.codestorageremote.props."; + private static final String ENV_PREFIX = "LANGSTREAM_TESTS_CODESTORAGEREMOTE_PROPS_"; + private static final String TYPE; + private static final Map CONFIG; + + static { + CONFIG = new HashMap<>(); + + final Map envs = System.getenv(); + for (Map.Entry env : envs.entrySet()) { + if (env.getKey().startsWith(ENV_PREFIX)) { + final String key = + env.getKey().substring(ENV_PREFIX.length()).toLowerCase().replace("_", "."); + final String value = env.getValue(); + log.info("Loading remote codestorage config from env: {}={}", key, value); + CONFIG.put(key, value); + } + } + + final Set props = System.getProperties().keySet(); + for (Object prop : props) { + String asString = prop.toString(); + if (asString.startsWith(SYS_PROPERTIES_PREFIX)) { + final String key = asString.substring(SYS_PROPERTIES_PREFIX.length()); + final String value = System.getProperty(asString); + log.info("Loading remote codestorage config from sys: {}={}", key, value); + CONFIG.put(key, value); + } + } + String type = System.getProperty("langstream.tests.codestorageremote.type"); + if (type == null) { + type = System.getenv("LANGSTREAM_TESTS_CODESTORAGEREMOTE_TYPE"); + } + TYPE = type; + } + + @Override + public CodeStorageConfig start() { + if (TYPE == null) { + throw new IllegalArgumentException( + "langstream.tests.codestorageremote.type must be set"); + } + return new CodeStorageConfig(TYPE, CONFIG); + } + + @Override + public void cleanup() {} + + @Override + public void stop() {} +} diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java index fd4c6b05e..9beedb805 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/kafka/LocalRedPandaClusterProvider.java @@ -81,7 +81,7 @@ private void internalStart() throws InterruptedException, IOException { // ref https://github.com/redpanda-data/helm-charts/blob/main/charts/redpanda/values.yaml log.info("running helm command to install redpanda"); BaseEndToEndTest.runProcess( - ("helm --debug upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3" + ("helm upgrade --install redpanda redpanda/redpanda --namespace kafka-ns --set resources.cpu.cores=0.3" + " --set resources.memory.container.max=1512Mi --set statefulset.replicas=1 --set console" + ".enabled=false --set tls.enabled=false --set external.domain=redpanda-external.kafka-ns.svc" + ".cluster.local --set statefulset.initContainers.setDataDirOwnership.enabled=true --set tuning.tune_aio_events=false --wait --timeout=5m")