diff --git a/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java b/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java index 5132bacfa..3e70e55cc 100644 --- a/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java +++ b/langstream-admin-client/src/main/java/ai/langstream/admin/client/AdminClient.java @@ -243,14 +243,22 @@ public String get(String archetype) { private class ApplicationsImpl implements Applications { @Override public String deploy(String application, MultiPartBodyPublisher multiPartBodyPublisher) { - return deploy(application, multiPartBodyPublisher, false); + return deploy(application, multiPartBodyPublisher, false, false); } @Override @SneakyThrows public String deploy( - String application, MultiPartBodyPublisher multiPartBodyPublisher, boolean dryRun) { - final String path = tenantAppPath("/" + application) + "?dry-run=" + dryRun; + String application, + MultiPartBodyPublisher multiPartBodyPublisher, + boolean dryRun, + boolean autoUpgrade) { + final String path = + tenantAppPath("/" + application) + + "?dry-run=" + + dryRun + + "&auto-upgrade=" + + autoUpgrade; final String contentType = String.format( "multipart/form-data; boundary=%s", @@ -280,8 +288,17 @@ public String deployFromArchetype( @Override @SneakyThrows - public void update(String application, MultiPartBodyPublisher multiPartBodyPublisher) { - final String path = tenantAppPath("/" + application); + public void update( + String application, + MultiPartBodyPublisher multiPartBodyPublisher, + boolean autoUpgrade, + boolean forceRestart) { + final String path = + tenantAppPath("/" + application) + + "?auto-upgrade=" + + autoUpgrade + + "&force-restart=" + + forceRestart; final String contentType = String.format( "multipart/form-data; boundary=%s", diff --git a/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java b/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java index 1abf3698b..63bf115af 100644 --- a/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java +++ b/langstream-admin-client/src/main/java/ai/langstream/admin/client/model/Applications.java @@ -25,9 +25,16 @@ public interface Applications { String deploy(String application, MultiPartBodyPublisher multiPartBodyPublisher); String deploy( - String application, MultiPartBodyPublisher multiPartBodyPublisher, boolean dryRun); + String application, + MultiPartBodyPublisher multiPartBodyPublisher, + boolean dryRun, + boolean autoUpgrade); - void update(String application, MultiPartBodyPublisher multiPartBodyPublisher); + void update( + String application, + MultiPartBodyPublisher multiPartBodyPublisher, + boolean autoUpgrade, + boolean forceRestart); void delete(String application, boolean force); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java index f88b31f33..1fb4e0b61 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java @@ -135,4 +135,9 @@ public TopicProducer getOrCreate( throw new RuntimeException(ex); } } + + @Override + public void close() { + cache.invalidateAll(); + } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java index 1b9f3ede6..0447e0eec 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCache.java @@ -18,7 +18,7 @@ import ai.langstream.api.runner.topics.TopicProducer; import java.util.function.Supplier; -public interface TopicProducerCache { +public interface TopicProducerCache extends AutoCloseable { record Key( String tenant, String application, @@ -27,4 +27,7 @@ record Key( String configString) {} TopicProducer getOrCreate(Key key, Supplier topicProducerSupplier); + + @Override + void close(); } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java index 9ef1430da..063bd8488 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/TopicProducerCacheFactory.java @@ -15,10 +15,12 @@ */ package ai.langstream.apigateway.gateways; +import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.apigateway.MetricsNames; import ai.langstream.apigateway.config.TopicProperties; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.binder.cache.GuavaCacheMetrics; +import java.util.function.Supplier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,7 +36,16 @@ public TopicProducerCache topicProducerCache(TopicProperties topicProperties) { Metrics.globalRegistry, cache.getCache(), MetricsNames.TOPIC_PRODUCER_CACHE); return cache; } else { - return (key, topicProducerSupplier) -> topicProducerSupplier.get(); + return new TopicProducerCache() { + @Override + public TopicProducer getOrCreate( + Key key, Supplier topicProducerSupplier) { + return topicProducerSupplier.get(); + } + + @Override + public void close() {} + }; } } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index 8d82dada3..ce8ee65c9 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -270,13 +270,7 @@ private CompletableFuture handleServiceWithTopics( topicConnectionsRuntimeRegistryProvider .getTopicConnectionsRuntimeRegistry(), clusterRuntimeRegistry); - completableFuture.thenRunAsync( - () -> { - if (consumeGateway != null) { - consumeGateway.close(); - } - }, - consumeThreadPool); + completableFuture.thenRunAsync(consumeGateway::close, consumeThreadPool); final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); try { @@ -297,7 +291,7 @@ record -> { final AtomicBoolean stop = new AtomicBoolean(false); consumeGateway.startReadingAsync( consumeThreadPool, - () -> stop.get(), + stop::get, record -> { stop.set(true); completableFuture.complete(ResponseEntity.ok(record)); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java index 29afd6aa7..2fdbd12a3 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/WebSocketConfig.java @@ -97,7 +97,9 @@ public ServletServerContainerFactoryBean createWebSocketContainer() { @PreDestroy public void onDestroy() { - consumeThreadPool.shutdown(); + log.info("Shutting down WebSocket"); + consumeThreadPool.shutdownNow(); clusterRuntimeRegistry.close(); + topicProducerCache.close(); } } diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index 7d1e83344..a347c7187 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -30,6 +30,7 @@ import ai.langstream.api.runner.topics.TopicConsumer; import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.DeployContext; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.api.ConsumePushMessage; @@ -569,8 +570,7 @@ void testService() throws Exception { url, "{\"key\": \"my-key2\", \"value\": \"my-value\", \"headers\": {\"header1\":\"value1\"}}")); - // sorry but kafka can't keep up - final int numParallel = getStreamingCluster().type().equals("kafka") ? 5 : 30; + final int numParallel = 10; List> futures1 = new ArrayList<>(); for (int i = 0; i < numParallel; i++) { @@ -588,7 +588,7 @@ void testService() throws Exception { futures1.add(future); } CompletableFuture.allOf(futures1.toArray(new CompletableFuture[] {})) - .get(2, TimeUnit.MINUTES); + .get(3, TimeUnit.MINUTES); } private void startTopicExchange(String logicalFromTopic, String logicalToTopic) @@ -678,6 +678,7 @@ private void prepareTopicsForTest(String... topic) throws Exception { .pluginsRegistry(new PluginsRegistry()) .registry(new ClusterRuntimeRegistry()) .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build(); final StreamingCluster streamingCluster = getStreamingCluster(); topicConnectionsRuntimeRegistry diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java index bcccad9da..1a8693ab7 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java @@ -35,6 +35,7 @@ import ai.langstream.api.model.StreamingCluster; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.DeployContext; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.api.ConsumePushMessage; @@ -267,6 +268,7 @@ private void prepareTopicsForTest(String... topic) throws Exception { .pluginsRegistry(new PluginsRegistry()) .registry(new ClusterRuntimeRegistry()) .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build(); final StreamingCluster streamingCluster = getStreamingCluster(); topicConnectionsRuntimeRegistry diff --git a/langstream-api/src/main/java/ai/langstream/api/runtime/DeployContext.java b/langstream-api/src/main/java/ai/langstream/api/runtime/DeployContext.java index b97deff39..81e57ba48 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runtime/DeployContext.java +++ b/langstream-api/src/main/java/ai/langstream/api/runtime/DeployContext.java @@ -15,20 +15,62 @@ */ package ai.langstream.api.runtime; -import ai.langstream.api.runner.assets.AssetManagerRegistry; import ai.langstream.api.webservice.application.ApplicationCodeInfo; public interface DeployContext extends AutoCloseable { - default ApplicationCodeInfo getApplicationCodeInfo( - String tenant, String applicationId, String codeArchiveId) { - throw new UnsupportedOperationException(); - } + DeployContext NO_DEPLOY_CONTEXT = new NoOpDeployContext(); + + class NoOpDeployContext implements DeployContext { + + @Override + public ApplicationCodeInfo getApplicationCodeInfo( + String tenant, String applicationId, String codeArchiveId) { + return null; + } + + @Override + public boolean isAutoUpgradeRuntimeImage() { + return false; + } + + @Override + public boolean isAutoUpgradeRuntimeImagePullPolicy() { + return false; + } + + @Override + public boolean isAutoUpgradeAgentResources() { + return false; + } + + @Override + public boolean isAutoUpgradeAgentPodTemplate() { + return false; + } - default AssetManagerRegistry getAssetManagerRegistry() { - throw new UnsupportedOperationException(); + @Override + public long getApplicationSeed() { + return -1L; + } + + @Override + public void close() {} } + ApplicationCodeInfo getApplicationCodeInfo( + String tenant, String applicationId, String codeArchiveId); + + boolean isAutoUpgradeRuntimeImage(); + + boolean isAutoUpgradeRuntimeImagePullPolicy(); + + boolean isAutoUpgradeAgentResources(); + + boolean isAutoUpgradeAgentPodTemplate(); + + long getApplicationSeed(); + @Override default void close() {} } diff --git a/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java b/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java index 287f0bd70..bea0f6804 100644 --- a/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java +++ b/langstream-api/src/main/java/ai/langstream/api/storage/ApplicationStore.java @@ -41,7 +41,9 @@ void put( String applicationId, Application applicationInstance, String codeArchiveReference, - ExecutionPlan executionPlan); + ExecutionPlan executionPlan, + boolean autoUpgrade, + boolean forceRestart); StoredApplication get(String tenant, String applicationId, boolean queryPods); diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/AbstractDeployApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/AbstractDeployApplicationCmd.java index f176f1b3c..d526aebf5 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/AbstractDeployApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/AbstractDeployApplicationCmd.java @@ -64,6 +64,12 @@ public static class DeployApplicationCmd extends AbstractDeployApplicationCmd { "Output format for dry-run mode. Formats are: yaml, json. Default value is yaml.") private Formats format = Formats.yaml; + @CommandLine.Option( + names = {"--auto-upgrade"}, + description = + "Whether to make the executors to automatically upgrades the environment (image, resources mapping etc.) when restarted") + private boolean autoUpgrade; + @Override String applicationId() { return name; @@ -94,6 +100,16 @@ boolean isDryRun() { return dryRun; } + @Override + boolean isAutoUpgrade() { + return autoUpgrade; + } + + @Override + boolean isForceRestart() { + return false; + } + @Override Formats format() { ensureFormatIn(format, Formats.json, Formats.yaml); @@ -122,6 +138,17 @@ public static class UpdateApplicationCmd extends AbstractDeployApplicationCmd { description = "Secrets file path") private String secretFilePath; + @CommandLine.Option( + names = {"--auto-upgrade"}, + description = + "Whether to make the executors to automatically upgrades the environment (image, resources mapping etc.) when restarted") + private boolean autoUpgrade; + + @CommandLine.Option( + names = {"--force-restart"}, + description = "Whether to make force restart all the executors of the application") + private boolean forceRestart; + @Override String applicationId() { return name; @@ -152,6 +179,16 @@ boolean isDryRun() { return false; } + @Override + boolean isAutoUpgrade() { + return autoUpgrade; + } + + @Override + boolean isForceRestart() { + return forceRestart; + } + @Override Formats format() { return null; @@ -170,6 +207,10 @@ Formats format() { abstract boolean isDryRun(); + abstract boolean isAutoUpgrade(); + + abstract boolean isForceRestart(); + abstract Formats format(); @Override @@ -229,7 +270,9 @@ public void run() { if (isUpdate()) { log(String.format("updating application: %s (%d KB)", applicationId, size / 1024)); - getClient().applications().update(applicationId, bodyPublisher); + getClient() + .applications() + .update(applicationId, bodyPublisher, isAutoUpgrade(), isForceRestart()); log(String.format("application %s updated", applicationId)); } else { final boolean dryRun = isDryRun(); @@ -242,7 +285,9 @@ public void run() { log(String.format("deploying application: %s (%d KB)", applicationId, size / 1024)); } final String response = - getClient().applications().deploy(applicationId, bodyPublisher, dryRun); + getClient() + .applications() + .deploy(applicationId, bodyPublisher, dryRun, isAutoUpgrade()); if (dryRun) { final Formats format = format(); print(format == Formats.raw ? Formats.yaml : format, response, null, null); diff --git a/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java b/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java index e5365457c..db008475f 100644 --- a/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java +++ b/langstream-cli/src/test/java/ai/langstream/cli/commands/applications/AppsCmdTest.java @@ -48,7 +48,10 @@ public void testDeploy() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.post(String.format("/api/applications/%s/my-app?dry-run=false", TENANT)) + WireMock.post( + String.format( + "/api/applications/%s/my-app?dry-run=false&auto-upgrade=false", + TENANT)) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) @@ -107,7 +110,10 @@ public void testDeployWithDependencies() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.post(String.format("/api/applications/%s/my-app?dry-run=false", TENANT)) + WireMock.post( + String.format( + "/api/applications/%s/my-app?dry-run=false&auto-upgrade=false", + TENANT)) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) @@ -141,7 +147,11 @@ public void testUpdateAll() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.patch(urlEqualTo(String.format("/api/applications/%s/my-app", TENANT))) + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=false&force-restart=false", + TENANT))) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) @@ -176,7 +186,10 @@ public void testDeployDryRun() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.post(String.format("/api/applications/%s/my-app?dry-run=true", TENANT)) + WireMock.post( + String.format( + "/api/applications/%s/my-app?dry-run=true&auto-upgrade=false", + TENANT)) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) @@ -221,12 +234,55 @@ public void testDeployDryRun() throws Exception { Assertions.assertTrue(result.out().contains("{\n" + " \"name\" : \"my-app\"\n" + "}")); } + @Test + public void testDeployAutoUpgrade() throws Exception { + Path langstream = Files.createTempDirectory("langstream"); + final String app = createTempFile("module: module-1", langstream); + final String instance = createTempFile("instance: {}"); + final String secrets = createTempFile("secrets: []"); + + final Path zipFile = buildZip(langstream.toFile(), System.out::println); + + wireMock.register( + WireMock.post( + String.format( + "/api/applications/%s/my-app?dry-run=false&auto-upgrade=true", + TENANT)) + .withMultipartRequestBody( + aMultipart("app") + .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) + .withMultipartRequestBody( + aMultipart("instance").withBody(equalTo("instance: {}"))) + .withMultipartRequestBody( + aMultipart("secrets").withBody(equalTo("secrets: []"))) + .willReturn(WireMock.ok("{ \"name\": \"my-app\" }"))); + + CommandResult result = + executeCommand( + "apps", + "deploy", + "my-app", + "-s", + secrets, + "-app", + langstream.toAbsolutePath().toString(), + "-i", + instance, + "--auto-upgrade"); + Assertions.assertEquals(0, result.exitCode()); + Assertions.assertEquals("", result.err()); + } + @Test public void testUpdateInstance() throws Exception { final String instance = createTempFile("instance: {}"); wireMock.register( - WireMock.patch(urlEqualTo(String.format("/api/applications/%s/my-app", TENANT))) + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=false&force-restart=false", + TENANT))) .withMultipartRequestBody( aMultipart("instance").withBody(equalTo("instance: {}"))) .willReturn(WireMock.ok("{ \"name\": \"my-app\" }"))); @@ -244,7 +300,11 @@ public void testUpdateAppAndInstance() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.patch(urlEqualTo(String.format("/api/applications/%s/my-app", TENANT))) + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=false&force-restart=false", + TENANT))) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) @@ -272,7 +332,11 @@ public void testUpdateApp() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.patch(urlEqualTo(String.format("/api/applications/%s/my-app", TENANT))) + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=false&force-restart=false", + TENANT))) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) @@ -285,11 +349,73 @@ public void testUpdateApp() throws Exception { Assertions.assertEquals("", result.err()); } + @Test + public void testUpdateAppWithAutoUpgrade() throws Exception { + Path langstream = Files.createTempDirectory("langstream"); + final String app = createTempFile("module: module-1", langstream); + + final Path zipFile = buildZip(langstream.toFile(), System.out::println); + wireMock.register( + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=true&force-restart=false", + TENANT))) + .withMultipartRequestBody( + aMultipart("app") + .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) + .willReturn(WireMock.ok("{ \"name\": \"my-app\" }"))); + + CommandResult result = + executeCommand( + "apps", + "update", + "my-app", + "-app", + langstream.toFile().getAbsolutePath(), + "--auto-upgrade"); + Assertions.assertEquals(0, result.exitCode()); + Assertions.assertEquals("", result.err()); + } + + @Test + public void testUpdateAppWithForceRestart() throws Exception { + Path langstream = Files.createTempDirectory("langstream"); + final String app = createTempFile("module: module-1", langstream); + + final Path zipFile = buildZip(langstream.toFile(), System.out::println); + wireMock.register( + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=false&force-restart=true", + TENANT))) + .withMultipartRequestBody( + aMultipart("app") + .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) + .willReturn(WireMock.ok("{ \"name\": \"my-app\" }"))); + + CommandResult result = + executeCommand( + "apps", + "update", + "my-app", + "-app", + langstream.toFile().getAbsolutePath(), + "--force-restart"); + Assertions.assertEquals(0, result.exitCode()); + Assertions.assertEquals("", result.err()); + } + @Test public void testUpdateSecrets() throws Exception { final String secrets = createTempFile("secrets: []"); wireMock.register( - WireMock.patch(urlEqualTo(String.format("/api/applications/%s/my-app", TENANT))) + WireMock.patch( + urlEqualTo( + String.format( + "/api/applications/%s/my-app?auto-upgrade=false&force-restart=false", + TENANT))) .withMultipartRequestBody( aMultipart("secrets").withBody(equalTo("secrets: []"))) .willReturn(WireMock.ok("{ \"name\": \"my-app\" }"))); @@ -483,7 +609,10 @@ public void testDeployWithFilePlaceholders() throws Exception { final Path zipFile = buildZip(langstream.toFile(), System.out::println); wireMock.register( - WireMock.post(String.format("/api/applications/%s/my-app?dry-run=false", TENANT)) + WireMock.post( + String.format( + "/api/applications/%s/my-app?dry-run=false&auto-upgrade=false", + TENANT)) .withMultipartRequestBody( aMultipart("app") .withBody(binaryEqualTo(Files.readAllBytes(zipFile)))) diff --git a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java index fae271768..c437821cd 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java +++ b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java @@ -31,6 +31,7 @@ import ai.langstream.api.runtime.StreamingClusterRuntime; import ai.langstream.impl.common.ApplicationPlaceholderResolver; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Objects; import lombok.Builder; import lombok.Getter; import lombok.SneakyThrows; @@ -44,7 +45,7 @@ public final class ApplicationDeployer implements AutoCloseable { private ClusterRuntimeRegistry registry; private PluginsRegistry pluginsRegistry; - private DeployContext deployContext; + @Builder.Default private DeployContext deployContext = DeployContext.NO_DEPLOY_CONTEXT; @Getter private TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry; private AssetManagerRegistry assetManagerRegistry; @@ -145,6 +146,7 @@ private void setupAsset(AssetDefinition asset, AssetManagerRegistry assetManager */ public Object deploy( String tenant, ExecutionPlan physicalApplicationInstance, String codeStorageArchiveId) { + Objects.requireNonNull(deployContext, "Deploy context is not set"); Application applicationInstance = physicalApplicationInstance.getApplication(); ComputeClusterRuntime clusterRuntime = registry.getClusterRuntime(applicationInstance.getInstance().computeCluster()); @@ -167,6 +169,7 @@ public Object deploy( * @param codeStorageArchiveId the code storage archive id */ public void delete(String tenant, ExecutionPlan executionPlan, String codeStorageArchiveId) { + Objects.requireNonNull(deployContext, "Deploy context is not set"); Application applicationInstance = executionPlan.getApplication(); ComputeClusterRuntime clusterRuntime = registry.getClusterRuntime(applicationInstance.getInstance().computeCluster()); diff --git a/langstream-core/src/test/java/ai/langstream/impl/deploy/ApplicationDeployerTest.java b/langstream-core/src/test/java/ai/langstream/impl/deploy/ApplicationDeployerTest.java index 940fe7636..e90c9136e 100644 --- a/langstream-core/src/test/java/ai/langstream/impl/deploy/ApplicationDeployerTest.java +++ b/langstream-core/src/test/java/ai/langstream/impl/deploy/ApplicationDeployerTest.java @@ -25,11 +25,7 @@ import ai.langstream.api.runner.topics.TopicConnectionsRuntime; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeAndLoader; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; -import ai.langstream.api.runtime.ClusterRuntimeRegistry; -import ai.langstream.api.runtime.ComputeClusterRuntime; -import ai.langstream.api.runtime.ExecutionPlan; -import ai.langstream.api.runtime.PluginsRegistry; -import ai.langstream.api.runtime.StreamingClusterRuntime; +import ai.langstream.api.runtime.*; import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; import ai.langstream.impl.parser.ModelBuilder; import java.util.Map; @@ -82,6 +78,7 @@ void testDeploy() throws Exception { ApplicationDeployer.builder() .pluginsRegistry(new PluginsRegistry()) .registry(registry) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .topicConnectionsRuntimeRegistry( new TopicConnectionsRuntimeRegistry() { @Override diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java index e3a6187f7..e272306c1 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/AppLifecycleIT.java @@ -63,4 +63,38 @@ public void testDeleteBrokenApplication() throws Exception { "bin/langstream apps delete -f %s".formatted(applicationId).split(" ")); awaitApplicationCleanup(tenant, applicationId); } + + @Test + public void testUpdateForceRestart() throws Exception { + installLangStreamCluster(true); + final String tenant = "ten-" + System.currentTimeMillis(); + setupTenant(tenant); + final String applicationId = "my-test-app"; + + final Map> instanceContent = + Map.of( + "instance", + Map.of( + "streamingCluster", + Map.of( + "type", + "kafka", + "configuration", + Map.of("bootstrapServers", "wrong:9092")), + "computeCluster", + Map.of("type", "kubernetes"))); + + final File instanceFile = Files.createTempFile("ls-test", ".yaml").toFile(); + YAML_MAPPER.writeValue(instanceFile, instanceContent); + + deployLocalApplication( + tenant, false, applicationId, "python-processor", instanceFile, Map.of()); + awaitApplicationInStatus(applicationId, "ERROR_DEPLOYING"); + executeCommandOnClient( + "bin/langstream apps delete %s ".formatted(applicationId).split(" ")); + awaitApplicationInStatus(applicationId, "ERROR_DELETING"); + executeCommandOnClient( + "bin/langstream apps delete -f %s".formatted(applicationId).split(" ")); + awaitApplicationCleanup(tenant, applicationId); + } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java index 422906ddb..44a2f465a 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java @@ -76,6 +76,15 @@ public void testProcessor() { Map.of("SECRET1_VK", "super secret value - changed"), 1); + // test force-restart + updateLocalApplicationAndAwaitReady( + tenant, + applicationId, + "python-processor", + Map.of("SECRET1_VK", "super secret value - changed"), + 1, + true); + executeCommandOnClient( "bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30 -p sessionId=s2" .formatted(applicationId) diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index 1b299b1bd..b528633aa 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -1110,18 +1110,29 @@ protected static void deployLocalApplicationAndAwaitReady( Map env, int expectedNumExecutors) { deployLocalApplicationAndAwaitReady( - tenant, false, applicationId, appDirName, env, expectedNumExecutors); + tenant, false, applicationId, appDirName, env, expectedNumExecutors, false); } - @SneakyThrows protected static void updateLocalApplicationAndAwaitReady( String tenant, String applicationId, String appDirName, Map env, int expectedNumExecutors) { + updateLocalApplicationAndAwaitReady( + tenant, applicationId, appDirName, env, expectedNumExecutors, false); + } + + @SneakyThrows + protected static void updateLocalApplicationAndAwaitReady( + String tenant, + String applicationId, + String appDirName, + Map env, + int expectedNumExecutors, + boolean forceRestart) { deployLocalApplicationAndAwaitReady( - tenant, true, applicationId, appDirName, env, expectedNumExecutors); + tenant, true, applicationId, appDirName, env, expectedNumExecutors, forceRestart); } @SneakyThrows @@ -1131,11 +1142,18 @@ private static void deployLocalApplicationAndAwaitReady( String applicationId, String appDirName, Map env, - int expectedNumExecutors) { + int expectedNumExecutors, + boolean forceRestart) { final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; final String podUids = deployLocalApplication( - tenant, isUpdate, applicationId, appDirName, instanceFile, env); + tenant, + isUpdate, + applicationId, + appDirName, + instanceFile, + env, + forceRestart); awaitApplicationReady(applicationId, expectedNumExecutors); Awaitility.await() @@ -1183,6 +1201,19 @@ protected static String deployLocalApplication( String appDirName, File instanceFile, Map env) { + return deployLocalApplication( + tenant, isUpdate, applicationId, appDirName, instanceFile, env, false); + } + + @SneakyThrows + protected static String deployLocalApplication( + String tenant, + boolean isUpdate, + String applicationId, + String appDirName, + File instanceFile, + Map env, + boolean forceRestart) { final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; String testAppsBaseDir = "src/test/resources/apps"; String testSecretBaseDir = "src/test/resources/secrets"; @@ -1232,9 +1263,10 @@ protected static String deployLocalApplication( } else { podUids = ""; } + final String forceRestartFlag = isUpdate && forceRestart ? "--force-restart" : ""; final String command = - "bin/langstream apps %s %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml" - .formatted(isUpdate ? "update" : "deploy", applicationId); + "bin/langstream apps %s %s %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml" + .formatted(isUpdate ? "update" : "deploy", applicationId, forceRestartFlag); String logs = executeCommandOnClient((beforeCmd + command).split(" ")); log.info("Logs after deploy: {}", logs); return podUids; diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/agents/AgentSpec.java b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/agents/AgentSpec.java index acd6ea65b..91ba049c6 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/agents/AgentSpec.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/agents/AgentSpec.java @@ -36,7 +36,13 @@ public record Resources(int parallelism, int size) {} public record Disk(String agentId, long size, String type) {} - public record Options(List disks) {} + public record Options( + List disks, + boolean autoUpgradeRuntimeImage, + boolean autoUpgradeRuntimeImagePullPolicy, + boolean autoUpgradeAgentResources, + boolean autoUpgradeAgentPodTemplate, + long applicationSeed) {} private static final ObjectMapper MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -53,7 +59,7 @@ public record Options(List disks) {} @JsonIgnore private Options parsedOptions; @SneakyThrows - private Options parseOptions() { + private synchronized Options parseOptions() { if (parsedOptions == null) { if (options != null) { parsedOptions = MAPPER.readValue(options, Options.class); @@ -75,4 +81,49 @@ public List getDisks() { } return options.disks(); } + + @JsonIgnore + public boolean isAutoUpgradeRuntimeImage() { + final Options options = parseOptions(); + if (options == null) { + return false; + } + return options.autoUpgradeRuntimeImage(); + } + + @JsonIgnore + public boolean isAutoUpgradeRuntimeImagePullPolicy() { + final Options options = parseOptions(); + if (options == null) { + return false; + } + return options.autoUpgradeRuntimeImagePullPolicy(); + } + + @JsonIgnore + public boolean isAutoUpgradeAgentPodTemplate() { + final Options options = parseOptions(); + if (options == null) { + return false; + } + return options.autoUpgradeAgentPodTemplate(); + } + + @JsonIgnore + public boolean isAutoUpgradeAgentResources() { + final Options options = parseOptions(); + if (options == null) { + return false; + } + return options.autoUpgradeAgentResources(); + } + + @JsonIgnore + public long getApplicationSeed() { + final Options options = parseOptions(); + if (options == null) { + return 0L; + } + return options.applicationSeed(); + } } diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java index 5f61c7ed2..1fb46e8c2 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/ApplicationSpecOptions.java @@ -29,4 +29,9 @@ public enum DeleteMode { private DeleteMode deleteMode = DeleteMode.CLEANUP_REQUIRED; private boolean markedForDeletion; + private long seed; + private boolean autoUpgradeRuntimeImage; + private boolean autoUpgradeRuntimeImagePullPolicy; + private boolean autoUpgradeAgentResources; + private boolean autoUpgradeAgentPodTemplate; } diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactory.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactory.java index 33487489e..611d7daf4 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactory.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactory.java @@ -461,6 +461,7 @@ private static Probe createReadinessProbe( private static Map getPodAnnotations(AgentSpec spec, PodTemplate podTemplate) { final Map annotations = new HashMap<>(); annotations.put("ai.langstream/config-checksum", spec.getAgentConfigSecretRefChecksum()); + annotations.put("ai.langstream/application-seed", spec.getApplicationSeed() + ""); if (podTemplate != null && podTemplate.annotations() != null) { annotations.putAll(podTemplate.annotations()); } diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java index ba12db6bd..841241f47 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/main/java/ai/langstream/deployer/k8s/apps/AppResourcesFactory.java @@ -26,6 +26,7 @@ import ai.langstream.deployer.k8s.PodTemplate; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpec; +import ai.langstream.deployer.k8s.api.crds.apps.ApplicationSpecOptions; import ai.langstream.deployer.k8s.util.KubeUtil; import ai.langstream.deployer.k8s.util.SerializationUtil; import ai.langstream.runtime.api.application.ApplicationSetupConfiguration; @@ -84,6 +85,8 @@ public static Job generateDeployerJob(GenerateJobParams params) { final String applicationId = applicationCustomResource.getMetadata().getName(); final ApplicationSpec spec = applicationCustomResource.getSpec(); final String tenant = spec.getTenant(); + final ApplicationSpecOptions applicationSpecOptions = + ApplicationSpec.deserializeOptions(spec.getOptions()); final String clusterRuntimeConfigVolumeName = "cluster-runtime-config"; final String appConfigVolumeName = "app-config"; @@ -92,9 +95,23 @@ public static Job generateDeployerJob(GenerateJobParams params) { final String containerImagePullPolicy = resolveContainerImagePullPolicy(imagePullPolicy, spec); + RuntimeDeployerConfiguration.DeployFlags deployFlags = + new RuntimeDeployerConfiguration.DeployFlags(); + deployFlags.setAutoUpgradeRuntimeImage(applicationSpecOptions.isAutoUpgradeRuntimeImage()); + deployFlags.setAutoUpgradeRuntimeImagePullPolicy( + applicationSpecOptions.isAutoUpgradeRuntimeImagePullPolicy()); + deployFlags.setAutoUpgradeAgentResources( + applicationSpecOptions.isAutoUpgradeAgentResources()); + deployFlags.setAutoUpgradeAgentPodTemplate( + applicationSpecOptions.isAutoUpgradeAgentPodTemplate()); + deployFlags.setSeed(applicationSpecOptions.getSeed()); final RuntimeDeployerConfiguration config = new RuntimeDeployerConfiguration( - applicationId, tenant, spec.getApplication(), spec.getCodeArchiveId()); + applicationId, + tenant, + spec.getApplication(), + spec.getCodeArchiveId(), + deployFlags); Map initContainerConfigs = new LinkedHashMap<>(); initContainerConfigs.put(appConfigVolumeName, config); diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactoryTest.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactoryTest.java index 9c4d24775..1ee3456fc 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactoryTest.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/agents/AgentResourcesFactoryTest.java @@ -53,6 +53,7 @@ void testStatefulsetAndService() { tenant: my-tenant applicationId: the-'app agentId: my-agent + options: '{"autoUpgradeRuntimeImage": true}' """); final StatefulSet statefulSet = AgentResourcesFactory.generateStatefulSet( @@ -91,6 +92,7 @@ void testStatefulsetAndService() { template: metadata: annotations: + ai.langstream/application-seed: 0 ai.langstream/config-checksum: xx labels: app: langstream-runtime diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java index e04465590..9b1e752ee 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-core/src/test/java/ai/langstream/deployer/k8s/apps/AppResourcesFactoryTest.java @@ -16,6 +16,7 @@ package ai.langstream.deployer.k8s.apps; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import ai.langstream.deployer.k8s.PodTemplate; import ai.langstream.deployer.k8s.api.crds.apps.ApplicationCustomResource; @@ -114,7 +115,7 @@ void testDeployerJob() { name: cluster-config initContainers: - args: - - "echo '{\\"applicationId\\":\\"test-'\\"'\\"'app\\",\\"tenant\\":\\"my-tenant\\",\\"application\\":\\"{app: true}\\",\\"codeStorageArchiveId\\":\\"iiii\\"}' > /app-config/config && echo '{}' > /cluster-runtime-config/config" + - "echo '{\\"applicationId\\":\\"test-'\\"'\\"'app\\",\\"tenant\\":\\"my-tenant\\",\\"application\\":\\"{app: true}\\",\\"codeStorageArchiveId\\":\\"iiii\\",\\"deployFlags\\":{\\"autoUpgradeRuntimeImage\\":false,\\"autoUpgradeRuntimeImagePullPolicy\\":false,\\"autoUpgradeAgentResources\\":false,\\"autoUpgradeAgentPodTemplate\\":false,\\"seed\\":0}}' > /app-config/config && echo '{}' > /cluster-runtime-config/config" command: - bash - -c @@ -213,7 +214,7 @@ void testDeployerJob() { name: cluster-config initContainers: - args: - - "echo '{\\"applicationId\\":\\"test-'\\"'\\"'app\\",\\"tenant\\":\\"my-tenant\\",\\"application\\":\\"{app: true}\\",\\"codeStorageArchiveId\\":\\"iiii\\"}' > /app-config/config && echo '{}' > /cluster-runtime-config/config" + - "echo '{\\"applicationId\\":\\"test-'\\"'\\"'app\\",\\"tenant\\":\\"my-tenant\\",\\"application\\":\\"{app: true}\\",\\"codeStorageArchiveId\\":\\"iiii\\",\\"deployFlags\\":{\\"autoUpgradeRuntimeImage\\":false,\\"autoUpgradeRuntimeImagePullPolicy\\":false,\\"autoUpgradeAgentResources\\":false,\\"autoUpgradeAgentPodTemplate\\":false,\\"seed\\":0}}' > /app-config/config && echo '{}' > /cluster-runtime-config/config" command: - bash - -c @@ -541,6 +542,78 @@ void testSetImage(boolean deleteJob) { assertEquals("Never", container.getImagePullPolicy()); } + @Test + void testNoUpdateFlags() { + final ApplicationCustomResource resource = + getCr( + """ + apiVersion: langstream.ai/v1alpha1 + kind: Application + metadata: + name: test-'app + namespace: default + spec: + application: "{app: true}" + tenant: my-tenant + codeArchiveId: "iiii" + options: '{"autoUpgradeRuntimeImage": false, "autoUpgradeRuntimeImagePullPolicy": false, "autoUpgradeAgentResources": false, "autoUpgradeAgentPodTemplate": false}' + """); + + Job job = + AppResourcesFactory.generateDeployerJob( + AppResourcesFactory.GenerateJobParams.builder() + .applicationCustomResource(resource) + .deleteJob(false) + .image("busybox:v1") + .imagePullPolicy("Never") + .build()); + final Container container = + job.getSpec().getTemplate().getSpec().getInitContainers().get(0); + System.out.println("args=" + container.getArgs().get(0)); + assertTrue( + container + .getArgs() + .get(0) + .contains( + "\"deployFlags\":{\"autoUpgradeRuntimeImage\":false,\"autoUpgradeRuntimeImagePullPolicy\":false,\"autoUpgradeAgentResources\":false,\"autoUpgradeAgentPodTemplate\":false,\"seed\":0}")); + } + + @Test + void testUpdateFlags() { + final ApplicationCustomResource resource = + getCr( + """ + apiVersion: langstream.ai/v1alpha1 + kind: Application + metadata: + name: test-'app + namespace: default + spec: + application: "{app: true}" + tenant: my-tenant + codeArchiveId: "iiii" + options: '{"autoUpgradeRuntimeImage": true, "autoUpgradeRuntimeImagePullPolicy": true, "autoUpgradeAgentResources": true, "autoUpgradeAgentPodTemplate": true}' + """); + + Job job = + AppResourcesFactory.generateDeployerJob( + AppResourcesFactory.GenerateJobParams.builder() + .applicationCustomResource(resource) + .deleteJob(false) + .image("busybox:v1") + .imagePullPolicy("Never") + .build()); + final Container container = + job.getSpec().getTemplate().getSpec().getInitContainers().get(0); + System.out.println("args=" + container.getArgs().get(0)); + assertTrue( + container + .getArgs() + .get(0) + .contains( + "\"deployFlags\":{\"autoUpgradeRuntimeImage\":true,\"autoUpgradeRuntimeImagePullPolicy\":true,\"autoUpgradeAgentResources\":true,\"autoUpgradeAgentPodTemplate\":true,\"seed\":0}")); + } + private ApplicationCustomResource getCr(String yaml) { return SerializationUtil.readYaml(yaml, ApplicationCustomResource.class); } diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/agents/AgentController.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/agents/AgentController.java index 8358c4376..7d27c7bb4 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/agents/AgentController.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/main/java/ai/langstream/deployer/k8s/controllers/agents/AgentController.java @@ -152,15 +152,34 @@ protected StatefulSet desired( // this is an update for the statefulset. // It's required to not keep the same deployer configuration of the current // version + + boolean autoUpgradeRuntimeImage = primary.getSpec().isAutoUpgradeRuntimeImage(); + boolean autoUpgradeRuntimeImagePullPolicy = + primary.getSpec().isAutoUpgradeRuntimeImagePullPolicy(); + boolean autoUpgradeAgentResources = + primary.getSpec().isAutoUpgradeAgentResources(); + boolean updatePodTemplate = primary.getSpec().isAutoUpgradeAgentPodTemplate(); + final LastAppliedConfigForStatefulset lastAppliedConfig = SerializationUtil.readJson( status.getLastConfigApplied(), LastAppliedConfigForStatefulset.class); builder.agentResourceUnitConfiguration( - lastAppliedConfig.getAgentResourceUnitConfiguration()) - .image(lastAppliedConfig.getImage()) - .imagePullPolicy(lastAppliedConfig.getImagePullPolicy()) - .podTemplate(lastAppliedConfig.getPodTemplate()); + autoUpgradeAgentResources + ? configuration.getAgentResources() + : lastAppliedConfig.getAgentResourceUnitConfiguration()) + .image( + autoUpgradeRuntimeImage + ? configuration.getRuntimeImage() + : lastAppliedConfig.getImage()) + .imagePullPolicy( + autoUpgradeRuntimeImagePullPolicy + ? configuration.getRuntimeImagePullPolicy() + : lastAppliedConfig.getImagePullPolicy()) + .podTemplate( + updatePodTemplate + ? configuration.getAgentPodTemplate() + : lastAppliedConfig.getPodTemplate()); } else { isUpdate = false; builder.agentResourceUnitConfiguration(configuration.getAgentResources()) diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AgentControllerIT.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AgentControllerIT.java index 51dd719b4..db33faf21 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AgentControllerIT.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AgentControllerIT.java @@ -20,13 +20,12 @@ import ai.langstream.api.model.AgentLifecycleStatus; import ai.langstream.api.model.StreamingCluster; +import ai.langstream.deployer.k8s.CRDConstants; import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; import ai.langstream.deployer.k8s.util.SerializationUtil; import ai.langstream.runtime.api.agent.RuntimePodConfiguration; -import io.fabric8.kubernetes.api.model.Container; -import io.fabric8.kubernetes.api.model.NamespaceBuilder; -import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.*; import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec; import io.fabric8.kubernetes.client.KubernetesClient; @@ -131,6 +130,186 @@ void testAgentController() throws Exception { assertEquals("/app-config/config", container.getArgs().get(args++)); } + private static StatefulSet findStatefulSetWithPodAnnotation( + String namespace, String annotation, String value) { + return deployment + .getClient() + .apps() + .statefulSets() + .inNamespace(namespace) + .list() + .getItems() + .stream() + .filter( + sts -> + value.equals( + sts.getSpec() + .getTemplate() + .getMetadata() + .getAnnotations() + .get(annotation))) + .findFirst() + .orElse(null); + } + + @Test + void testAgentControllerUpdateAgent() throws Exception { + final KubernetesClient client = deployment.getClient(); + final String tenant = genTenant(); + final String namespace = "langstream-" + tenant; + createNamespace(client, namespace); + + final String agentCustomResourceName = + AgentResourcesFactory.getAgentCustomResourceName("my-app", "agent-id"); + + createAgentSecret(client, tenant, agentCustomResourceName, namespace); + + final AgentCustomResource resource = + getCr( + """ + apiVersion: langstream.ai/v1alpha1 + kind: Agent + metadata: + name: %s + spec: + applicationId: my-app + agentId: agent-id + agentConfigSecretRef: %s + agentConfigSecretRefChecksum: xx + tenant: %s + """ + .formatted( + agentCustomResourceName, agentCustomResourceName, tenant)); + + client.resource(resource).inNamespace(namespace).create(); + + Awaitility.await() + .untilAsserted( + () -> { + assertEquals( + 1, + client.apps() + .statefulSets() + .inNamespace(namespace) + .list() + .getItems() + .size()); + assertEquals( + AgentLifecycleStatus.Status.DEPLOYING, + client.resource(resource) + .inNamespace(namespace) + .get() + .getStatus() + .getStatus() + .getStatus()); + }); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted( + () -> { + StatefulSet p = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/config-checksum", "xx"); + assertNotNull(p); + }); + + StatefulSet sts = + findStatefulSetWithPodAnnotation(namespace, "ai.langstream/config-checksum", "xx"); + assertEquals( + "busybox", sts.getSpec().getTemplate().getSpec().getContainers().get(0).getImage()); + DEPLOYER_CONFIG.put("DEPLOYER_RUNTIME_IMAGE", "busybox:v2"); + try { + deployment.restartDeployerOperator(); + + AgentCustomResource resource2 = + client.resources(AgentCustomResource.class) + .inNamespace(namespace) + .withName(agentCustomResourceName) + .get(); + resource2.getSpec().setAgentConfigSecretRefChecksum("xx2"); + client.resource(resource2).inNamespace(namespace).update(); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted( + () -> { + StatefulSet p = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/config-checksum", "xx2"); + assertNotNull(p); + }); + sts = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/config-checksum", "xx2"); + assertEquals( + "busybox", + sts.getSpec().getTemplate().getSpec().getContainers().get(0).getImage()); + + resource2 = + client.resources(AgentCustomResource.class) + .inNamespace(namespace) + .withName(agentCustomResourceName) + .get(); + resource2.getSpec().setAgentConfigSecretRefChecksum("xx3"); + resource2 + .getSpec() + .setOptions( + "{\"autoUpgradeRuntimeImage\": true, \"autoUpgradeRuntimeImagePullPolicy\": true, \"autoUpgradeAgentResources\": true, \"autoUpgradeAgentPodTemplate\": true}"); + + client.resource(resource2).inNamespace(namespace).update(); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted( + () -> { + StatefulSet p = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/config-checksum", "xx3"); + assertNotNull(p); + }); + sts = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/config-checksum", "xx3"); + assertEquals( + "busybox:v2", + sts.getSpec().getTemplate().getSpec().getContainers().get(0).getImage()); + + DEPLOYER_CONFIG.put("DEPLOYER_RUNTIME_IMAGE", "busybox:v3"); + deployment.restartDeployerOperator(); + + resource2 = + client.resources(AgentCustomResource.class) + .inNamespace(namespace) + .withName(agentCustomResourceName) + .get(); + resource2 + .getSpec() + .setOptions( + "{\"autoUpgradeRuntimeImage\": true, \"autoUpgradeRuntimeImagePullPolicy\": true, \"autoUpgradeAgentResources\": true, \"autoUpgradeAgentPodTemplate\": true,\"applicationSeed\": 123}"); + + client.resource(resource2).inNamespace(namespace).update(); + + Awaitility.await() + .untilAsserted( + () -> { + StatefulSet p = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/application-seed", "123"); + assertNotNull(p); + }); + sts = + findStatefulSetWithPodAnnotation( + namespace, "ai.langstream/config-checksum", "xx3"); + assertEquals( + "busybox:v3", + sts.getSpec().getTemplate().getSpec().getContainers().get(0).getImage()); + + } finally { + DEPLOYER_CONFIG.put("DEPLOYER_RUNTIME_IMAGE", "busybox"); + } + } + static AtomicInteger counter = new AtomicInteger(0); private String genTenant() { @@ -277,5 +456,18 @@ private void createNamespace(KubernetesClient client, String namespace) { .endMetadata() .build()) .serverSideApply(); + + deployment + .getClient() + .resource( + new ServiceAccountBuilder() + .withNewMetadata() + .withName( + CRDConstants.computeRuntimeServiceAccountForTenant( + namespace.replace("langstream-", ""))) + .endMetadata() + .build()) + .inNamespace(namespace) + .serverSideApply(); } } diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java index 2e03ed9c4..fff7d7b0e 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-operator/src/test/java/ai/langstream/deployer/k8s/controllers/AppControllerIT.java @@ -55,9 +55,11 @@ public class AppControllerIT { new OperatorExtension( Map.of( "DEPLOYER_AGENT_RESOURCES", - "{defaultMaxTotalResourceUnitsPerTenant: 3}", - "DEPLOYER_RUNTIME_IMAGE", "bash", - "DEPLOYER_RUNTIME_IMAGE_PULL_POLICY", "IfNotPresent")); + "{defaultMaxTotalResourceUnitsPerTenant: 3}", + "DEPLOYER_RUNTIME_IMAGE", + "bash", + "DEPLOYER_RUNTIME_IMAGE_PULL_POLICY", + "IfNotPresent")); static AtomicInteger counter = new AtomicInteger(0); @@ -75,17 +77,17 @@ void testAppController() { final ApplicationCustomResource resource = getCr( """ - apiVersion: langstream.ai/v1alpha1 - kind: Application - metadata: - name: %s - namespace: %s - spec: - image: bash - imagePullPolicy: IfNotPresent - application: '{"modules": {}}' - tenant: %s - """ + apiVersion: langstream.ai/v1alpha1 + kind: Application + metadata: + name: %s + namespace: %s + spec: + image: bash + imagePullPolicy: IfNotPresent + application: '{"modules": {}}' + tenant: %s + """ .formatted(applicationId, namespace, tenant)); final KubernetesClient client = deployment.getClient(); deployment @@ -444,9 +446,7 @@ private void checkDeployerJob(Job job, boolean cleanup) { assertEquals("bash", initContainer.getCommand().get(0)); assertEquals("-c", initContainer.getCommand().get(1)); assertEquals( - "echo '{\"applicationId\":\"my-app\",\"tenant\":\"my-tenant\",\"application\":\"{\\\"modules\\\": " - + "{}}\",\"codeStorageArchiveId\":null}' > /app-config/config && echo '{}' > " - + "/cluster-runtime-config/config", + "echo '{\"applicationId\":\"my-app\",\"tenant\":\"my-tenant\",\"application\":\"{\\\"modules\\\": {}}\",\"codeStorageArchiveId\":null,\"deployFlags\":{\"autoUpgradeRuntimeImage\":false,\"autoUpgradeRuntimeImagePullPolicy\":false,\"autoUpgradeAgentResources\":false,\"autoUpgradeAgentPodTemplate\":false,\"seed\":0}}' > /app-config/config && echo '{}' > /cluster-runtime-config/config", initContainer.getArgs().get(0)); } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java index 09cf10d8d..0fb4cb150 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java @@ -109,7 +109,8 @@ public Object deploy( secrets, executionPlan, streamingClusterRuntime, - codeStorageArchiveId); + codeStorageArchiveId, + deployContext); final String namespace = computeNamespace(tenant); for (Secret secret : secrets) { @@ -229,7 +230,8 @@ private void collectAgentCustomResourcesAndSecrets( List secrets, ExecutionPlan applicationInstance, StreamingClusterRuntime streamingClusterRuntime, - String codeStorageArchiveId) { + String codeStorageArchiveId, + DeployContext deployContext) { for (AgentNode agentImplementation : applicationInstance.getAgents().values()) { collectAgentCustomResourceAndSecret( tenant, @@ -238,7 +240,8 @@ private void collectAgentCustomResourcesAndSecrets( agentImplementation, streamingClusterRuntime, applicationInstance, - codeStorageArchiveId); + codeStorageArchiveId, + deployContext); } } @@ -250,7 +253,8 @@ private void collectAgentCustomResourceAndSecret( AgentNode agent, StreamingClusterRuntime streamingClusterRuntime, ExecutionPlan applicationInstance, - String codeStorageArchiveId) { + String codeStorageArchiveId, + DeployContext deployContext) { if (log.isDebugEnabled()) { log.debug( "Building configuration for Agent {}, codeStorageArchiveId {}", @@ -340,7 +344,15 @@ private void collectAgentCustomResourceAndSecret( agentSpec.setResources( new AgentSpec.Resources(resourcesSpec.parallelism(), resourcesSpec.size())); - agentSpec.serializeAndSetOptions(new AgentSpec.Options(disks)); + AgentSpec.Options options = + new AgentSpec.Options( + disks, + deployContext.isAutoUpgradeRuntimeImage(), + deployContext.isAutoUpgradeRuntimeImagePullPolicy(), + deployContext.isAutoUpgradeAgentResources(), + deployContext.isAutoUpgradeAgentPodTemplate(), + deployContext.getApplicationSeed()); + agentSpec.serializeAndSetOptions(options); agentSpec.setAgentConfigSecretRef(secretName); agentSpec.setCodeArchiveId(codeStorageArchiveId); byte[] hash = DIGEST.digest(SerializationUtil.writeAsJsonBytes(secret.getData())); @@ -393,7 +405,8 @@ public void delete( secrets, applicationInstance, streamingClusterRuntime, - codeStorageArchiveId); + codeStorageArchiveId, + deployContext); final String namespace = computeNamespace(tenant); for (Secret secret : secrets) { diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java index c15e683a6..f5111eedf 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java @@ -61,7 +61,7 @@ class KubernetesClusterRuntimeDockerTest { static KafkaContainerExtension kafkaContainer = new KafkaContainerExtension(); private ApplicationDeployer getDeployer() { - return getDeployer(null); + return getDeployer(DeployContext.NO_DEPLOY_CONTEXT); } private ApplicationDeployer getDeployer(DeployContext deployContext) { @@ -309,7 +309,7 @@ public void testCodeArchiveId() throws Exception { ApplicationDeployer deployer = getDeployer( - new DeployContext() { + new DeployContext.NoOpDeployContext() { @Override public ApplicationCodeInfo getApplicationCodeInfo( String tenant, String applicationId, String codeArchiveId) { diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeTest.java index 49bb6f58a..56625872e 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeTest.java @@ -116,7 +116,7 @@ private void assertCode( String oldCode, boolean expectKeep) { final DeployContext deployContext = - new DeployContext() { + new DeployContext.NoOpDeployContext() { @Override public ApplicationCodeInfo getApplicationCodeInfo( String tenant, String applicationId, String codeArchiveId) { diff --git a/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java b/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java index 0acfefdf8..a7ce0d3d1 100644 --- a/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java +++ b/langstream-k8s-storage/src/main/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStore.java @@ -140,17 +140,21 @@ public void put( String applicationId, Application applicationInstance, String codeArchiveId, - ExecutionPlan executionPlan) { + ExecutionPlan executionPlan, + boolean autoUpgrade, + boolean forceRestart) { final ApplicationCustomResource existing = getApplicationCustomResource(tenant, applicationId); + final ApplicationSpecOptions existingOptions = + existing != null + ? ApplicationSpec.deserializeOptions(existing.getSpec().getOptions()) + : null; if (existing != null) { if (existing.isMarkedForDeletion()) { throw new IllegalArgumentException( "Application " + applicationId + " is marked for deletion."); } - final ApplicationSpecOptions options = - ApplicationSpec.deserializeOptions(existing.getSpec().getOptions()); - if (options.isMarkedForDeletion()) { + if (existingOptions.isMarkedForDeletion()) { throw new IllegalArgumentException( "Application " + applicationId + " is marked for deletion."); } @@ -168,6 +172,22 @@ public void put( spec.setTenant(tenant); spec.setApplication(ApplicationSpec.serializeApplication(serializedApp)); spec.setCodeArchiveId(codeArchiveId); + ApplicationSpecOptions specOptions = new ApplicationSpecOptions(); + specOptions.setAutoUpgradeRuntimeImage(autoUpgrade); + specOptions.setAutoUpgradeRuntimeImagePullPolicy(autoUpgrade); + specOptions.setAutoUpgradeAgentResources(autoUpgrade); + specOptions.setAutoUpgradeAgentPodTemplate(autoUpgrade); + if (forceRestart) { + specOptions.setSeed(System.nanoTime()); + } else { + // very important to not overwrite the seed if we are not forcing a restart + if (existingOptions != null) { + specOptions.setSeed(existingOptions.getSeed()); + } else { + specOptions.setSeed(0L); + } + } + spec.setOptions(ApplicationSpec.serializeOptions(specOptions)); log.info( "Creating application {} in namespace {}, spec {}", applicationId, namespace, spec); diff --git a/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java b/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java index e8e9930ad..fb80bf0ab 100644 --- a/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java +++ b/langstream-k8s-storage/src/test/java/ai/langstream/impl/storage/k8s/apps/KubernetesApplicationStoreTest.java @@ -54,8 +54,8 @@ void testApp() { "mysecret", new ai.langstream.api.model.Secret( "mysecret", "My secret", Map.of("token", "xxx"))))); - store.put(tenant, "myapp", app, "code-1", null); - final ApplicationCustomResource createdCr = + store.put(tenant, "myapp", app, "code-1", null, false, false); + ApplicationCustomResource createdCr = k3s.getClient() .resources(ApplicationCustomResource.class) .inNamespace("s" + tenant) @@ -68,6 +68,9 @@ void testApp() { "{\"resources\":{},\"modules\":{},\"instance\":null,\"gateways\":null,\"agentRunners\":{}}", createdCr.getSpec().getApplication()); assertEquals(tenant, createdCr.getSpec().getTenant()); + assertEquals( + "{\"deleteMode\":\"CLEANUP_REQUIRED\",\"markedForDeletion\":false,\"seed\":0,\"autoUpgradeRuntimeImage\":false,\"autoUpgradeRuntimeImagePullPolicy\":false,\"autoUpgradeAgentResources\":false,\"autoUpgradeAgentPodTemplate\":false}", + createdCr.getSpec().getOptions()); final Secret createdSecret = k3s.getClient().secrets().inNamespace("s" + tenant).withName("myapp").get(); @@ -86,6 +89,37 @@ void testApp() { assertEquals(1, store.list(tenant).size()); + store.put(tenant, "myapp", app, "code-1", null, true, false); + createdCr = + k3s.getClient() + .resources(ApplicationCustomResource.class) + .inNamespace("s" + tenant) + .withName("myapp") + .get(); + assertEquals( + "{\"deleteMode\":\"CLEANUP_REQUIRED\",\"markedForDeletion\":false,\"seed\":0,\"autoUpgradeRuntimeImage\":true,\"autoUpgradeRuntimeImagePullPolicy\":true,\"autoUpgradeAgentResources\":true,\"autoUpgradeAgentPodTemplate\":true}", + createdCr.getSpec().getOptions()); + + store.put(tenant, "myapp", app, "code-1", null, true, true); + createdCr = + k3s.getClient() + .resources(ApplicationCustomResource.class) + .inNamespace("s" + tenant) + .withName("myapp") + .get(); + assertEquals( + "{\"deleteMode\":\"CLEANUP_REQUIRED\",\"markedForDeletion\":false,\"seed\":%s,\"autoUpgradeRuntimeImage\":true,\"autoUpgradeRuntimeImagePullPolicy\":true,\"autoUpgradeAgentResources\":true,\"autoUpgradeAgentPodTemplate\":true}" + .formatted( + ApplicationSpec.deserializeOptions(createdCr.getSpec().getOptions()) + .getSeed() + + ""), + createdCr.getSpec().getOptions()); + + assertNotEquals( + "0", + ApplicationSpec.deserializeOptions(createdCr.getSpec().getOptions()).getSeed() + + ""); + assertNotNull(store.get(tenant, "myapp", false)); store.delete(tenant, "myapp", false); @@ -124,7 +158,7 @@ void testBlockDeployWhileDeleting() { final String tenant = getTenant(); store.onTenantCreated(tenant); final Application app = new Application(); - store.put(tenant, "myapp", app, "code-1", null); + store.put(tenant, "myapp", app, "code-1", null, false, false); ApplicationCustomResource applicationCustomResource = k3s.getClient() .resources(ApplicationCustomResource.class) @@ -156,12 +190,11 @@ void testBlockDeployWhileDeleting() { .getDeleteMode()); try { - store.put(tenant, "myapp", app, "code-1", null); + store.put(tenant, "myapp", app, "code-1", null, false, false); fail(); } catch (IllegalArgumentException aie) { assertEquals("Application myapp is marked for deletion.", aie.getMessage()); } - applicationCustomResource = applicationCustomResource; applicationCustomResource.getMetadata().setFinalizers(List.of()); k3s.getClient().resource(applicationCustomResource).update(); k3s.getClient().resource(applicationCustomResource).delete(); @@ -170,7 +203,7 @@ void testBlockDeployWhileDeleting() { .until( () -> { try { - store.put(tenant, "myapp", app, "code-1", null); + store.put(tenant, "myapp", app, "code-1", null, false, false); return true; } catch (IllegalArgumentException e) { return false; diff --git a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaReaderWrapper.java b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaReaderWrapper.java index 1738845ad..1b2b81b44 100644 --- a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaReaderWrapper.java +++ b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaReaderWrapper.java @@ -107,7 +107,11 @@ private OffsetPerPartition parseOffset() throws IOException { @Override public void close() { if (consumer != null) { - consumer.close(); + try { + consumer.close(); + } catch (org.apache.kafka.common.errors.InterruptException e) { + log.warn("Interrupted while closing Kafka consumer", e); + } } } diff --git a/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarClusterRuntimeDockerTest.java b/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarClusterRuntimeDockerTest.java index 4b184d23c..b4fce154c 100644 --- a/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarClusterRuntimeDockerTest.java +++ b/langstream-pulsar-runtime/src/test/java/ai/langstream/pulsar/PulsarClusterRuntimeDockerTest.java @@ -25,6 +25,7 @@ import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.DeployContext; import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.deploy.ApplicationDeployer; @@ -73,6 +74,7 @@ public void testMapPulsarTopics() throws Exception { .registry(new ClusterRuntimeRegistry()) .pluginsRegistry(new PluginsRegistry()) .topicConnectionsRuntimeRegistry(new TopicConnectionsRuntimeRegistry()) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build()) { Module module = applicationInstance.getModule("module-1"); diff --git a/langstream-runtime/langstream-runtime-api/src/main/java/ai/langstream/runtime/api/deployer/RuntimeDeployerConfiguration.java b/langstream-runtime/langstream-runtime-api/src/main/java/ai/langstream/runtime/api/deployer/RuntimeDeployerConfiguration.java index 40a1b2959..9ff9255b4 100644 --- a/langstream-runtime/langstream-runtime-api/src/main/java/ai/langstream/runtime/api/deployer/RuntimeDeployerConfiguration.java +++ b/langstream-runtime/langstream-runtime-api/src/main/java/ai/langstream/runtime/api/deployer/RuntimeDeployerConfiguration.java @@ -27,4 +27,19 @@ public class RuntimeDeployerConfiguration { private String tenant; private String application; private String codeStorageArchiveId; + private DeployFlags deployFlags; + + @Data + @NoArgsConstructor + @AllArgsConstructor + /* + * Flags specifically used for this deploy operation. + */ + public static class DeployFlags { + private boolean autoUpgradeRuntimeImage; + private boolean autoUpgradeRuntimeImagePullPolicy; + private boolean autoUpgradeAgentResources; + private boolean autoUpgradeAgentPodTemplate; + private long seed; + } } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java index bfc55d9a4..88209f25d 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/application/ApplicationSetupRunner.java @@ -22,6 +22,7 @@ import ai.langstream.api.runner.assets.AssetManagerRegistry; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.DeployContext; import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.deploy.ApplicationDeployer; @@ -124,6 +125,7 @@ private ApplicationDeployer buildDeployer( .pluginsRegistry(new PluginsRegistry()) .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry) .assetManagerRegistry(assetManagerRegistry) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build(); } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/deployer/RuntimeDeployer.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/deployer/RuntimeDeployer.java index 4f1b5cb5a..2aac052d7 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/deployer/RuntimeDeployer.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/deployer/RuntimeDeployer.java @@ -64,9 +64,21 @@ public void deploy( appInstance.setSecrets(secrets); final String tenant = configuration.getTenant(); + RuntimeDeployerConfiguration.DeployFlags deployFlags = configuration.getDeployFlags(); + if (deployFlags == null) { + deployFlags = new RuntimeDeployerConfiguration.DeployFlags(); + } final DeployContextImpl deployContext = - new DeployContextImpl(clusterConfiguration, token, tenant); + new DeployContextImpl( + clusterConfiguration, + token, + tenant, + deployFlags.isAutoUpgradeRuntimeImage(), + deployFlags.isAutoUpgradeRuntimeImagePullPolicy(), + deployFlags.isAutoUpgradeAgentResources(), + deployFlags.isAutoUpgradeAgentPodTemplate(), + deployFlags.getSeed()); try (ApplicationDeployer deployer = ApplicationDeployer.builder() .registry(new ClusterRuntimeRegistry(clusterRuntimeConfiguration)) @@ -98,6 +110,7 @@ public void delete( ApplicationDeployer.builder() .registry(new ClusterRuntimeRegistry(clusterRuntimeConfiguration)) .pluginsRegistry(new PluginsRegistry()) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build()) { log.info("Deleting application {}", applicationId); @@ -114,14 +127,56 @@ public void delete( private static class DeployContextImpl implements DeployContext { private final AdminClient adminClient; + private final boolean autoUpgradeRuntimeImage; + private final boolean autoUpgradeRuntimeImagePullPolicy; + private final boolean autoUpgradeAgentResources; + private final boolean autoUpgradeAgentPodTemplate; + private final long seed; public DeployContextImpl( - ClusterConfiguration clusterConfiguration, String token, String tenant) { + ClusterConfiguration clusterConfiguration, + String token, + String tenant, + boolean autoUpgradeRuntimeImage, + boolean autoUpgradeRuntimeImagePullPolicy, + boolean autoUpgradeAgentResources, + boolean autoUpgradeAgentPodTemplate, + long seed) { if (clusterConfiguration == null) { adminClient = null; } else { adminClient = createAdminClient(clusterConfiguration, token, tenant); } + this.autoUpgradeRuntimeImage = autoUpgradeRuntimeImage; + this.autoUpgradeRuntimeImagePullPolicy = autoUpgradeRuntimeImagePullPolicy; + this.autoUpgradeAgentResources = autoUpgradeAgentResources; + this.autoUpgradeAgentPodTemplate = autoUpgradeAgentPodTemplate; + this.seed = seed; + } + + @Override + public boolean isAutoUpgradeRuntimeImage() { + return autoUpgradeRuntimeImage; + } + + @Override + public boolean isAutoUpgradeRuntimeImagePullPolicy() { + return autoUpgradeRuntimeImagePullPolicy; + } + + @Override + public boolean isAutoUpgradeAgentResources() { + return autoUpgradeAgentResources; + } + + @Override + public boolean isAutoUpgradeAgentPodTemplate() { + return autoUpgradeAgentPodTemplate; + } + + @Override + public long getApplicationSeed() { + return seed; } @Override diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java index f94cdb54f..1aa40d886 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java @@ -22,6 +22,7 @@ import ai.langstream.api.runner.code.MetricsReporter; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.DeployContext; import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; @@ -165,6 +166,7 @@ public static void setup() throws Exception { ApplicationDeployer.builder() .registry(new ClusterRuntimeRegistry()) .pluginsRegistry(new PluginsRegistry()) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry) .assetManagerRegistry(assetManagerRegistry) .build(); diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java index 5e3eecdc3..be6f34639 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java @@ -86,7 +86,9 @@ public void put( String applicationId, Application applicationInstance, String codeArchiveReference, - ExecutionPlan executionPlan) { + ExecutionPlan executionPlan, + boolean autoUpgrade, + boolean forceRestart) { APPLICATIONS.put( getKey(tenant, applicationId), new LocalApplication( diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java index 0b2f354a0..2b13230b9 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java @@ -19,10 +19,7 @@ import ai.langstream.api.runner.assets.AssetManagerRegistry; import ai.langstream.api.runner.code.MetricsReporter; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; -import ai.langstream.api.runtime.AgentNode; -import ai.langstream.api.runtime.ClusterRuntimeRegistry; -import ai.langstream.api.runtime.ExecutionPlan; -import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.api.runtime.*; import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; import ai.langstream.impl.deploy.ApplicationDeployer; import ai.langstream.impl.nar.NarFileHandler; @@ -101,6 +98,7 @@ public LocalApplicationRunner( .pluginsRegistry(new PluginsRegistry()) .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry) .assetManagerRegistry(assetManagerRegistry) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build(); } @@ -145,7 +143,13 @@ public ApplicationRuntime deployApplicationWithSecrets( applicationDeployer.deploy(tenant, implementation, null); applicationStore.put( - tenant, appId, applicationInstance, "no-code-archive-reference", implementation); + tenant, + appId, + applicationInstance, + "no-code-archive-reference", + implementation, + false, + false); return new ApplicationRuntime( tenant, appId, applicationInstance, implementation, secrets, applicationDeployer); diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java index 7ec464cd0..61d64e924 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationResource.java @@ -144,7 +144,8 @@ ApplicationDescription.ApplicationDefinition deployApplication( @RequestParam("app") MultipartFile appFile, @RequestParam String instance, @RequestParam Optional secrets, - @RequestParam(value = "dry-run", required = false) boolean dryRun) + @RequestParam(value = "dry-run", required = false) boolean dryRun, + @RequestParam(value = "auto-upgrade", required = false) boolean autoUpgrade) throws Exception { performAuthorization(authentication, tenant); final ParsedApplication parsedApplication = @@ -166,7 +167,8 @@ ApplicationDescription.ApplicationDefinition deployApplication( tenant, applicationId, parsedApplication.getApplication(), - parsedApplication.getCodeArchiveReference()); + parsedApplication.getCodeArchiveReference(), + autoUpgrade); application = parsedApplication.getApplication().getApplication(); } return new ApplicationDescription.ApplicationDefinition(application); @@ -180,7 +182,9 @@ void updateApplication( @NotBlank @PathVariable("id") String applicationId, @NotNull @RequestParam("app") Optional appFile, @RequestParam Optional instance, - @RequestParam Optional secrets) + @RequestParam Optional secrets, + @RequestParam(value = "force-restart", required = false) boolean forceRestart, + @RequestParam(value = "auto-upgrade", required = false) boolean autoUpgrade) throws Exception { performAuthorization(authentication, tenant); final ParsedApplication parsedApplication = @@ -189,7 +193,9 @@ void updateApplication( tenant, applicationId, parsedApplication.getApplication(), - parsedApplication.getCodeArchiveReference()); + parsedApplication.getCodeArchiveReference(), + autoUpgrade, + forceRestart); } @Data diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java index ad4dd84e1..71b9d89b0 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/application/ApplicationService.java @@ -23,11 +23,7 @@ import ai.langstream.api.model.StoredApplication; import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; -import ai.langstream.api.runtime.AgentNode; -import ai.langstream.api.runtime.ClusterRuntimeRegistry; -import ai.langstream.api.runtime.ExecutionPlan; -import ai.langstream.api.runtime.PluginsRegistry; -import ai.langstream.api.runtime.Topic; +import ai.langstream.api.runtime.*; import ai.langstream.api.storage.ApplicationStore; import ai.langstream.api.webservice.tenant.TenantConfiguration; import ai.langstream.impl.common.DefaultAgentNode; @@ -57,6 +53,7 @@ public class ApplicationService { .registry(new ClusterRuntimeRegistry()) .pluginsRegistry(new PluginsRegistry()) .topicConnectionsRuntimeRegistry(new TopicConnectionsRuntimeRegistry()) + .deployContext(DeployContext.NO_DEPLOY_CONTEXT) .build(); private final GlobalMetadataService globalMetadataService; @@ -75,7 +72,8 @@ public void deployApplication( String tenant, String applicationId, ModelBuilder.ApplicationWithPackageInfo applicationInstance, - String codeArchiveReference) { + String codeArchiveReference, + boolean autoUpgrade) { checkTenant(tenant); if (applicationStore.get(tenant, applicationId, false) != null) { throw new ResponseStatusException(HttpStatus.CONFLICT, "Application already exists"); @@ -92,7 +90,9 @@ public void deployApplication( applicationId, applicationInstance.getApplication(), codeArchiveReference, - executionPlan); + executionPlan, + autoUpgrade, + false); } void checkResourceUsage(String tenant, String applicationId, ExecutionPlan executionPlan) { @@ -150,17 +150,26 @@ public void updateApplication( String tenant, String applicationId, ModelBuilder.ApplicationWithPackageInfo applicationInstance, - String codeArchiveReference) { + String codeArchiveReference, + boolean autoUpgrade, + boolean forceRestart) { checkTenant(tenant); validateDeployMergeAndUpdate( - tenant, applicationId, applicationInstance, codeArchiveReference); + tenant, + applicationId, + applicationInstance, + codeArchiveReference, + autoUpgrade, + forceRestart); } private void validateDeployMergeAndUpdate( String tenant, String applicationId, ModelBuilder.ApplicationWithPackageInfo applicationInstance, - String codeArchiveReference) { + String codeArchiveReference, + boolean autoUpgrade, + boolean forceRestart) { final StoredApplication existing = applicationStore.get(tenant, applicationId, false); if (existing == null) { @@ -194,7 +203,14 @@ private void validateDeployMergeAndUpdate( if (codeArchiveReference == null) { codeArchiveReference = existing.getCodeArchiveReference(); } - applicationStore.put(tenant, applicationId, newApplication, codeArchiveReference, newPlan); + applicationStore.put( + tenant, + applicationId, + newApplication, + codeArchiveReference, + newPlan, + autoUpgrade, + forceRestart); } ExecutionPlan validateExecutionPlan(String applicationId, Application applicationInstance) { diff --git a/langstream-webservice/src/main/java/ai/langstream/webservice/archetype/ArchetypeResource.java b/langstream-webservice/src/main/java/ai/langstream/webservice/archetype/ArchetypeResource.java index 20f72a5cc..bba7e6d8c 100644 --- a/langstream-webservice/src/main/java/ai/langstream/webservice/archetype/ArchetypeResource.java +++ b/langstream-webservice/src/main/java/ai/langstream/webservice/archetype/ArchetypeResource.java @@ -149,7 +149,8 @@ ApplicationDescription.ApplicationDefinition deployApplication( tenant, applicationId, parsedApplication.getApplication(), - parsedApplication.getCodeArchiveReference()); + parsedApplication.getCodeArchiveReference(), + false); application = parsedApplication.getApplication().getApplication(); } return new ApplicationDescription.ApplicationDefinition(application); diff --git a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java index 2bfdbca99..1de65c2e4 100644 --- a/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java +++ b/langstream-webservice/src/test/java/ai/langstream/webservice/application/ApplicationServiceResourceLimitTest.java @@ -199,7 +199,9 @@ public void put( String applicationId, Application applicationInstance, String codeArchiveReference, - ExecutionPlan executionPlan) { + ExecutionPlan executionPlan, + boolean autoUpgrade, + boolean forceRestart) { throw new UnsupportedOperationException(); }