diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java index c328ea104..639a8b6d9 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java @@ -59,6 +59,16 @@ public void testProcessor() { "{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\"," + "\"headers\":{\"langstream-client-session-id\":\"s1\"}}")); + output = + executeCommandOnClient( + ("bin/langstream gateway consume %s topic-producer --position earliest -n 1 --connect-timeout 30") + .formatted(applicationId) + .split(" ")); + log.info("Output2: {}", output); + Assertions.assertTrue( + output.contains( + "{\"record\":{\"key\":null,\"value\":\"my-value test-topic-producer\"}}")); + updateLocalApplicationAndAwaitReady( tenant, applicationId, @@ -77,7 +87,7 @@ public void testProcessor() { + "30 -p sessionId=s2") .formatted(applicationId) .split(" ")); - log.info("Output2: {}", output); + log.info("Output3: {}", output); Assertions.assertTrue( output.contains( "{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value - changed\"," @@ -89,6 +99,7 @@ public void testProcessor() { log.info("all topics: {}", topics); Assertions.assertTrue(topics.contains("ls-test-topic0")); Assertions.assertFalse(topics.contains("ls-test-topic1")); + Assertions.assertTrue(topics.contains("ls-test-topic-producer")); } @Test diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml index 1977390d9..da2c2c443 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/gateways.yaml @@ -33,4 +33,7 @@ gateways: filters: headers: - key: langstream-client-session-id - value-from-parameters: sessionId \ No newline at end of file + value-from-parameters: sessionId + - id: topic-producer + type: consume + topic: ls-test-topic-producer diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml index 5add9e10e..b0e3bc16e 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml @@ -31,6 +31,12 @@ topics: type: string keySchema: type: string + - name: ls-test-topic-producer + creation-mode: create-if-not-exists + schema: + type: string + keySchema: + type: string pipeline: - name: "Process using Python" resources: @@ -44,4 +50,4 @@ pipeline: output: ls-test-topic1 configuration: secret_value: "${secrets.secret1.value-key}" - className: example.Exclamation \ No newline at end of file + className: example.Exclamation diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py index a87a6fe73..5627de32c 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py @@ -26,6 +26,7 @@ def init(self, config, context: AgentContext): def process(self, record): logging.info("Processing record" + str(record)) + self.context.get_topic_producer().write("ls-test-topic-producer", {"value": record.value() + " test-topic-producer"}).result() directory = self.context.get_persistent_state_directory() counter_file = os.path.join(directory, "counter.txt") counter = 0