Skip to content

Commit

Permalink
Add e2e test for topic producer
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Nov 22, 2023
1 parent b376906 commit c04d4cb
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public void testProcessor() {
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\","
+ "\"headers\":{\"langstream-client-session-id\":\"s1\"}}"));

output =
executeCommandOnClient(
("bin/langstream gateway consume %s topic-producer --position earliest -n 1 --connect-timeout 30")
.formatted(applicationId)
.split(" "));
log.info("Output2: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value test-topic-producer\"}}"));

updateLocalApplicationAndAwaitReady(
tenant,
applicationId,
Expand All @@ -77,7 +87,7 @@ public void testProcessor() {
+ "30 -p sessionId=s2")
.formatted(applicationId)
.split(" "));
log.info("Output2: {}", output);
log.info("Output3: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value - changed\","
Expand All @@ -89,6 +99,7 @@ public void testProcessor() {
log.info("all topics: {}", topics);
Assertions.assertTrue(topics.contains("ls-test-topic0"));
Assertions.assertFalse(topics.contains("ls-test-topic1"));
Assertions.assertTrue(topics.contains("ls-test-topic-producer"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ gateways:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
value-from-parameters: sessionId
- id: topic-producer
type: consume
topic: ls-test-topic-producer
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ topics:
type: string
keySchema:
type: string
- name: ls-test-topic-producer
creation-mode: create-if-not-exists
schema:
type: string
keySchema:
type: string
pipeline:
- name: "Process using Python"
resources:
Expand All @@ -44,4 +50,4 @@ pipeline:
output: ls-test-topic1
configuration:
secret_value: "${secrets.secret1.value-key}"
className: example.Exclamation
className: example.Exclamation
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def init(self, config, context: AgentContext):

def process(self, record):
logging.info("Processing record" + str(record))
self.context.get_topic_producer().write("ls-test-topic-producer", {"value": record.value() + " test-topic-producer"}).result()
directory = self.context.get_persistent_state_directory()
counter_file = os.path.join(directory, "counter.txt")
counter = 0
Expand Down

0 comments on commit c04d4cb

Please sign in to comment.