From bfc9cccad17a224e14b4005fd424324924224516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 14 Nov 2023 13:30:12 +0100 Subject: [PATCH] fix --- .../applications/DeleteApplicationCmd.java | 10 +- .../commands/applications/AppsCmdTest.java | 16 +- .../ai/langstream/tests/AppLifecycleIT.java | 8 +- .../tests/util/BaseEndToEndTest.java | 5 +- .../k8s/controllers/apps/AppController.java | 21 ++- .../k8s/controllers/AppControllerIT.java | 173 ++++++++++++++---- .../k8s/controllers/OperatorExtension.java | 1 + .../application/ApplicationResourceTest.java | 3 +- 8 files changed, 182 insertions(+), 55 deletions(-) diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java index d3924ba6d..8c1e8235e 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/DeleteApplicationCmd.java @@ -37,15 +37,9 @@ public class DeleteApplicationCmd extends BaseApplicationCmd { public void run() { getClient().applications().delete(applicationId, force); if (force) { - log( - String.format( - "Application deletion request accepted (forced) for application %s", - applicationId)); + log(String.format("Application '%s' marked for deletion (forced)", applicationId)); } else { - log( - String.format( - "Application deletion request accepted for application %s", - applicationId)); + log(String.format("Application '%s' marked for deletion", applicationId)); } } } diff --git a/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java b/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java index 29c80e3c8..e5365457c 100644 --- a/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java +++ b/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java @@ -352,13 +352,25 @@ public void testGet() throws Exception { @Test public void testDelete() { wireMock.register( - WireMock.delete(String.format("/api/applications/%s/my-app", TENANT)) + WireMock.delete(String.format("/api/applications/%s/my-app?force=false", TENANT)) .willReturn(WireMock.ok())); CommandResult result = executeCommand("apps", "delete", "my-app"); Assertions.assertEquals(0, result.exitCode()); Assertions.assertEquals("", result.err()); - Assertions.assertEquals("Application my-app deleted", result.out()); + Assertions.assertEquals("Application 'my-app' marked for deletion", result.out()); + } + + @Test + public void testForceDelete() { + wireMock.register( + WireMock.delete(String.format("/api/applications/%s/my-app?force=true", TENANT)) + .willReturn(WireMock.ok())); + + CommandResult result = executeCommand("apps", "delete", "my-app", "-f"); + Assertions.assertEquals(0, result.exitCode()); + Assertions.assertEquals("", result.err()); + Assertions.assertEquals("Application 'my-app' marked for deletion (forced)", result.out()); } @Test diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java index 31db6d798..e3a6187f7 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java @@ -42,12 +42,16 @@ public void testDeleteBrokenApplication() throws Exception { "instance", Map.of( "streamingCluster", - Map.of("kafka", Map.of("bootstrapServers", "wrong:9092")), + Map.of( + "type", + "kafka", + "configuration", + Map.of("bootstrapServers", "wrong:9092")), "computeCluster", Map.of("type", "kubernetes"))); final File instanceFile = Files.createTempFile("ls-test", ".yaml").toFile(); - YAML_MAPPER.writeValue(BaseEndToEndTest.instanceFile, instanceContent); + YAML_MAPPER.writeValue(instanceFile, instanceContent); deployLocalApplication( tenant, false, applicationId, "python-processor", instanceFile, Map.of()); diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index 34e2ef302..103374058 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -1092,9 +1092,10 @@ protected static boolean isApplicationInStatus(String applicationId, String expe return true; } log.info( - "application {} is not in expected status {}, dumping status", + "application {} is not in expected status {} but is in {}, dumping:", applicationId, - expectedStatus); + expectedStatus, + status); executeCommandOnClient( "bin/langstream apps get %s -o yaml".formatted(applicationId).split(" ")); return false; diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java index bf7acb30a..eb2c50bf0 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/apps/AppController.java @@ -110,12 +110,12 @@ protected PatchResult patchResources( final HandleJobResult setupJobResult = handleJob(resource, appLastApplied, true, false); if (setupJobResult.proceed()) { log.infof( - "setup job for %s is completed, checking deployer", + "[deploy] setup job for %s is completed, checking deployer", resource.getMetadata().getName()); final HandleJobResult deployerJobResult = handleJob(resource, appLastApplied, false, false); log.infof( - "setup job for %s is %s", + "[deploy] setup job for %s is %s", resource.getMetadata().getName(), deployerJobResult.proceed() ? "completed" : "not completed"); @@ -123,7 +123,8 @@ protected PatchResult patchResources( } else { log.infof( - "setup job for %s is not completed yet", resource.getMetadata().getName()); + "[deploy] setup job for %s is not completed yet", + resource.getMetadata().getName()); rescheduleDuration = setupJobResult.reschedule(); } } @@ -146,6 +147,9 @@ protected DeleteControl cleanupResources( } if (rescheduleDuration == null) { + log.infof( + "cleanup complete for app %s is completed, removing from limiter", + resource.getMetadata().getName()); appResourcesLimiter.onAppBeingDeleted(resource); return DeleteControl.defaultDelete(); } else { @@ -160,12 +164,12 @@ private Duration cleanupApplication( Duration rescheduleDuration; if (deployerJobResult.proceed()) { log.infof( - "deployer cleanup job for %s is completed, checking setup cleanup", + "[cleanup] deployer cleanup job for %s is completed, checking setup cleanup", resource.getMetadata().getName()); final HandleJobResult setupJobResult = handleJob(resource, appLastApplied, true, true); log.infof( - "setup cleanup job for %s is %s", + "[cleanup] setup cleanup job for %s is %s", resource.getMetadata().getName(), setupJobResult.proceed() ? "completed" : "not completed"); if (setupJobResult.proceed()) { @@ -179,7 +183,7 @@ private Duration cleanupApplication( } } else { log.infof( - "deployer cleanup job for %s is not completed yet", + "[cleanup] deployer cleanup job for %s is not completed yet", resource.getMetadata().getName()); rescheduleDuration = deployerJobResult.reschedule(); } @@ -309,17 +313,22 @@ private void createJob( setupJob ? AppResourcesFactory.generateSetupJob(params) : AppResourcesFactory.generateDeployerJob(params); + log.debugf( + "Applying job %s in namespace %s", + job.getMetadata().getName(), job.getMetadata().getNamespace()); KubeUtil.patchJob(client, job); } private static boolean areSpecChanged( ApplicationCustomResource cr, AppLastApplied appLastApplied, boolean checkSetup) { if (appLastApplied == null) { + log.infof("Spec changed for %s, no status found", cr.getMetadata().getName()); return true; } final String lastAppliedAsString = checkSetup ? appLastApplied.getSetup() : appLastApplied.getRuntimeDeployer(); if (lastAppliedAsString == null) { + log.infof("Spec changed for %s, no status found", cr.getMetadata().getName()); return true; } final JSONComparator.Result diff = diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java index 0a88c3631..b82f58b33 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java @@ -17,17 +17,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import ai.langstream.api.model.ApplicationLifecycleStatus; +import ai.langstream.deployer.k8s.CRDConstants; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationStatus; import ai.langstream.deployer.k8s.apps.AppResourcesFactory; +import ai.langstream.deployer.k8s.controllers.apps.AppController; import ai.langstream.deployer.k8s.util.SerializationUtil; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.api.model.ServiceAccountBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobSpec; @@ -35,6 +40,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -50,7 +56,7 @@ public class AppControllerIT { Map.of( "DEPLOYER_AGENT_RESOURCES", "{defaultMaxTotalResourceUnitsPerTenant: 3}", - "DEPLOYER_RUNTIME_IMAGE", "busybox", + "DEPLOYER_RUNTIME_IMAGE", "bash", "DEPLOYER_RUNTIME_IMAGE_PULL_POLICY", "IfNotPresent")); static AtomicInteger counter = new AtomicInteger(0); @@ -74,7 +80,7 @@ void testAppController() { name: %s namespace: %s spec: - image: busybox + image: bash imagePullPolicy: IfNotPresent application: '{"modules": {}}' tenant: %s @@ -116,27 +122,7 @@ void testAppController() { // simulate job finished client.resource(job).inNamespace(namespace).delete(); - final Job mockJob = - new JobBuilder() - .withNewMetadata() - .withName(job.getMetadata().getName()) - .endMetadata() - .withNewSpec() - .withNewTemplate() - .withNewSpec() - .withContainers( - List.of( - new ContainerBuilder() - .withName("test") - .withImage("busybox") - .withCommand(List.of("sleep", "1")) - .build())) - .withRestartPolicy("Never") - .endSpec() - .endTemplate() - .endSpec() - .build(); - client.resource(mockJob).inNamespace(namespace).create(); + createMockJob(namespace, client, job.getMetadata().getName()); Awaitility.await() .atMost(Duration.ofSeconds(30)) @@ -199,6 +185,30 @@ void testAppController() { assertNotNull(client.resource(resource).inNamespace(namespace).get()); } + private void createMockJob(String namespace, KubernetesClient client, String name) { + final Job mockJob = + new JobBuilder() + .withNewMetadata() + .withName(name) + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewSpec() + .withContainers( + List.of( + new ContainerBuilder() + .withName("test") + .withImage("bash") + .withCommand(List.of("sleep", "1")) + .build())) + .withRestartPolicy("Never") + .endSpec() + .endTemplate() + .endSpec() + .build(); + client.resource(mockJob).inNamespace(namespace).create(); + } + @Test void testAppResources() { @@ -213,17 +223,73 @@ void testAppResources() { final ApplicationCustomResource app3 = createAppWithResources(tenant, 2, 1); awaitApplicationErrorForResources(app3); - deployment.getClient().resource(app2).delete(); + simulateAppDeletion(app2); awaitApplicationDeployingStatus(app3); final ApplicationCustomResource app4 = createAppWithResources(tenant, 2, 1); awaitApplicationErrorForResources(app4); deployment.restartDeployerOperator(); - deployment.getClient().resource(app3).delete(); + simulateAppDeletion(app3); awaitApplicationDeployingStatus(app4); } + private void simulateAppDeletion(ApplicationCustomResource app) { + final String namespace = app.getMetadata().getNamespace(); + final String deployerJobName = + AppResourcesFactory.getDeployerJobName(app.getMetadata().getName(), true); + final String setupJobName = + AppResourcesFactory.getSetupJobName(app.getMetadata().getName(), true); + createMockJob(namespace, deployment.getClient(), setupJobName); + + createMockJob(namespace, deployment.getClient(), deployerJobName); + + awaitJobCompleted(namespace, deployerJobName); + awaitJobCompleted(namespace, setupJobName); + final ApplicationStatus status = new ApplicationStatus(); + final AppController.AppLastApplied appLastApplied = new AppController.AppLastApplied(); + appLastApplied.setSetup(SerializationUtil.writeAsJson(app.getSpec())); + appLastApplied.setRuntimeDeployer(SerializationUtil.writeAsJson(app.getSpec())); + status.setLastApplied(SerializationUtil.writeAsJson(appLastApplied)); + final ApplicationCustomResource resource = + deployment + .getClient() + .resources(ApplicationCustomResource.class) + .inNamespace(app.getMetadata().getNamespace()) + .withName(app.getMetadata().getName()) + .get(); + resource.setStatus(status); + deployment.getClient().resource(resource).patchStatus(); + + deployment + .getClient() + .resources(ApplicationCustomResource.class) + .inNamespace(app.getMetadata().getNamespace()) + .withName(app.getMetadata().getName()) + .delete(); + } + + private void awaitJobCompleted(String namespace, String deployerJobName) { + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollInterval(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + final Integer succeeded = + deployment + .getClient() + .batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(deployerJobName) + .get() + .getStatus() + .getSucceeded(); + assertTrue(succeeded != null && succeeded > 0); + }); + } + private void awaitApplicationErrorForResources(ApplicationCustomResource original) { org.awaitility.Awaitility.await() .untilAsserted( @@ -267,7 +333,7 @@ private void checkSetupJob(Job job) { final JobSpec spec = job.getSpec(); final PodSpec templateSpec = spec.getTemplate().getSpec(); final Container container = templateSpec.getContainers().get(0); - assertEquals("busybox", container.getImage()); + assertEquals("bash", container.getImage()); assertEquals("IfNotPresent", container.getImagePullPolicy()); assertEquals("setup", container.getName()); assertEquals(Quantity.parse("100m"), container.getResources().getRequests().get("cpu")); @@ -282,7 +348,7 @@ private void checkSetupJob(Job job) { assertEquals("deploy", container.getArgs().get(args++)); final Container initContainer = templateSpec.getInitContainers().get(0); - assertEquals("busybox", initContainer.getImage()); + assertEquals("bash", initContainer.getImage()); assertEquals("IfNotPresent", initContainer.getImagePullPolicy()); assertEquals("setup-init-config", initContainer.getName()); assertEquals("/app-config", initContainer.getVolumeMounts().get(0).getMountPath()); @@ -302,7 +368,7 @@ private void checkDeployerJob(Job job, boolean cleanup) { final JobSpec spec = job.getSpec(); final PodSpec templateSpec = spec.getTemplate().getSpec(); final Container container = templateSpec.getContainers().get(0); - assertEquals("busybox", container.getImage()); + assertEquals("bash", container.getImage()); assertEquals("IfNotPresent", container.getImagePullPolicy()); assertEquals("deployer", container.getName()); assertEquals(Quantity.parse("100m"), container.getResources().getRequests().get("cpu")); @@ -356,7 +422,7 @@ private void checkDeployerJob(Job job, boolean cleanup) { .getValue()); final Container initContainer = templateSpec.getInitContainers().get(0); - assertEquals("busybox", initContainer.getImage()); + assertEquals("bash", initContainer.getImage()); assertEquals("IfNotPresent", initContainer.getImagePullPolicy()); assertEquals("deployer-init-config", initContainer.getName()); assertEquals("/app-config", initContainer.getVolumeMounts().get(0).getMountPath()); @@ -378,15 +444,51 @@ private ApplicationCustomResource getCr(String yaml) { } private void setupTenant(String tenant) { + final String namespace = "langstream-" + tenant; deployment .getClient() .resource( new NamespaceBuilder() .withNewMetadata() - .withName("langstream-" + tenant) + .withName(namespace) .endMetadata() .build()) .create(); + deployment + .getClient() + .resource( + new ServiceAccountBuilder() + .withNewMetadata() + .withName( + CRDConstants.computeRuntimeServiceAccountForTenant(tenant)) + .endMetadata() + .build()) + .inNamespace(namespace) + .serverSideApply(); + + deployment + .getClient() + .resource( + new ServiceAccountBuilder() + .withNewMetadata() + .withName( + CRDConstants.computeDeployerServiceAccountForTenant(tenant)) + .endMetadata() + .build()) + .inNamespace(namespace) + .serverSideApply(); + + deployment + .getClient() + .resource( + new SecretBuilder() + .withNewMetadata() + .withName(CRDConstants.TENANT_CLUSTER_CONFIG_SECRET) + .endMetadata() + .withData(Map.of(CRDConstants.TENANT_CLUSTER_CONFIG_SECRET_KEY, "")) + .build()) + .inNamespace(namespace) + .serverSideApply(); } private ApplicationCustomResource createAppWithResources( @@ -404,11 +506,14 @@ private ApplicationCustomResource createAppWithResources( tenant: %s """ .formatted(appId, size, parallelism, tenant)); - return deployment + final String namespace = "langstream-" + tenant; + deployment .getClient() - .resource(resource) - .inNamespace("langstream-" + tenant) - .create(); + .resource( + new SecretBuilder().withNewMetadata().withName(appId).endMetadata().build()) + .inNamespace(namespace) + .serverSideApply(); + return deployment.getClient().resource(resource).inNamespace(namespace).create(); } private String genAppId() { diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java index 44b1fc370..20c3b168e 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/OperatorExtension.java @@ -115,6 +115,7 @@ private void startDeployerOperator() throws IOException { container.withEnv( "QUARKUS_LOG_CATEGORY__IO_JAVAOPERATORSDK_OPERATOR_PROCESSING_EVENT_SOURCE_CONTROLLER", "debug"); + container.withEnv("QUARKUS_LOG_CATEGORY__AI_LANGSTREAM_DEPLOYER_K8S_APPS", "debug"); env.forEach(container::withEnv); container.withExposedPorts(8080); container.withAccessToHost(true); diff --git a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java index c56d012d3..e2ec04f8c 100644 --- a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java +++ b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationResourceTest.java @@ -152,7 +152,8 @@ void testAppCrud() throws Exception { mockMvc.perform(delete("/api/applications/my-tenant/test")).andExpect(status().isOk()); - mockMvc.perform(get("/api/applications/my-tenant/test")).andExpect(status().isNotFound()); + // the delete is actually a crd update + mockMvc.perform(get("/api/applications/my-tenant/test")).andExpect(status().isOk()); } @Test