Skip to content

Commit

Permalink
Add e2e test for topic producer
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Nov 23, 2023
1 parent b376906 commit dbd6975
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public void testProcessor() {
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\","
+ "\"headers\":{\"langstream-client-session-id\":\"s1\"}}"));

output =
executeCommandOnClient(
("bin/langstream gateway consume %s topic-producer --position earliest -n 1 --connect-timeout 30")
.formatted(applicationId)
.split(" "));
log.info("Output2: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value test-topic-producer\",\"headers\":{}}"));

updateLocalApplicationAndAwaitReady(
tenant,
applicationId,
Expand All @@ -77,7 +87,7 @@ public void testProcessor() {
+ "30 -p sessionId=s2")
.formatted(applicationId)
.split(" "));
log.info("Output2: {}", output);
log.info("Output3: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value - changed\","
Expand All @@ -89,6 +99,7 @@ public void testProcessor() {
log.info("all topics: {}", topics);
Assertions.assertTrue(topics.contains("ls-test-topic0"));
Assertions.assertFalse(topics.contains("ls-test-topic1"));
Assertions.assertTrue(topics.contains("ls-test-topic-producer"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ gateways:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
value-from-parameters: sessionId
- id: topic-producer
type: consume
topic: ls-test-topic-producer
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ topics:
type: string
keySchema:
type: string
- name: ls-test-topic-producer
creation-mode: create-if-not-exists
schema:
type: string
keySchema:
type: string
pipeline:
- name: "Process using Python"
resources:
Expand All @@ -44,4 +50,4 @@ pipeline:
output: ls-test-topic1
configuration:
secret_value: "${secrets.secret1.value-key}"
className: example.Exclamation
className: example.Exclamation
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def init(self, config, context: AgentContext):

def process(self, record):
logging.info("Processing record" + str(record))
self.context.get_topic_producer().write("ls-test-topic-producer", {"value": record.value() + " test-topic-producer"}).result()
directory = self.context.get_persistent_state_directory()
counter_file = os.path.join(directory, "counter.txt")
counter = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ async def agent_info(self, _, __):
async def poll_topic_producer_records(self, context):
while True:
topic, record, future = await self.topic_producer_records.get()
schemas, grpc_record = self.to_grpc_record(record)
schemas, grpc_record = self.to_grpc_record(wrap_in_record(record))
for schema in schemas:
await context.write(TopicProducerResponse(schema=schema))
await contextµ.write(TopicProducerResponse(schema=schema))
self.topic_producer_record_id += 1
self.topic_producer_records_pending[self.topic_producer_record_id] = future
grpc_record.record_id = self.topic_producer_record_id
Expand Down

0 comments on commit dbd6975

Please sign in to comment.