diff --git a/examples/applications/http-request-processor/README.md b/examples/applications/http-request-processor/README.md
new file mode 100644
index 000000000..347b662a4
--- /dev/null
+++ b/examples/applications/http-request-processor/README.md
@@ -0,0 +1,19 @@
+# Http Request processor
+
+This sample application shows how to execute the http request processor.
+
+
+## Deploy the LangStream application
+```
+./bin/langstream apps deploy test -app examples/applications/http-request-processor -i examples/instances/kafka-kubernetes.yaml
+```
+
+## Chat
+
+Since the application opens a gateway, we can use the gateway API to send and consume messages using the use gateway `chat` feature:
+```
+./bin/langstream gateway chat test -g chat
+```
+
+Ask about a city and you will get the timezone.
+
diff --git a/examples/applications/http-request-processor/gateways.yaml b/examples/applications/http-request-processor/gateways.yaml
new file mode 100644
index 000000000..db7563437
--- /dev/null
+++ b/examples/applications/http-request-processor/gateways.yaml
@@ -0,0 +1,23 @@
+#
+#
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gateways:
+ - id: chat
+ type: chat
+ chat-options:
+ answers-topic: output-topic
+ questions-topic: input-topic
diff --git a/examples/applications/http-request-processor/pipeline.yaml b/examples/applications/http-request-processor/pipeline.yaml
new file mode 100644
index 000000000..e2518da8c
--- /dev/null
+++ b/examples/applications/http-request-processor/pipeline.yaml
@@ -0,0 +1,52 @@
+#
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+topics:
+ - name: "input-topic"
+ creation-mode: create-if-not-exists
+ - name: "output-topic"
+ creation-mode: create-if-not-exists
+pipeline:
+ - name: "convert-to-json"
+ type: "document-to-json"
+ input: "input-topic"
+ configuration:
+ text-field: "city"
+ - name: "Get timezone info from geocoding API"
+ type: "http-request"
+ configuration:
+ url: "https://geocoding-api.open-meteo.com/v1/search"
+ query-string:
+ name: "{{{ value.city }}}"
+ count: "1"
+ format: "json"
+ output-field: "value.response"
+ method: "GET"
+
+ - name: "Extract timezone"
+ type: "compute"
+ configuration:
+ fields:
+ - name: "value.timezone"
+ expression: "value.response.results[0].timezone"
+ type: STRING
+
+ - name: "Extract timezone"
+ type: "drop-fields"
+ output: "output-topic"
+ configuration:
+ fields:
+ - response
diff --git a/langstream-agents/langstream-agent-http-request/pom.xml b/langstream-agents/langstream-agent-http-request/pom.xml
new file mode 100644
index 000000000..42e614eef
--- /dev/null
+++ b/langstream-agents/langstream-agent-http-request/pom.xml
@@ -0,0 +1,92 @@
+
+
+
+
+ langstream-agents
+ ai.langstream
+ 0.1.1-SNAPSHOT
+
+ 4.0.0
+
+ langstream-agent-http-request
+
+
+ ${project.groupId}
+ langstream-api
+ ${project.version}
+ provided
+
+
+ ${project.groupId}
+ langstream-agents-commons
+ ${project.version}
+
+
+ org.projectlombok
+ lombok
+ provided
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.samskivert
+ jmustache
+
+
+ ch.qos.logback
+ logback-core
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+
+
+
+ org.apache.nifi
+ nifi-nar-maven-plugin
+ true
+
+ nar
+
+
+
+ default-nar
+ package
+
+ nar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java
new file mode 100644
index 000000000..d2d2c6e07
--- /dev/null
+++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.agents.queryhttp;
+
+import ai.langstream.ai.agents.commons.JsonRecord;
+import ai.langstream.ai.agents.commons.TransformContext;
+import ai.langstream.api.runner.code.AbstractAgentCode;
+import ai.langstream.api.runner.code.AgentContext;
+import ai.langstream.api.runner.code.AgentProcessor;
+import ai.langstream.api.runner.code.Record;
+import ai.langstream.api.runner.code.RecordSink;
+import ai.langstream.api.runtime.ComponentType;
+import ai.langstream.api.util.ConfigurationUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.samskivert.mustache.Mustache;
+import com.samskivert.mustache.Template;
+import java.net.CookieManager;
+import java.net.CookiePolicy;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+
+@Slf4j
+public class HttpRequestAgent extends AbstractAgentCode implements AgentProcessor {
+
+ static final ObjectMapper mapper = new ObjectMapper();
+ private final Map avroValueSchemaCache = new ConcurrentHashMap<>();
+
+ private final Map avroKeySchemaCache = new ConcurrentHashMap<>();
+
+ private AgentContext agentContext;
+ private ExecutorService executor;
+ private HttpClient httpClient;
+ private String url;
+ private String method;
+ private Map queryStringTemplates;
+ private Map headersTemplates;
+ private Template bodyTemplate;
+ private String outputFieldName;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(Map configuration) {
+ this.url =
+ ConfigurationUtils.requiredNonEmptyField(
+ configuration, "url", () -> "http-request agent");
+ this.outputFieldName =
+ ConfigurationUtils.requiredNonEmptyField(
+ configuration, "output-field", () -> "http-request agent");
+ this.method = ConfigurationUtils.getString("method", "GET", configuration);
+ final String body = ConfigurationUtils.getString("body", null, configuration);
+ final Map headers =
+ ConfigurationUtils.getMap("headers", new HashMap<>(), configuration);
+ final Map queryString =
+ ConfigurationUtils.getMap("query-string", new HashMap<>(), configuration);
+ final boolean allowRedirects =
+ ConfigurationUtils.getBoolean("allow-redirects", true, configuration);
+ final boolean handleCookies =
+ ConfigurationUtils.getBoolean("handle-cookies", true, configuration);
+
+ queryStringTemplates = new HashMap<>();
+ for (Map.Entry entry : queryString.entrySet()) {
+ queryStringTemplates.put(
+ entry.getKey(), Mustache.compiler().compile(entry.getValue().toString()));
+ }
+
+ headersTemplates = new HashMap<>();
+ for (Map.Entry entry : headers.entrySet()) {
+ headersTemplates.put(
+ entry.getKey(), Mustache.compiler().compile(entry.getValue().toString()));
+ }
+ if (body != null) {
+ bodyTemplate = Mustache.compiler().compile(body);
+ }
+
+ executor = Executors.newCachedThreadPool();
+ CookieManager cookieManager = new CookieManager();
+ cookieManager.setCookiePolicy(
+ handleCookies ? CookiePolicy.ACCEPT_ALL : CookiePolicy.ACCEPT_NONE);
+ httpClient =
+ HttpClient.newBuilder()
+ .followRedirects(
+ allowRedirects
+ ? HttpClient.Redirect.NORMAL
+ : HttpClient.Redirect.NEVER)
+ .cookieHandler(cookieManager)
+ .executor(executor)
+ .build();
+ }
+
+ @Override
+ public void setContext(AgentContext context) throws Exception {
+ this.agentContext = context;
+ }
+
+ @Override
+ public void process(List records, RecordSink recordSink) {
+ for (Record record : records) {
+ processRecord(record, recordSink);
+ }
+ }
+
+ @Override
+ public ComponentType componentType() {
+ return ComponentType.PROCESSOR;
+ }
+
+ public void processRecord(Record record, RecordSink recordSink) {
+ try {
+ TransformContext context = TransformContext.recordToTransformContext(record, true);
+ final JsonRecord jsonRecord = context.toJsonRecord();
+
+ final URI uri = URI.create(url + computeQueryString(jsonRecord));
+ final HttpRequest.BodyPublisher bodyPublisher;
+ if (bodyTemplate != null) {
+ bodyPublisher =
+ HttpRequest.BodyPublishers.ofString(bodyTemplate.execute(jsonRecord));
+ } else {
+ bodyPublisher = HttpRequest.BodyPublishers.noBody();
+ }
+ final HttpRequest.Builder requestBuilder =
+ HttpRequest.newBuilder()
+ .uri(uri)
+ .version(HttpClient.Version.HTTP_1_1)
+ .method(this.method, bodyPublisher);
+ headersTemplates.forEach(
+ (key, value) -> requestBuilder.header(key, value.execute(jsonRecord)));
+ final HttpRequest request = requestBuilder.build();
+
+ httpClient
+ .sendAsync(request, HttpResponse.BodyHandlers.ofString())
+ .thenAccept(
+ response -> {
+ try {
+ if (response.statusCode() >= 400) {
+ throw new RuntimeException(
+ "Error processing record: "
+ + record
+ + " with response: "
+ + response);
+ }
+ final Object body = parseResponseBody(response);
+ context.setResultField(
+ body,
+ outputFieldName,
+ null,
+ avroKeySchemaCache,
+ avroValueSchemaCache);
+ Optional recordResult =
+ TransformContext.transformContextToRecord(context);
+ if (log.isDebugEnabled()) {
+ log.debug("recordResult {}", recordResult);
+ }
+ if (recordResult.isPresent()) {
+ recordSink.emit(
+ new SourceRecordAndResult(
+ record, List.of(recordResult.get()), null));
+ } else {
+ recordSink.emit(
+ new SourceRecordAndResult(record, List.of(), null));
+ }
+ } catch (Exception e) {
+ log.error("Error processing record: {}", record, e);
+ recordSink.emit(
+ new SourceRecordAndResult(record, List.of(), e));
+ }
+ });
+ } catch (Throwable error) {
+ log.error("Error processing record: {}", record, error);
+ recordSink.emit(new SourceRecordAndResult(record, null, error));
+ }
+ }
+
+ private Object parseResponseBody(HttpResponse response) {
+ try {
+ return mapper.readValue(response.body(), Map.class);
+ } catch (JsonProcessingException ex) {
+ log.debug("Not able to parse response to json: {}, {}", response.body(), ex);
+ }
+ return response.body();
+ }
+
+ private String computeQueryString(JsonRecord jsonRecord) {
+ if (queryStringTemplates.isEmpty()) {
+ return "";
+ }
+
+ return "?"
+ + queryStringTemplates.entrySet().stream()
+ .map(
+ e -> {
+ final String resolved = e.getValue().execute(jsonRecord);
+ return encodeParam(e.getKey(), resolved);
+ })
+ .collect(Collectors.joining("&"));
+ }
+
+ private static String encodeParam(String key, String value) {
+ return String.format("%s=%s", key, URLEncoder.encode(value, StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+}
diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java
new file mode 100644
index 000000000..c0ca6f854
--- /dev/null
+++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.agents.queryhttp;
+
+import ai.langstream.api.runner.code.AgentCodeProvider;
+import ai.langstream.api.runner.code.AgentProcessor;
+import java.util.Map;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HttpRequestAgentProvider implements AgentCodeProvider {
+
+ private static final Map> FACTORIES =
+ Map.of("http-request", HttpRequestAgent::new);
+
+ @Override
+ public boolean supports(String agentType) {
+ return FACTORIES.containsKey(agentType);
+ }
+
+ @Override
+ public AgentProcessor createInstance(String agentType) {
+ return FACTORIES.get(agentType).get();
+ }
+}
diff --git a/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index
new file mode 100644
index 000000000..1ddcb4e59
--- /dev/null
+++ b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/ai.langstream.agents.index
@@ -0,0 +1 @@
+query-http
\ No newline at end of file
diff --git a/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider
new file mode 100644
index 000000000..248a0d85f
--- /dev/null
+++ b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider
@@ -0,0 +1 @@
+ai.langstream.agents.queryhttp.HttpRequestAgentProvider
\ No newline at end of file
diff --git a/langstream-agents/langstream-agents-text-processing/pom.xml b/langstream-agents/langstream-agents-text-processing/pom.xml
index 82ad2dc96..5fca5e28d 100644
--- a/langstream-agents/langstream-agents-text-processing/pom.xml
+++ b/langstream-agents/langstream-agents-text-processing/pom.xml
@@ -26,10 +26,6 @@
langstream-agents-text-processing
jar
LangStream - Apache Tika Agents
-
- 17
- 17
-
${project.groupId}
@@ -90,12 +86,17 @@
org.testcontainers
localstack
+ test
org.testcontainers
junit-jupiter
+ test
+
+ 2.8.0
+
diff --git a/langstream-agents/pom.xml b/langstream-agents/pom.xml
index cb13f7814..e362ad4c8 100644
--- a/langstream-agents/pom.xml
+++ b/langstream-agents/pom.xml
@@ -40,5 +40,6 @@
langstream-ai-agents
langstream-vector-agents
langstream-agents-flow-control
+ langstream-agent-http-request
diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java
new file mode 100644
index 000000000..c44454eef
--- /dev/null
+++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/HttpRequestAgentProvider.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.runtime.impl.k8s.agents;
+
+import ai.langstream.api.doc.AgentConfig;
+import ai.langstream.api.doc.ConfigProperty;
+import ai.langstream.api.model.AgentConfiguration;
+import ai.langstream.api.runtime.ComponentType;
+import ai.langstream.impl.agents.AbstractComposableAgentProvider;
+import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HttpRequestAgentProvider extends AbstractComposableAgentProvider {
+
+ private static final Set SUPPORTED_AGENT_TYPES = Set.of("http-request");
+
+ public HttpRequestAgentProvider() {
+ super(SUPPORTED_AGENT_TYPES, List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none"));
+ }
+
+ @Override
+ protected ComponentType getComponentType(AgentConfiguration agentConfiguration) {
+ return ComponentType.PROCESSOR;
+ }
+
+ @Override
+ protected Class getAgentConfigModelClass(String type) {
+ return Config.class;
+ }
+
+ @AgentConfig(
+ name = "Http Request",
+ description =
+ """
+ Agent for enriching data with an HTTP request.
+ """)
+ @Data
+ public static class Config {
+ @ConfigProperty(
+ description =
+ """
+ Url to send the request to. For adding query string parameters, use the `query-string` field.
+ """,
+ required = true)
+ private String url;
+
+ @ConfigProperty(
+ description =
+ """
+ The field that will hold the results, it can be the same as "field" to override it.
+ """,
+ required = true)
+ @JsonProperty("output-field")
+ private String outputFieldName;
+
+ @ConfigProperty(
+ description =
+ """
+ Http method to use for the request.
+ """,
+ defaultValue = "GET")
+ private String method;
+
+ @ConfigProperty(
+ description =
+ """
+ Headers to send with the request. You can use the Mustache syntax to inject value from the context.
+ """)
+ private Map headers;
+
+ @ConfigProperty(
+ description =
+ """
+ Query string to append to the url. You can use the Mustache syntax to inject value from the context.
+ Note that the values will be automatically escaped.
+ """)
+ @JsonProperty("query-string")
+ private Map queryString;
+
+ @ConfigProperty(
+ description =
+ """
+ Body to send with the request. You can use the Mustache syntax to inject value from the context.
+ """)
+ private String body;
+
+ @ConfigProperty(
+ description =
+ """
+ Whether or not to follow redirects.
+ """,
+ defaultValue = "true")
+ @JsonProperty("allow-redirects")
+ private boolean allowRedirects;
+
+ @ConfigProperty(
+ description =
+ """
+ Whether or not to handle cookies during the redirects.
+ """,
+ defaultValue = "true")
+ @JsonProperty("handle-cookies")
+ private boolean handleCookies;
+ }
+}
diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider
index 308f54f86..61bbaaea6 100644
--- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider
+++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider
@@ -8,4 +8,5 @@ ai.langstream.runtime.impl.k8s.agents.KubernetesCompositeAgentProvider
ai.langstream.runtime.impl.k8s.agents.IdentityAgentProvider
ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider
ai.langstream.runtime.impl.k8s.agents.ReRankAgentProvider
-ai.langstream.runtime.impl.k8s.agents.FlowControlAgentsProvider
\ No newline at end of file
+ai.langstream.runtime.impl.k8s.agents.FlowControlAgentsProvider
+ai.langstream.runtime.impl.k8s.agents.HttpRequestAgentProvider
diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml
index 374acca30..9dc7cba36 100644
--- a/langstream-runtime/langstream-runtime-impl/pom.xml
+++ b/langstream-runtime/langstream-runtime-impl/pom.xml
@@ -290,6 +290,13 @@
provided
+
+ ${project.groupId}
+ langstream-agent-http-request
+ ${project.version}
+ provided
+
+
@@ -525,6 +532,16 @@
${project.build.directory}/agents
+
+ ${project.groupId}
+ langstream-agent-http-request
+ ${project.version}
+ nar
+ nar
+ false
+ ${project.build.directory}/agents
+
+
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java
index 675108d40..95b18decb 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java
@@ -108,6 +108,7 @@ public T getGlobal(String key) {
}
public void close() {
+ applicationDeployer.cleanup(tenant, implementation);
applicationDeployer.delete(tenant, implementation, null);
Awaitility.await()
.until(
@@ -115,12 +116,6 @@ public void close() {
log.info("Waiting for secrets to be deleted. {}", secrets);
return secrets.isEmpty();
});
- // this is a workaround, we want to clean up the env
- topicConnectionsRuntimeRegistry
- .getTopicConnectionsRuntime(
- implementation.getApplication().getInstance().streamingCluster())
- .asTopicConnectionsRuntime()
- .delete(implementation);
}
}
@@ -265,7 +260,7 @@ protected List waitForMessages(KafkaConsumer consumer, List> e
assertArrayEquals((byte[]) expectedValue, (byte[]) actualValue);
} else {
log.info("expected: {}", expectedValue);
- log.info("got: {}", actualValue);
+ log.info("got: {}", actualValue);
assertEquals(expectedValue, actualValue);
}
}
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java
index 33ba749a3..3bf1b1011 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java
@@ -81,7 +81,7 @@ public void testCassandra() throws Exception {
create-statements:
- "CREATE KEYSPACE vsearch WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
delete-statements:
- - "DROP KEYSPACE vsearch;"
+ - "DROP KEYSPACE IF EXISTS vsearch;"
- name: "documents-table"
asset-type: "cassandra-table"
creation-mode: create-if-not-exists
@@ -219,7 +219,7 @@ public void testCassandraTable() throws Exception {
- "CREATE TABLE IF NOT EXISTS v1.documents (id int PRIMARY KEY, name text, description text);"
- "INSERT INTO v1.documents (id, name, description) VALUES (1, 'A', 'A description');"
delete-statements:
- - "DROP TABLE v1.documents;"
+ - "DROP TABLE IF EXISTS v1.documents;"
topics:
- name: "input-topic"
creation-mode: create-if-not-exists
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java
index 9f619756f..91ffa3277 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java
@@ -85,7 +85,7 @@ public void testCassandra() throws Exception {
create-statements:
- "CREATE KEYSPACE vsearch WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor' : 1};"
delete-statements:
- - "DROP KEYSPACE vsearch;"
+ - "DROP KEYSPACE IF EXISTS vsearch;"
- name: "documents-table"
asset-type: "cassandra-table"
creation-mode: create-if-not-exists
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java
new file mode 100644
index 000000000..f48fdd9bf
--- /dev/null
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.kafka;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.ok;
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+
+import ai.langstream.AbstractApplicationRunner;
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+@WireMockTest
+class HttpRequestAgentRunnerIT extends AbstractApplicationRunner {
+
+ static WireMockRuntimeInfo wireMockRuntimeInfo;
+
+ @BeforeAll
+ static void onBeforeAll(WireMockRuntimeInfo info) {
+ wireMockRuntimeInfo = info;
+ }
+
+ @Test
+ void testGetJson() throws Exception {
+
+ stubFor(
+ get("/api/models?name=my-model")
+ .willReturn(
+ okJson(
+ """
+ {"id": "my-model",
+ "created": "2021-08-31T12:00:00Z",
+ "model": "gpt-35-turbo",
+ "object": "text-generation",
+ "choices": [{"text": "It is a car."}]}
+ """)));
+
+ Map application =
+ Map.of(
+ "module.yaml",
+ """
+ topics:
+ - name: "input-topic"
+ creation-mode: create-if-not-exists
+ deletion-mode: delete
+ - name: "output-topic"
+ creation-mode: create-if-not-exists
+ deletion-mode: delete
+ pipeline:
+ - name: "http-request"
+ type: "http-request"
+ input: input-topic
+ output: output-topic
+ id: step1
+ configuration:
+ output-field: value.api
+ url: %s/api/models
+ query-string:
+ name: "{{{ value.id }}}"
+
+ """
+ .formatted(wireMockRuntimeInfo.getHttpBaseUrl()));
+
+ final String e1 =
+ """
+ {"id":"my-model","classification":"good","api":{"id":"my-model","created":"2021-08-31T12:00:00Z","model":"gpt-35-turbo","object":"text-generation","choices":[{"text":"It is a car."}]}}""";
+ runAndAssertMessage(application, e1);
+ }
+
+ @Test
+ void testGetRawText() throws Exception {
+
+ stubFor(
+ get("/api/models?name=my-model")
+ .willReturn(
+ ok("""
+ some-string""")));
+
+ Map application =
+ Map.of(
+ "module.yaml",
+ """
+ topics:
+ - name: "input-topic"
+ creation-mode: create-if-not-exists
+ deletion-mode: delete
+ - name: "output-topic"
+ creation-mode: create-if-not-exists
+ deletion-mode: delete
+ pipeline:
+ - name: "http-request"
+ type: "http-request"
+ input: input-topic
+ output: output-topic
+ id: step1
+ configuration:
+ output-field: value.api
+ url: %s/api/models
+ query-string:
+ name: "{{{ value.id }}}"
+
+ """
+ .formatted(wireMockRuntimeInfo.getHttpBaseUrl()));
+
+ final String e1 =
+ """
+ {"id":"my-model","classification":"good","api":"some-string"}""";
+ runAndAssertMessage(application, e1);
+ }
+
+ @Test
+ void testPostWithBody() throws Exception {
+
+ stubFor(
+ post("/api/models?name=my-model")
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withHeader("Authorization", equalTo("Bearer my-token!"))
+ .withRequestBody(equalTo("{\"id\": \"my-model\"}"))
+ .willReturn(
+ okJson(
+ """
+ {"id": "my-model",
+ "created": "2021-08-31T12:00:00Z",
+ "model": "gpt-35-turbo",
+ "object": "text-generation",
+ "choices": [{"text": "It is a car."}]}
+ """)));
+
+ Map application =
+ Map.of(
+ "module.yaml",
+ """
+ topics:
+ - name: "input-topic"
+ creation-mode: create-if-not-exists
+ deletion-mode: delete
+ - name: "output-topic"
+ creation-mode: create-if-not-exists
+ deletion-mode: delete
+ pipeline:
+ - name: "http-request"
+ type: "http-request"
+ input: input-topic
+ output: output-topic
+ id: step1
+ configuration:
+ output-field: value.api
+ url: %s/api/models
+ query-string:
+ name: "{{{ value.id }}}"
+ method: POST
+ body: '{"id": "{{{ value.id }}}"}'
+ headers:
+ Content-Type: application/json
+ Authorization: Bearer {{{ secrets.s1.token }}}
+ """
+ .formatted(wireMockRuntimeInfo.getHttpBaseUrl()));
+
+ final String e1 =
+ """
+ {"id":"my-model","classification":"good","api":{"id":"my-model","created":"2021-08-31T12:00:00Z","model":"gpt-35-turbo","object":"text-generation","choices":[{"text":"It is a car."}]}}""";
+ runAndAssertMessage(application, e1);
+ }
+
+ private void runAndAssertMessage(Map application, String e1) throws Exception {
+ String tenant = "tenant";
+ String[] expectedAgents = {"app-step1"};
+
+ // write some data
+ try (ApplicationRuntime applicationRuntime =
+ deployApplicationWithSecrets(
+ tenant,
+ "app",
+ application,
+ buildInstanceYaml(),
+ """
+ secrets:
+ - id: s1
+ data:
+ token: my-token!
+ """,
+ expectedAgents)) {
+ try (KafkaProducer producer = createProducer();
+ KafkaConsumer consumer = createConsumer("output-topic")) {
+
+ sendMessage(
+ "input-topic",
+ "{\"id\":\"my-model\",\"classification\":\"good\"}",
+ producer);
+ executeAgentRunners(applicationRuntime);
+
+ waitForMessages(consumer, List.of(e1));
+ }
+ }
+ }
+}
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java
index 278f92e2c..546ce7ab6 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/mockagents/MockAssetManagerCodeProvider.java
@@ -80,11 +80,7 @@ public synchronized void deployAsset() throws Exception {
@Override
public synchronized boolean deleteAssetIfExists() throws Exception {
log.info("Deleting asset {}", assetDefinition);
- final boolean remove = DEPLOYED_ASSETS.remove(assetDefinition);
- if (!remove) {
- throw new IllegalStateException("Asset not found: " + assetDefinition);
- }
- return true;
+ return DEPLOYED_ASSETS.remove(assetDefinition);
}
}
}