Skip to content

Commit

Permalink
Add integration test for Python Source
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Sep 20, 2023
1 parent 3c1c609 commit 2d07d67
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Slf4j
@ExtendWith(BaseEndToEndTest.class)
public class PythonFunctionIT extends BaseEndToEndTest {
public class PythonAgentsIT extends BaseEndToEndTest {

@ParameterizedTest
@ValueSource(strings = {"python-processor", "experimental-python-processor"})
public void test(String appDir) {
public void testProcessor(String appDir) {
installLangStreamCluster(false);
final String tenant = "ten-" + System.currentTimeMillis();
setupTenant(tenant);
Expand All @@ -61,6 +62,38 @@ public void test(String appDir) {

executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" "));

awaitCleanup(tenantNamespace, applicationId, "-test-python-processor");

final List<String> topics = getAllTopics();
Assertions.assertEquals(List.of("ls-test-topic0"), topics);
}

@Test
public void testSource() {
installLangStreamCluster(false);
final String tenant = "ten-" + System.currentTimeMillis();
setupTenant(tenant);
final String applicationId = "my-test-app";
deployLocalApplication(applicationId, "experimental-python-source");
final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant;
awaitApplicationReady(applicationId, 1);

final String output =
executeCommandOnClient(
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30"
.formatted(applicationId)
.split(" "));
log.info("Output: {}", output);
Assertions.assertTrue(
output.contains("{\"record\":{\"key\":null,\"value\":\"test\",\"headers\":{}}"));

executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" "));

awaitCleanup(tenantNamespace, applicationId, "-test-python-source");
}

private static void awaitCleanup(
String tenantNamespace, String applicationId, String appNameSuffix) {
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.untilAsserted(
Expand All @@ -69,7 +102,7 @@ public void test(String appDir) {
client.apps()
.statefulSets()
.inNamespace(tenantNamespace)
.withName(applicationId + "-test-python-processor")
.withName(applicationId + appNameSuffix)
.get());

Assertions.assertEquals(
Expand All @@ -96,8 +129,5 @@ public void test(String appDir) {
.getItems()
.size());
});

final List<String> topics = getAllTopics();
Assertions.assertEquals(List.of("ls-test-topic0"), topics);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

gateways:
- id: consume-output
type: consume
topic: ls-test-output
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module: "module-1"
id: "pipeline-1"
name: "Test Source"
topics:
- name: ls-test-output
creation-mode: create-if-not-exists
schema:
type: string
pipeline:
- name: "Source using Python"
resources:
size: 2
id: "test-python-source"
type: "experimental-python-source"
output: ls-test-output
configuration:
className: example.TestSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from langstream import SimpleRecord
import logging

class TestSource(object):

def __init__(self):
self.sent = False

def read(self):
logging.info("Read records")
if not self.sent:
self.sent = True
return [SimpleRecord("test")]

def commit(self, records):
logging.info("Commit :" + str(records))
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
@pytest.fixture(autouse=True)
def stub():
config = """{
"className": "langstream_grpc.tests.test_grpc_service.MyProcessor"
"className": "langstream_grpc.tests.test_grpc_processor.MyProcessor"
}"""
server = AgentServer("[::]:0", config)
server.start()
Expand Down

0 comments on commit 2d07d67

Please sign in to comment.