From fa0f24036ab8a1c35c72a521d1cee5909ec51440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 9 Oct 2023 09:01:40 +0200 Subject: [PATCH] New http-request agent (#537) --- .../http-request-processor/README.md | 19 ++ .../http-request-processor/gateways.yaml | 23 ++ .../http-request-processor/pipeline.yaml | 52 ++++ .../langstream-agent-http-request/pom.xml | 92 +++++++ .../agents/queryhttp/HttpRequestAgent.java | 235 ++++++++++++++++++ .../queryhttp/HttpRequestAgentProvider.java | 39 +++ .../META-INF/ai.langstream.agents.index | 1 + ...ngstream.api.runner.code.AgentCodeProvider | 1 + .../langstream-agents-text-processing/pom.xml | 9 +- langstream-agents/pom.xml | 1 + .../k8s/agents/HttpRequestAgentProvider.java | 124 +++++++++ ...i.langstream.api.runtime.AgentNodeProvider | 3 +- .../langstream-runtime-impl/pom.xml | 17 ++ .../langstream/AbstractApplicationRunner.java | 9 +- .../kafka/CassandraAssetQueryWriteIT.java | 4 +- .../CassandraVectorAssetQueryWriteIT.java | 2 +- .../kafka/HttpRequestAgentRunnerIT.java | 220 ++++++++++++++++ .../MockAssetManagerCodeProvider.java | 6 +- 18 files changed, 837 insertions(+), 20 deletions(-) create mode 100644 examples/applications/http-request-processor/README.md create mode 100644 examples/applications/http-request-processor/gateways.yaml create mode 100644 examples/applications/http-request-processor/pipeline.yaml create mode 100644 langstream-agents/langstream-agent-http-request/pom.xml create mode 100644 langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java create mode 100644 langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java create mode 100644 langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index create mode 100644 langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java create mode 100644 langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java diff --git a/examples/applications/http-request-processor/README.md b/examples/applications/http-request-processor/README.md new file mode 100644 index 000000000..347b662a4 --- /dev/null +++ b/examples/applications/http-request-processor/README.md @@ -0,0 +1,19 @@ +# Http Request processor + +This sample application shows how to execute the http request processor. + + +## Deploy the LangStream application +``` +./bin/langstream apps deploy test -app examples/applications/http-request-processor -i examples/instances/kafka-kubernetes.yaml +``` + +## Chat + +Since the application opens a gateway, we can use the gateway API to send and consume messages using the use gateway `chat` feature: +``` +./bin/langstream gateway chat test -g chat +``` + +Ask about a city and you will get the timezone. + diff --git a/examples/applications/http-request-processor/gateways.yaml b/examples/applications/http-request-processor/gateways.yaml new file mode 100644 index 000000000..db7563437 --- /dev/null +++ b/examples/applications/http-request-processor/gateways.yaml @@ -0,0 +1,23 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: chat + type: chat + chat-options: + answers-topic: output-topic + questions-topic: input-topic diff --git a/examples/applications/http-request-processor/pipeline.yaml b/examples/applications/http-request-processor/pipeline.yaml new file mode 100644 index 000000000..e2518da8c --- /dev/null +++ b/examples/applications/http-request-processor/pipeline.yaml @@ -0,0 +1,52 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +topics: + - name: "input-topic" + creation-mode: create-if-not-exists + - name: "output-topic" + creation-mode: create-if-not-exists +pipeline: + - name: "convert-to-json" + type: "document-to-json" + input: "input-topic" + configuration: + text-field: "city" + - name: "Get timezone info from geocoding API" + type: "http-request" + configuration: + url: "https://geocoding-api.open-meteo.com/v1/search" + query-string: + name: "{{{ value.city }}}" + count: "1" + format: "json" + output-field: "value.response" + method: "GET" + + - name: "Extract timezone" + type: "compute" + configuration: + fields: + - name: "value.timezone" + expression: "value.response.results[0].timezone" + type: STRING + + - name: "Extract timezone" + type: "drop-fields" + output: "output-topic" + configuration: + fields: + - response diff --git a/langstream-agents/langstream-agent-http-request/pom.xml b/langstream-agents/langstream-agent-http-request/pom.xml new file mode 100644 index 000000000..42e614eef --- /dev/null +++ b/langstream-agents/langstream-agent-http-request/pom.xml @@ -0,0 +1,92 @@ + + + + + langstream-agents + ai.langstream + 0.1.1-SNAPSHOT + + 4.0.0 + + langstream-agent-http-request + + + ${project.groupId} + langstream-api + ${project.version} + provided + + + ${project.groupId} + langstream-agents-commons + ${project.version} + + + org.projectlombok + lombok + provided + + + org.slf4j + slf4j-api + + + com.samskivert + jmustache + + + ch.qos.logback + logback-core + test + + + ch.qos.logback + logback-classic + test + + + org.junit.jupiter + junit-jupiter + test + + + + + + org.apache.nifi + nifi-nar-maven-plugin + true + + nar + + + + default-nar + package + + nar + + + + + + + \ No newline at end of file diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java new file mode 100644 index 000000000..d2d2c6e07 --- /dev/null +++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java @@ -0,0 +1,235 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.queryhttp; + +import ai.langstream.ai.agents.commons.JsonRecord; +import ai.langstream.ai.agents.commons.TransformContext; +import ai.langstream.api.runner.code.AbstractAgentCode; +import ai.langstream.api.runner.code.AgentContext; +import ai.langstream.api.runner.code.AgentProcessor; +import ai.langstream.api.runner.code.Record; +import ai.langstream.api.runner.code.RecordSink; +import ai.langstream.api.runtime.ComponentType; +import ai.langstream.api.util.ConfigurationUtils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.samskivert.mustache.Mustache; +import com.samskivert.mustache.Template; +import java.net.CookieManager; +import java.net.CookiePolicy; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; + +@Slf4j +public class HttpRequestAgent extends AbstractAgentCode implements AgentProcessor { + + static final ObjectMapper mapper = new ObjectMapper(); + private final Map avroValueSchemaCache = new ConcurrentHashMap<>(); + + private final Map avroKeySchemaCache = new ConcurrentHashMap<>(); + + private AgentContext agentContext; + private ExecutorService executor; + private HttpClient httpClient; + private String url; + private String method; + private Map queryStringTemplates; + private Map headersTemplates; + private Template bodyTemplate; + private String outputFieldName; + + @SuppressWarnings("unchecked") + @Override + public void init(Map configuration) { + this.url = + ConfigurationUtils.requiredNonEmptyField( + configuration, "url", () -> "http-request agent"); + this.outputFieldName = + ConfigurationUtils.requiredNonEmptyField( + configuration, "output-field", () -> "http-request agent"); + this.method = ConfigurationUtils.getString("method", "GET", configuration); + final String body = ConfigurationUtils.getString("body", null, configuration); + final Map headers = + ConfigurationUtils.getMap("headers", new HashMap<>(), configuration); + final Map queryString = + ConfigurationUtils.getMap("query-string", new HashMap<>(), configuration); + final boolean allowRedirects = + ConfigurationUtils.getBoolean("allow-redirects", true, configuration); + final boolean handleCookies = + ConfigurationUtils.getBoolean("handle-cookies", true, configuration); + + queryStringTemplates = new HashMap<>(); + for (Map.Entry entry : queryString.entrySet()) { + queryStringTemplates.put( + entry.getKey(), Mustache.compiler().compile(entry.getValue().toString())); + } + + headersTemplates = new HashMap<>(); + for (Map.Entry entry : headers.entrySet()) { + headersTemplates.put( + entry.getKey(), Mustache.compiler().compile(entry.getValue().toString())); + } + if (body != null) { + bodyTemplate = Mustache.compiler().compile(body); + } + + executor = Executors.newCachedThreadPool(); + CookieManager cookieManager = new CookieManager(); + cookieManager.setCookiePolicy( + handleCookies ? CookiePolicy.ACCEPT_ALL : CookiePolicy.ACCEPT_NONE); + httpClient = + HttpClient.newBuilder() + .followRedirects( + allowRedirects + ? HttpClient.Redirect.NORMAL + : HttpClient.Redirect.NEVER) + .cookieHandler(cookieManager) + .executor(executor) + .build(); + } + + @Override + public void setContext(AgentContext context) throws Exception { + this.agentContext = context; + } + + @Override + public void process(List records, RecordSink recordSink) { + for (Record record : records) { + processRecord(record, recordSink); + } + } + + @Override + public ComponentType componentType() { + return ComponentType.PROCESSOR; + } + + public void processRecord(Record record, RecordSink recordSink) { + try { + TransformContext context = TransformContext.recordToTransformContext(record, true); + final JsonRecord jsonRecord = context.toJsonRecord(); + + final URI uri = URI.create(url + computeQueryString(jsonRecord)); + final HttpRequest.BodyPublisher bodyPublisher; + if (bodyTemplate != null) { + bodyPublisher = + HttpRequest.BodyPublishers.ofString(bodyTemplate.execute(jsonRecord)); + } else { + bodyPublisher = HttpRequest.BodyPublishers.noBody(); + } + final HttpRequest.Builder requestBuilder = + HttpRequest.newBuilder() + .uri(uri) + .version(HttpClient.Version.HTTP_1_1) + .method(this.method, bodyPublisher); + headersTemplates.forEach( + (key, value) -> requestBuilder.header(key, value.execute(jsonRecord))); + final HttpRequest request = requestBuilder.build(); + + httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .thenAccept( + response -> { + try { + if (response.statusCode() >= 400) { + throw new RuntimeException( + "Error processing record: " + + record + + " with response: " + + response); + } + final Object body = parseResponseBody(response); + context.setResultField( + body, + outputFieldName, + null, + avroKeySchemaCache, + avroValueSchemaCache); + Optional recordResult = + TransformContext.transformContextToRecord(context); + if (log.isDebugEnabled()) { + log.debug("recordResult {}", recordResult); + } + if (recordResult.isPresent()) { + recordSink.emit( + new SourceRecordAndResult( + record, List.of(recordResult.get()), null)); + } else { + recordSink.emit( + new SourceRecordAndResult(record, List.of(), null)); + } + } catch (Exception e) { + log.error("Error processing record: {}", record, e); + recordSink.emit( + new SourceRecordAndResult(record, List.of(), e)); + } + }); + } catch (Throwable error) { + log.error("Error processing record: {}", record, error); + recordSink.emit(new SourceRecordAndResult(record, null, error)); + } + } + + private Object parseResponseBody(HttpResponse response) { + try { + return mapper.readValue(response.body(), Map.class); + } catch (JsonProcessingException ex) { + log.debug("Not able to parse response to json: {}, {}", response.body(), ex); + } + return response.body(); + } + + private String computeQueryString(JsonRecord jsonRecord) { + if (queryStringTemplates.isEmpty()) { + return ""; + } + + return "?" + + queryStringTemplates.entrySet().stream() + .map( + e -> { + final String resolved = e.getValue().execute(jsonRecord); + return encodeParam(e.getKey(), resolved); + }) + .collect(Collectors.joining("&")); + } + + private static String encodeParam(String key, String value) { + return String.format("%s=%s", key, URLEncoder.encode(value, StandardCharsets.UTF_8)); + } + + @Override + public void close() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } +} diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java new file mode 100644 index 000000000..c0ca6f854 --- /dev/null +++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java @@ -0,0 +1,39 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.queryhttp; + +import ai.langstream.api.runner.code.AgentCodeProvider; +import ai.langstream.api.runner.code.AgentProcessor; +import java.util.Map; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HttpRequestAgentProvider implements AgentCodeProvider { + + private static final Map> FACTORIES = + Map.of("http-request", HttpRequestAgent::new); + + @Override + public boolean supports(String agentType) { + return FACTORIES.containsKey(agentType); + } + + @Override + public AgentProcessor createInstance(String agentType) { + return FACTORIES.get(agentType).get(); + } +} diff --git a/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index new file mode 100644 index 000000000..1ddcb4e59 --- /dev/null +++ b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index @@ -0,0 +1 @@ +query-http \ No newline at end of file diff --git a/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider new file mode 100644 index 000000000..248a0d85f --- /dev/null +++ b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider @@ -0,0 +1 @@ +ai.langstream.agents.queryhttp.HttpRequestAgentProvider \ No newline at end of file diff --git a/langstream-agents/langstream-agents-text-processing/pom.xml b/langstream-agents/langstream-agents-text-processing/pom.xml index 82ad2dc96..5fca5e28d 100644 --- a/langstream-agents/langstream-agents-text-processing/pom.xml +++ b/langstream-agents/langstream-agents-text-processing/pom.xml @@ -26,10 +26,6 @@ langstream-agents-text-processing jar LangStream - Apache Tika Agents - - 17 - 17 - ${project.groupId} @@ -90,12 +86,17 @@ org.testcontainers localstack + test org.testcontainers junit-jupiter + test + + 2.8.0 + diff --git a/langstream-agents/pom.xml b/langstream-agents/pom.xml index cb13f7814..e362ad4c8 100644 --- a/langstream-agents/pom.xml +++ b/langstream-agents/pom.xml @@ -40,5 +40,6 @@ langstream-ai-agents langstream-vector-agents langstream-agents-flow-control + langstream-agent-http-request diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java new file mode 100644 index 000000000..c44454eef --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java @@ -0,0 +1,124 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.api.runtime.ComponentType; +import ai.langstream.impl.agents.AbstractComposableAgentProvider; +import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HttpRequestAgentProvider extends AbstractComposableAgentProvider { + + private static final Set SUPPORTED_AGENT_TYPES = Set.of("http-request"); + + public HttpRequestAgentProvider() { + super(SUPPORTED_AGENT_TYPES, List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); + } + + @Override + protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { + return ComponentType.PROCESSOR; + } + + @Override + protected Class getAgentConfigModelClass(String type) { + return Config.class; + } + + @AgentConfig( + name = "Http Request", + description = + """ + Agent for enriching data with an HTTP request. + """) + @Data + public static class Config { + @ConfigProperty( + description = + """ + Url to send the request to. For adding query string parameters, use the `query-string` field. + """, + required = true) + private String url; + + @ConfigProperty( + description = + """ + The field that will hold the results, it can be the same as "field" to override it. + """, + required = true) + @JsonProperty("output-field") + private String outputFieldName; + + @ConfigProperty( + description = + """ + Http method to use for the request. + """, + defaultValue = "GET") + private String method; + + @ConfigProperty( + description = + """ + Headers to send with the request. You can use the Mustache syntax to inject value from the context. + """) + private Map headers; + + @ConfigProperty( + description = + """ + Query string to append to the url. You can use the Mustache syntax to inject value from the context. + Note that the values will be automatically escaped. + """) + @JsonProperty("query-string") + private Map queryString; + + @ConfigProperty( + description = + """ + Body to send with the request. You can use the Mustache syntax to inject value from the context. + """) + private String body; + + @ConfigProperty( + description = + """ + Whether or not to follow redirects. + """, + defaultValue = "true") + @JsonProperty("allow-redirects") + private boolean allowRedirects; + + @ConfigProperty( + description = + """ + Whether or not to handle cookies during the redirects. + """, + defaultValue = "true") + @JsonProperty("handle-cookies") + private boolean handleCookies; + } +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider index 308f54f86..61bbaaea6 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider @@ -8,4 +8,5 @@ ai.langstream.runtime.impl.k8s.agents.KubernetesCompositeAgentProvider ai.langstream.runtime.impl.k8s.agents.IdentityAgentProvider ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider ai.langstream.runtime.impl.k8s.agents.ReRankAgentProvider -ai.langstream.runtime.impl.k8s.agents.FlowControlAgentsProvider \ No newline at end of file +ai.langstream.runtime.impl.k8s.agents.FlowControlAgentsProvider +ai.langstream.runtime.impl.k8s.agents.HttpRequestAgentProvider diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml index 374acca30..9dc7cba36 100644 --- a/langstream-runtime/langstream-runtime-impl/pom.xml +++ b/langstream-runtime/langstream-runtime-impl/pom.xml @@ -290,6 +290,13 @@ provided + + ${project.groupId} + langstream-agent-http-request + ${project.version} + provided + + @@ -525,6 +532,16 @@ ${project.build.directory}/agents + + ${project.groupId} + langstream-agent-http-request + ${project.version} + nar + nar + false + ${project.build.directory}/agents + + diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java index 675108d40..95b18decb 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java @@ -108,6 +108,7 @@ public T getGlobal(String key) { } public void close() { + applicationDeployer.cleanup(tenant, implementation); applicationDeployer.delete(tenant, implementation, null); Awaitility.await() .until( @@ -115,12 +116,6 @@ public void close() { log.info("Waiting for secrets to be deleted. {}", secrets); return secrets.isEmpty(); }); - // this is a workaround, we want to clean up the env - topicConnectionsRuntimeRegistry - .getTopicConnectionsRuntime( - implementation.getApplication().getInstance().streamingCluster()) - .asTopicConnectionsRuntime() - .delete(implementation); } } @@ -265,7 +260,7 @@ protected List waitForMessages(KafkaConsumer consumer, List e assertArrayEquals((byte[]) expectedValue, (byte[]) actualValue); } else { log.info("expected: {}", expectedValue); - log.info("got: {}", actualValue); + log.info("got: {}", actualValue); assertEquals(expectedValue, actualValue); } } diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java index 33ba749a3..3bf1b1011 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java @@ -81,7 +81,7 @@ public void testCassandra() throws Exception { create-statements: - "CREATE KEYSPACE vsearch WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};" delete-statements: - - "DROP KEYSPACE vsearch;" + - "DROP KEYSPACE IF EXISTS vsearch;" - name: "documents-table" asset-type: "cassandra-table" creation-mode: create-if-not-exists @@ -219,7 +219,7 @@ public void testCassandraTable() throws Exception { - "CREATE TABLE IF NOT EXISTS v1.documents (id int PRIMARY KEY, name text, description text);" - "INSERT INTO v1.documents (id, name, description) VALUES (1, 'A', 'A description');" delete-statements: - - "DROP TABLE v1.documents;" + - "DROP TABLE IF EXISTS v1.documents;" topics: - name: "input-topic" creation-mode: create-if-not-exists diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java index 9f619756f..91ffa3277 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java @@ -85,7 +85,7 @@ public void testCassandra() throws Exception { create-statements: - "CREATE KEYSPACE vsearch WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};" delete-statements: - - "DROP KEYSPACE vsearch;" + - "DROP KEYSPACE IF EXISTS vsearch;" - name: "documents-table" asset-type: "cassandra-table" creation-mode: create-if-not-exists diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java new file mode 100644 index 000000000..f48fdd9bf --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java @@ -0,0 +1,220 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.kafka; + +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; + +import ai.langstream.AbstractApplicationRunner; +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +@Slf4j +@WireMockTest +class HttpRequestAgentRunnerIT extends AbstractApplicationRunner { + + static WireMockRuntimeInfo wireMockRuntimeInfo; + + @BeforeAll + static void onBeforeAll(WireMockRuntimeInfo info) { + wireMockRuntimeInfo = info; + } + + @Test + void testGetJson() throws Exception { + + stubFor( + get("/api/models?name=my-model") + .willReturn( + okJson( + """ + {"id": "my-model", + "created": "2021-08-31T12:00:00Z", + "model": "gpt-35-turbo", + "object": "text-generation", + "choices": [{"text": "It is a car."}]} + """))); + + Map application = + Map.of( + "module.yaml", + """ + topics: + - name: "input-topic" + creation-mode: create-if-not-exists + deletion-mode: delete + - name: "output-topic" + creation-mode: create-if-not-exists + deletion-mode: delete + pipeline: + - name: "http-request" + type: "http-request" + input: input-topic + output: output-topic + id: step1 + configuration: + output-field: value.api + url: %s/api/models + query-string: + name: "{{{ value.id }}}" + + """ + .formatted(wireMockRuntimeInfo.getHttpBaseUrl())); + + final String e1 = + """ + {"id":"my-model","classification":"good","api":{"id":"my-model","created":"2021-08-31T12:00:00Z","model":"gpt-35-turbo","object":"text-generation","choices":[{"text":"It is a car."}]}}"""; + runAndAssertMessage(application, e1); + } + + @Test + void testGetRawText() throws Exception { + + stubFor( + get("/api/models?name=my-model") + .willReturn( + ok(""" + some-string"""))); + + Map application = + Map.of( + "module.yaml", + """ + topics: + - name: "input-topic" + creation-mode: create-if-not-exists + deletion-mode: delete + - name: "output-topic" + creation-mode: create-if-not-exists + deletion-mode: delete + pipeline: + - name: "http-request" + type: "http-request" + input: input-topic + output: output-topic + id: step1 + configuration: + output-field: value.api + url: %s/api/models + query-string: + name: "{{{ value.id }}}" + + """ + .formatted(wireMockRuntimeInfo.getHttpBaseUrl())); + + final String e1 = + """ + {"id":"my-model","classification":"good","api":"some-string"}"""; + runAndAssertMessage(application, e1); + } + + @Test + void testPostWithBody() throws Exception { + + stubFor( + post("/api/models?name=my-model") + .withHeader("Content-Type", equalTo("application/json")) + .withHeader("Authorization", equalTo("Bearer my-token!")) + .withRequestBody(equalTo("{\"id\": \"my-model\"}")) + .willReturn( + okJson( + """ + {"id": "my-model", + "created": "2021-08-31T12:00:00Z", + "model": "gpt-35-turbo", + "object": "text-generation", + "choices": [{"text": "It is a car."}]} + """))); + + Map application = + Map.of( + "module.yaml", + """ + topics: + - name: "input-topic" + creation-mode: create-if-not-exists + deletion-mode: delete + - name: "output-topic" + creation-mode: create-if-not-exists + deletion-mode: delete + pipeline: + - name: "http-request" + type: "http-request" + input: input-topic + output: output-topic + id: step1 + configuration: + output-field: value.api + url: %s/api/models + query-string: + name: "{{{ value.id }}}" + method: POST + body: '{"id": "{{{ value.id }}}"}' + headers: + Content-Type: application/json + Authorization: Bearer {{{ secrets.s1.token }}} + """ + .formatted(wireMockRuntimeInfo.getHttpBaseUrl())); + + final String e1 = + """ + {"id":"my-model","classification":"good","api":{"id":"my-model","created":"2021-08-31T12:00:00Z","model":"gpt-35-turbo","object":"text-generation","choices":[{"text":"It is a car."}]}}"""; + runAndAssertMessage(application, e1); + } + + private void runAndAssertMessage(Map application, String e1) throws Exception { + String tenant = "tenant"; + String[] expectedAgents = {"app-step1"}; + + // write some data + try (ApplicationRuntime applicationRuntime = + deployApplicationWithSecrets( + tenant, + "app", + application, + buildInstanceYaml(), + """ + secrets: + - id: s1 + data: + token: my-token! + """, + expectedAgents)) { + try (KafkaProducer producer = createProducer(); + KafkaConsumer consumer = createConsumer("output-topic")) { + + sendMessage( + "input-topic", + "{\"id\":\"my-model\",\"classification\":\"good\"}", + producer); + executeAgentRunners(applicationRuntime); + + waitForMessages(consumer, List.of(e1)); + } + } + } +} diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java index 278f92e2c..546ce7ab6 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java @@ -80,11 +80,7 @@ public synchronized void deployAsset() throws Exception { @Override public synchronized boolean deleteAssetIfExists() throws Exception { log.info("Deleting asset {}", assetDefinition); - final boolean remove = DEPLOYED_ASSETS.remove(assetDefinition); - if (!remove) { - throw new IllegalStateException("Asset not found: " + assetDefinition); - } - return true; + return DEPLOYED_ASSETS.remove(assetDefinition); } } }