diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java similarity index 75% rename from langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java rename to langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java index a1b8b5f67..55fc4a2ae 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonFunctionIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java @@ -24,17 +24,18 @@ import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @Slf4j @ExtendWith(BaseEndToEndTest.class) -public class PythonFunctionIT extends BaseEndToEndTest { +public class PythonAgentsIT extends BaseEndToEndTest { @ParameterizedTest @ValueSource(strings = {"python-processor", "experimental-python-processor"}) - public void test(String appDir) { + public void testProcessor(String appDir) { installLangStreamCluster(false); final String tenant = "ten-" + System.currentTimeMillis(); setupTenant(tenant); @@ -61,6 +62,38 @@ public void test(String appDir) { executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" ")); + awaitCleanup(tenantNamespace, applicationId, "-test-python-processor"); + + final List topics = getAllTopics(); + Assertions.assertEquals(List.of("ls-test-topic0"), topics); + } + + @Test + public void testSource() { + installLangStreamCluster(false); + final String tenant = "ten-" + System.currentTimeMillis(); + setupTenant(tenant); + final String applicationId = "my-test-app"; + deployLocalApplication(applicationId, "experimental-python-source"); + final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; + awaitApplicationReady(applicationId, 1); + + final String output = + executeCommandOnClient( + "bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30" + .formatted(applicationId) + .split(" ")); + log.info("Output: {}", output); + Assertions.assertTrue( + output.contains("{\"record\":{\"key\":null,\"value\":\"test\",\"headers\":{}}")); + + executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" ")); + + awaitCleanup(tenantNamespace, applicationId, "-test-python-source"); + } + + private static void awaitCleanup( + String tenantNamespace, String applicationId, String appNameSuffix) { Awaitility.await() .atMost(1, TimeUnit.MINUTES) .untilAsserted( @@ -69,7 +102,7 @@ public void test(String appDir) { client.apps() .statefulSets() .inNamespace(tenantNamespace) - .withName(applicationId + "-test-python-processor") + .withName(applicationId + appNameSuffix) .get()); Assertions.assertEquals( @@ -96,8 +129,5 @@ public void test(String appDir) { .getItems() .size()); }); - - final List topics = getAllTopics(); - Assertions.assertEquals(List.of("ls-test-topic0"), topics); } } diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/gateways.yaml new file mode 100644 index 000000000..b74064459 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/gateways.yaml @@ -0,0 +1,20 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: consume-output + type: consume + topic: ls-test-output \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/pipeline.yaml new file mode 100644 index 000000000..57e10756d --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/pipeline.yaml @@ -0,0 +1,33 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module: "module-1" +id: "pipeline-1" +name: "Test Source" +topics: + - name: ls-test-output + creation-mode: create-if-not-exists + schema: + type: string +pipeline: + - name: "Source using Python" + resources: + size: 2 + id: "test-python-source" + type: "experimental-python-source" + output: ls-test-output + configuration: + className: example.TestSource \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/python/example.py b/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/python/example.py new file mode 100644 index 000000000..dcefa5445 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-source/python/example.py @@ -0,0 +1,33 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from langstream import SimpleRecord +import logging + +class TestSource(object): + + def __init__(self): + self.sent = False + + def read(self): + logging.info("Read records") + if not self.sent: + self.sent = True + return [SimpleRecord("test")] + return [] + + def commit(self, records): + logging.info("Commit :" + str(records)) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py similarity index 98% rename from langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_service.py rename to langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py index c388fb9bd..f7edf5a49 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py @@ -41,7 +41,7 @@ @pytest.fixture(autouse=True) def stub(): config = """{ - "className": "langstream_grpc.tests.test_grpc_service.MyProcessor" + "className": "langstream_grpc.tests.test_grpc_processor.MyProcessor" }""" server = AgentServer("[::]:0", config) server.start()