diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ConsumePushMessage.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ConsumePushMessage.java similarity index 94% rename from langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ConsumePushMessage.java rename to langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ConsumePushMessage.java index a34d54b89..1d207e4da 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ConsumePushMessage.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ConsumePushMessage.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.apigateway.websocket.api; +package ai.langstream.apigateway.api; import java.util.Map; diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ProduceRequest.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ProduceRequest.java similarity index 93% rename from langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ProduceRequest.java rename to langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ProduceRequest.java index fb52b6ef1..7238f5047 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ProduceRequest.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ProduceRequest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.apigateway.websocket.api; +package ai.langstream.apigateway.api; import java.util.Map; diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ProduceResponse.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ProduceResponse.java similarity index 94% rename from langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ProduceResponse.java rename to langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ProduceResponse.java index 67e6b65fe..ea8af13e1 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/api/ProduceResponse.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/api/ProduceResponse.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.apigateway.websocket.api; +package ai.langstream.apigateway.api; public record ProduceResponse(Status status, String reason) { public static ProduceResponse OK = new ProduceResponse(Status.OK, null); diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java index 827bdc489..59a859357 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ConsumeGateway.java @@ -25,10 +25,11 @@ import ai.langstream.api.runner.topics.TopicReadResult; import ai.langstream.api.runner.topics.TopicReader; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; -import ai.langstream.apigateway.websocket.api.ConsumePushMessage; -import ai.langstream.apigateway.websocket.api.ProduceResponse; +import ai.langstream.apigateway.api.ConsumePushMessage; +import ai.langstream.apigateway.api.ProduceResponse; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.Closeable; +import java.util.ArrayList; import java.util.Base64; import java.util.Collection; import java.util.HashMap; @@ -46,7 +47,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class ConsumeGateway implements Closeable { +public class ConsumeGateway implements AutoCloseable { protected static final ObjectMapper mapper = new ObjectMapper(); @@ -237,4 +238,44 @@ public void close() { closeReader(); } } + + + + public static List> createMessageFilters( + List headersFilters, + Map passedParameters, + Map principalValues) { + List> filters = new ArrayList<>(); + if (headersFilters == null) { + return filters; + } + for (Gateway.KeyValueComparison comparison : headersFilters) { + if (comparison.key() == null) { + throw new IllegalArgumentException("Key cannot be null"); + } + filters.add( + record -> { + final Header header = record.getHeader(comparison.key()); + if (header == null) { + return false; + } + final String expectedValue = header.valueAsString(); + if (expectedValue == null) { + return false; + } + String value = comparison.value(); + if (value == null && comparison.valueFromParameters() != null) { + value = passedParameters.get(comparison.valueFromParameters()); + } + if (value == null && comparison.valueFromAuthentication() != null) { + value = principalValues.get(comparison.valueFromAuthentication()); + } + if (value == null) { + return false; + } + return expectedValue.equals(value); + }); + } + return filters; + } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java index 9147c157f..2ed3ce3a3 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/ProduceGateway.java @@ -23,8 +23,8 @@ import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; -import ai.langstream.apigateway.websocket.api.ProduceRequest; -import ai.langstream.apigateway.websocket.api.ProduceResponse; +import ai.langstream.apigateway.api.ProduceRequest; +import ai.langstream.apigateway.api.ProduceResponse; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.Closeable; @@ -36,10 +36,9 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.map.LRUMap; @Slf4j -public class ProduceGateway implements Closeable { +public class ProduceGateway implements AutoCloseable { protected static final ObjectMapper mapper = new ObjectMapper(); @@ -83,7 +82,8 @@ public void validateOptions(Map options) { private List
commonHeaders; private String logRef; - public ProduceGateway(TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, TopicProducerCache topicProducerCache) { + public ProduceGateway(TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry, + TopicProducerCache topicProducerCache) { this.topicConnectionsRuntimeRegistry = topicConnectionsRuntimeRegistry; this.topicProducerCache = topicProducerCache; } @@ -102,8 +102,9 @@ public void start( final TopicProducerCache.Key key = new TopicProducerCache.Key(requestContext.tenant(), requestContext.applicationId(), requestContext.gateway().getId()); - producer = topicProducerCache.getOrCreate(key, () -> setupProducer(topic, requestContext.application().getInstance() - .streamingCluster())); + producer = topicProducerCache.getOrCreate(key, + () -> setupProducer(topic, requestContext.application().getInstance() + .streamingCluster())); } protected TopicProducer setupProducer( @@ -153,8 +154,8 @@ public void produceMessage(ProduceRequest produceRequest) throws ProduceExceptio if (configuredHeaders.contains(messageHeader.getKey())) { throw new ProduceException( "Header " - + messageHeader.getKey() - + " is configured as parameter-level header.", + + messageHeader.getKey() + + " is configured as parameter-level header.", ProduceResponse.Status.BAD_REQUEST); } headers.add( @@ -191,11 +192,27 @@ public void close() { public static List
getProducerCommonHeaders( Gateway.ProduceOptions produceOptions, AuthenticatedGatewayRequestContext context) { - if (produceOptions != null) { - return getProducerCommonHeaders( - produceOptions.headers(), context.userParameters(), context.principalValues()); + if (produceOptions == null) { + return null; } - return null; + return getProducerCommonHeaders( + produceOptions.headers(), context.userParameters(), context.principalValues()); + } + + public static List
getProducerCommonHeaders( + Gateway.ChatOptions chatOptions, AuthenticatedGatewayRequestContext context) { + if (chatOptions == null) { + return null; + } + return getProducerCommonHeaders(chatOptions.getHeaders(), context.userParameters(), context.principalValues()); + } + + public static List
getProducerCommonHeaders( + Gateway.ServiceOptions serviceOptions, AuthenticatedGatewayRequestContext context) { + if (serviceOptions == null) { + return null; + } + return getProducerCommonHeaders(serviceOptions.getHeaders(), context.userParameters(), context.principalValues()); } public static List
getProducerCommonHeaders( diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java index ea5ead9f9..58e341ad8 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/http/GatewayResource.java @@ -18,18 +18,26 @@ import ai.langstream.api.gateway.GatewayRequestContext; import ai.langstream.api.model.Gateway; import ai.langstream.api.runner.code.Header; -import ai.langstream.apigateway.config.TopicProperties; +import ai.langstream.api.runner.code.Record; +import ai.langstream.apigateway.api.ConsumePushMessage; +import ai.langstream.apigateway.gateways.ConsumeGateway; import ai.langstream.apigateway.gateways.GatewayRequestHandler; import ai.langstream.apigateway.gateways.ProduceGateway; import ai.langstream.apigateway.gateways.TopicProducerCache; import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; -import ai.langstream.apigateway.websocket.api.ProduceRequest; -import ai.langstream.apigateway.websocket.api.ProduceResponse; +import ai.langstream.apigateway.api.ProduceRequest; +import ai.langstream.apigateway.api.ProduceResponse; +import jakarta.servlet.http.HttpServletResponse; import jakarta.validation.constraints.NotBlank; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -52,6 +60,7 @@ public class GatewayResource { private final TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeRegistryProvider; private final TopicProducerCache topicProducerCache; private final GatewayRequestHandler gatewayRequestHandler; + private final ExecutorService consumeThreadPool = Executors.newCachedThreadPool(); @PostMapping( value = "/produce/{tenant}/{application}/{gateway}", @@ -86,20 +95,93 @@ ProduceResponse produce( throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, e.getMessage()); } - final ProduceGateway produceGateway = - new ProduceGateway( - topicConnectionsRuntimeRegistryProvider - .getTopicConnectionsRuntimeRegistry(), - topicProducerCache); - try { + + try (final ProduceGateway produceGateway = + new ProduceGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry(), + topicProducerCache);) { final List
commonHeaders = ProduceGateway.getProducerCommonHeaders( context.gateway().getProduceOptions(), authContext); produceGateway.start(context.gateway().getTopic(), commonHeaders, authContext); produceGateway.produceMessage(produceRequest); return ProduceResponse.OK; - } finally { - produceGateway.close(); } } + + + @PostMapping( + value = "/service/{tenant}/{application}/{gateway}", + consumes = MediaType.APPLICATION_JSON_VALUE) + void service( + WebRequest request, + HttpServletResponse response, + @NotBlank @PathVariable("tenant") String tenant, + @NotBlank @PathVariable("application") String application, + @NotBlank @PathVariable("gateway") String gateway, + @RequestBody ProduceRequest produceRequest) + throws ProduceGateway.ProduceException { + + final Map queryString = + request.getParameterMap().keySet().stream() + .collect(Collectors.toMap(k -> k, k -> request.getParameter(k))); + final Map headers = new HashMap<>(); + request.getHeaderNames() + .forEachRemaining(name -> headers.put(name, request.getHeader(name))); + final GatewayRequestContext context = + gatewayRequestHandler.validateRequest( + tenant, + application, + gateway, + Gateway.GatewayType.service, + queryString, + headers, + new ProduceGateway.ProduceGatewayRequestValidator()); + final AuthenticatedGatewayRequestContext authContext; + try { + authContext = gatewayRequestHandler.authenticate(context); + } catch (GatewayRequestHandler.AuthFailedException e) { + throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, e.getMessage()); + } + + + try (final ConsumeGateway consumeGateway = new ConsumeGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry()); + final ProduceGateway produceGateway = + new ProduceGateway( + topicConnectionsRuntimeRegistryProvider + .getTopicConnectionsRuntimeRegistry(), + topicProducerCache);) { + + final Gateway.ServiceOptions serviceOptions = authContext.gateway().getServiceOptions(); + try { + final List> messageFilters = + ConsumeGateway.createMessageFilters( + serviceOptions.getHeaders(), authContext.userParameters(), + authContext.principalValues()); + consumeGateway.setup(serviceOptions.getInputTopic(), messageFilters, authContext); + final AtomicBoolean stop = new AtomicBoolean(false); + consumeGateway.startReadingAsync(consumeThreadPool, () -> stop.get(), record -> { + stop.set(true); + try { + response.getWriter().print(record); + response.getWriter().flush(); + response.getWriter().close(); + } catch (IOException ioException) { + throw new RuntimeException(ioException); + } + }); + } catch (Exception ex) { + log.error("Error while setting up consume gateway", ex); + throw new RuntimeException(ex); + } + final List
commonHeaders = + ProduceGateway.getProducerCommonHeaders(serviceOptions, authContext); + produceGateway.start(serviceOptions.getOutputTopic(), commonHeaders, authContext); + produceGateway.produceMessage(produceRequest); + } + + } } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java index 1eb701df7..e58b2c526 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java @@ -32,7 +32,7 @@ import ai.langstream.apigateway.gateways.ProduceGateway; import ai.langstream.apigateway.gateways.TopicProducerCache; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; -import ai.langstream.apigateway.websocket.api.ProduceResponse; +import ai.langstream.apigateway.api.ProduceResponse; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.ArrayList; @@ -242,43 +242,6 @@ protected void startReadingMessages(WebSocketSession webSocketSession, Executor }); } - protected static List> createMessageFilters( - List headersFilters, - Map passedParameters, - Map principalValues) { - List> filters = new ArrayList<>(); - if (headersFilters == null) { - return filters; - } - for (Gateway.KeyValueComparison comparison : headersFilters) { - if (comparison.key() == null) { - throw new IllegalArgumentException("Key cannot be null"); - } - filters.add( - record -> { - final Header header = record.getHeader(comparison.key()); - if (header == null) { - return false; - } - final String expectedValue = header.valueAsString(); - if (expectedValue == null) { - return false; - } - String value = comparison.value(); - if (value == null && comparison.valueFromParameters() != null) { - value = passedParameters.get(comparison.valueFromParameters()); - } - if (value == null && comparison.valueFromAuthentication() != null) { - value = principalValues.get(comparison.valueFromAuthentication()); - } - if (value == null) { - return false; - } - return expectedValue.equals(value); - }); - } - return filters; - } protected void setupReader( String topic, diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java index 436ecd41b..83076e5e1 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ChatHandler.java @@ -22,6 +22,7 @@ import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; +import ai.langstream.apigateway.gateways.ConsumeGateway; import ai.langstream.apigateway.gateways.GatewayRequestHandler; import ai.langstream.apigateway.gateways.ProduceGateway; import ai.langstream.apigateway.gateways.TopicProducerCache; @@ -138,35 +139,17 @@ public void onBeforeHandshakeCompleted( } private void setupProducer(AuthenticatedGatewayRequestContext context) throws Exception { - final Gateway.ChatOptions chatOptions = context.gateway().getChatOptions(); - - List headerConfig = new ArrayList<>(); - final List gwHeaders = chatOptions.getHeaders(); - if (gwHeaders != null) { - for (Gateway.KeyValueComparison gwHeader : gwHeaders) { - headerConfig.add(gwHeader); - } - } final List
commonHeaders = - ProduceGateway.getProducerCommonHeaders( - headerConfig, context.userParameters(), context.principalValues()); + ProduceGateway.getProducerCommonHeaders(context.gateway().getChatOptions(), context); - setupProducer(chatOptions.getQuestionsTopic(), commonHeaders, context); + setupProducer(context.gateway().getChatOptions().getQuestionsTopic(), commonHeaders, context); } private void setupReader(AuthenticatedGatewayRequestContext context) throws Exception { final Gateway.ChatOptions chatOptions = context.gateway().getChatOptions(); - - List headerFilters = new ArrayList<>(); - final List gwHeaders = chatOptions.getHeaders(); - if (gwHeaders != null) { - for (Gateway.KeyValueComparison gwHeader : gwHeaders) { - headerFilters.add(gwHeader); - } - } final List> messageFilters = - createMessageFilters( - headerFilters, context.userParameters(), context.principalValues()); + ConsumeGateway.createMessageFilters( + chatOptions.getHeaders(), context.userParameters(), context.principalValues()); setupReader(chatOptions.getAnswersTopic(), messageFilters, context); } diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java index efa7a5feb..9b70d4fb0 100644 --- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java +++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/ConsumeHandler.java @@ -21,6 +21,7 @@ import ai.langstream.api.runner.code.Record; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.storage.ApplicationStore; +import ai.langstream.apigateway.gateways.ConsumeGateway; import ai.langstream.apigateway.gateways.GatewayRequestHandler; import ai.langstream.apigateway.gateways.TopicProducerCache; import ai.langstream.apigateway.websocket.AuthenticatedGatewayRequestContext; @@ -108,7 +109,7 @@ public void onBeforeHandshakeCompleted( final List> messageFilters; if (consumeOptions != null && consumeOptions.filters() != null) { messageFilters = - createMessageFilters( + ConsumeGateway.createMessageFilters( consumeOptions.filters().headers(), context.userParameters(), context.principalValues()); diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java index db2f5d2db..53b474b0d 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/http/GatewayResourceTest.java @@ -198,6 +198,19 @@ void produceAndExpectOk(String url, String content) { {"status":"OK","reason":null}""", response.body()); } + @SneakyThrows + String produceAndGetBody(String url, String content) { + final HttpRequest request = + HttpRequest.newBuilder(URI.create(url)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(content)) + .build(); + final HttpResponse response = + CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); + assertEquals(200, response.statusCode()); + return response.body(); + } + @SneakyThrows void produceAndExpectBadRequest(String url, String content, String errorMessage) { final HttpRequest request = @@ -388,6 +401,34 @@ void testTestCredentials() throws Exception { "{\"value\": \"my-value\"}"); } + + @Test + void testService() throws Exception { + final String topic = genTopic(); + prepareTopicsForTest(topic); + testGateways = + new Gateways( + List.of( + Gateway.builder() + .id("svc") + .type(Gateway.GatewayType.service) + .serviceOptions( + new Gateway.ServiceOptions( + topic, topic, List.of()) + ) + .build())); + + final String url = + "http://localhost:%d/api/gateways/service/tenant1/application1/svc" + .formatted(port); + + assertEquals("{\"record\":{\"key\":\"my-key\",\"value\":\"my-value\",\"headers\":{}}," + + "\"offset\":\"eyJvZmZzZXRzIjp7IjAiOiIxIn19\"}", produceAndGetBody(url, "{\"key\": \"my-key\", \"value\": \"my-value\"}")); + + } + + + protected abstract StreamingCluster getStreamingCluster(); private void prepareTopicsForTest(String... topic) throws Exception { diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java index 4c4b60784..6f13c6381 100644 --- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java +++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java @@ -39,9 +39,9 @@ import ai.langstream.api.storage.ApplicationStore; import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties; import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean; -import ai.langstream.apigateway.websocket.api.ConsumePushMessage; -import ai.langstream.apigateway.websocket.api.ProduceRequest; -import ai.langstream.apigateway.websocket.api.ProduceResponse; +import ai.langstream.apigateway.api.ConsumePushMessage; +import ai.langstream.apigateway.api.ProduceRequest; +import ai.langstream.apigateway.api.ProduceResponse; import ai.langstream.impl.deploy.ApplicationDeployer; import ai.langstream.impl.parser.ModelBuilder; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/langstream-api/src/main/java/ai/langstream/api/model/Gateway.java b/langstream-api/src/main/java/ai/langstream/api/model/Gateway.java index 8ce97462d..b84210c42 100644 --- a/langstream-api/src/main/java/ai/langstream/api/model/Gateway.java +++ b/langstream-api/src/main/java/ai/langstream/api/model/Gateway.java @@ -45,13 +45,17 @@ public final class Gateway { @JsonProperty("chat-options") private ChatOptions chatOptions; + @JsonProperty("service-options") + private ServiceOptions serviceOptions; + @JsonProperty("events-topic") private String eventsTopic; public enum GatewayType { produce, consume, - chat + chat, + service } @Data @@ -118,10 +122,6 @@ public static KeyValueComparison valueFromAuthentication( return new KeyValueComparison(key, null, null, valueFromAuthentication); } - private String getKeyWithDefaultValue() { - - throw new IllegalStateException(); - } } public record ProduceOptions(List headers) {} @@ -143,4 +143,18 @@ public static class ChatOptions { List headers; } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class ServiceOptions { + + @JsonProperty("input-topic") + private String inputTopic; + + @JsonProperty("output-topic") + private String outputTopic; + + List headers; + } } diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/ApplicationPlaceholderResolver.java b/langstream-core/src/main/java/ai/langstream/impl/common/ApplicationPlaceholderResolver.java index b7d705a60..81c0f100f 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/ApplicationPlaceholderResolver.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/ApplicationPlaceholderResolver.java @@ -204,6 +204,7 @@ private static Gateways resolveGateways(Application instance, Map