From d67fdbacc398201341e6a67e3ef0bcb433b0ddd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 6 Aug 2024 15:25:11 +0200 Subject: [PATCH] spotless --- .github/workflows/ci.yml | 2 + .../agents/grpc/GrpcAgentProcessor.java | 10 +- .../java/ai/langstream/cli/LangStreamCLI.java | 3 +- .../commands/gateway/ServiceGatewayCmd.java | 2 +- .../java/ai/langstream/tests/PythonDLQIT.java | 23 +- .../tests/util/BaseEndToEndTest.java | 51 +-- .../python-processor-with-dlq/pipeline.yaml | 2 + .../python/example.py | 4 +- .../langstream-runtime-impl/pom.xml | 4 +- .../langstream/runtime/agent/AgentRunner.java | 3 +- .../runtime/agent/TopicConsumerSource.java | 6 +- .../src/main/python/langstream/__init__.py | 2 +- .../src/main/python/langstream/util.py | 2 +- .../python/langstream_grpc/grpc_service.py | 13 +- .../python/langstream_grpc/proto/agent_pb2.py | 91 +++-- .../langstream_grpc/proto/agent_pb2.pyi | 119 ++++++- .../langstream_grpc/proto/agent_pb2_grpc.py | 312 +++++++++++------- .../tests/test_grpc_processor.py | 3 + .../src/main/python/langstream_grpc/util.py | 2 +- .../main/python/scripts/generate-grpc-code.sh | 16 + .../langstream/agents/ErrorHandlingTest.java | 51 ++- 21 files changed, 482 insertions(+), 239 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52437cb91..1ba9fb3e7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -74,6 +74,8 @@ jobs: test_cmd: | exclude_modules="$(cd langstream-agents && ls -d langstream-* | sed 's/^/!:/g' | tr '\n' ',' | sed 's/,$//'),!langstream-agents,!langstream-webservice,!:langstream-api-gateway,!:langstream-k8s-deployer-operator,!:langstream-runtime-impl" ./mvnw verify -pl $exclude_modules $MAVEN_COMMON_SKIP_FLAGS + # python + unit tests for runtime-impl + ./mvnw package -pl langstream-runtime-impl -ntp -Dspotless.skip -Dlicense.skip steps: - name: 'Setup: checkout project' diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java index 9ee9433dd..070387fd6 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java @@ -16,6 +16,7 @@ package ai.langstream.agents.grpc; import ai.langstream.api.runner.code.AgentProcessor; +import ai.langstream.api.runner.code.ErrorTypes; import ai.langstream.api.runner.code.RecordSink; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; @@ -112,9 +113,14 @@ private SourceRecordAndResult fromGrpc( throws IOException { List resultRecords = new ArrayList<>(); if (result.hasError()) { - // TODO: specialize exception ? + final ErrorTypes errorType; + if (result.hasErrorType()) { + errorType = ErrorTypes.valueOf(result.getErrorType().toUpperCase()); + } else { + errorType = ErrorTypes.INTERNAL_ERROR; + } return new SourceRecordAndResult( - sourceRecord, null, new RuntimeException(result.getError())); + sourceRecord, null, new RuntimeException(result.getError()), errorType); } for (Record record : result.getRecordsList()) { resultRecords.add(fromGrpc(record)); diff --git a/langstream-cli/src/main/java/ai/langstream/cli/LangStreamCLI.java b/langstream-cli/src/main/java/ai/langstream/cli/LangStreamCLI.java index 69bd08969..54d15aad0 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/LangStreamCLI.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/LangStreamCLI.java @@ -95,13 +95,12 @@ private static String computeErrorMessage(Exception e) { final HttpResponse response = httpRequestFailedException.getResponse(); if (response != null) { Object body = httpRequestFailedException.getResponse().body(); + msg += String.format(" with code %d:", response.statusCode()); if (body != null) { if (body instanceof byte[]) { body = new String((byte[]) body, StandardCharsets.UTF_8); } msg += String.format(": %s", body); - } else { - msg += String.format(": %s", response.statusCode()); } } return msg; diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ServiceGatewayCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ServiceGatewayCmd.java index 42440e7da..a422eb511 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ServiceGatewayCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ServiceGatewayCmd.java @@ -15,6 +15,7 @@ */ package ai.langstream.cli.commands.gateway; +import ai.langstream.admin.client.HttpRequestFailedException; import ai.langstream.cli.api.model.Gateways; import com.fasterxml.jackson.core.JsonProcessingException; import java.net.URI; @@ -131,7 +132,6 @@ private void produceHttp( getClient() .getHttpClientFacade() .http(request, HttpResponse.BodyHandlers.ofString()); - log("Response: " + response.statusCode()); log(response.body()); } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonDLQIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonDLQIT.java index abc8d582f..27738ec1f 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonDLQIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonDLQIT.java @@ -21,6 +21,7 @@ import ai.langstream.tests.util.TestSuites; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; @@ -47,16 +48,24 @@ public void test() throws Exception { deployLocalApplicationAndAwaitReady( tenant, applicationId, "python-processor-with-dlq", appEnv, 2); - String output = - executeCommandOnClient( + CompletableFuture commandResult = + executeCommandOnClientAsync( "bin/langstream gateway service %s svc -v '{\"my-schema\":true}' --connect-timeout 60" .formatted(applicationId) .split(" ")); - - log.info("Output: {}", output); - Assertions.assertTrue(output.contains("Response: 400")); - Assertions.assertTrue(output.contains("record was not ok:")); - + commandResult.whenComplete( + (output, throwable) -> { + if (throwable == null) { + Assertions.fail("Expected exception"); + } + CommandExecFailedException failedException = + (CommandExecFailedException) throwable; + log.info("Error: {}", failedException.toString()); + String stderr = failedException.getStderr(); + Assertions.assertTrue(stderr.contains("with code: 400")); + Assertions.assertTrue(stderr.contains("record was not ok:")); + }); + commandResult.get(); deleteAppAndAwaitCleanup(tenant, applicationId); } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index 7871e65a6..e5f6d8cd2 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -78,7 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.stream.Collectors; -import lombok.SneakyThrows; +import lombok.*; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; @@ -223,17 +223,14 @@ private static Pod getFirstPodFromDeployment(String deploymentName) { @SneakyThrows public static String executeCommandOnClient(String... args) { - return executeCommandOnClient(2, TimeUnit.MINUTES, args); + return executeCommandOnClientAsync(args).get(2, TimeUnit.MINUTES); } @SneakyThrows - protected static String executeCommandOnClient(long timeout, TimeUnit unit, String... args) { + protected static CompletableFuture executeCommandOnClientAsync(String... args) { final Pod pod = getFirstPodFromDeployment("langstream-client"); return execInPod( - pod.getMetadata().getName(), - pod.getSpec().getContainers().get(0).getName(), - args) - .get(timeout, unit); + pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getName(), args); } @SneakyThrows @@ -275,6 +272,15 @@ public static CompletableFuture execInPod( return execInPodInNamespace(namespace, podName, containerName, cmds); } + @AllArgsConstructor + @Getter + @ToString + public static class CommandExecFailedException extends RuntimeException { + private final String command; + private final String stdout; + private final String stderr; + } + public static CompletableFuture execInPodInNamespace( String namespace, String podName, String containerName, String... cmds) { @@ -298,13 +304,17 @@ public void onFailure(Throwable t, Response failureResponse) { if (!completed.compareAndSet(false, true)) { return; } + String errString = error.toString(StandardCharsets.UTF_8); + String outString = out.toString(StandardCharsets.UTF_8); log.warn( "Error executing {} encountered; \nstderr: {}\nstdout: {}", cmd, - error.toString(StandardCharsets.UTF_8), - out.toString(), + errString, + outString, t); - response.completeExceptionally(t); + CommandExecFailedException commandExecFailedException = + new CommandExecFailedException(cmd, errString, outString); + response.completeExceptionally(commandExecFailedException); } @Override @@ -313,18 +323,18 @@ public void onExit(int code, Status status) { return; } if (code != 0) { + String errString = error.toString(StandardCharsets.UTF_8); + String outString = out.toString(StandardCharsets.UTF_8); log.warn( "Error executing {} encountered; \ncode: {}\n stderr: {}\nstdout: {}", cmd, code, - error.toString(StandardCharsets.UTF_8), - out.toString(StandardCharsets.UTF_8)); - response.completeExceptionally( - new RuntimeException( - "Command failed with err code: " - + code - + ", stderr: " - + error.toString(StandardCharsets.UTF_8))); + errString, + outString); + + CommandExecFailedException commandExecFailedException = + new CommandExecFailedException(cmd, errString, outString); + response.completeExceptionally(commandExecFailedException); } else { log.info( "Command completed {}; \nstderr: {}\nstdout: {}", @@ -340,12 +350,13 @@ public void onClose(int rc, String reason) { if (!completed.compareAndSet(false, true)) { return; } + String outString = out.toString(StandardCharsets.UTF_8); log.info( "Command completed {}; \nstderr: {}\nstdout: {}", cmd, error.toString(StandardCharsets.UTF_8), - out.toString(StandardCharsets.UTF_8)); - response.complete(out.toString(StandardCharsets.UTF_8)); + outString); + response.complete(outString); } }; diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/pipeline.yaml index 0a55960b9..f72bcc434 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/pipeline.yaml @@ -32,5 +32,7 @@ pipeline: type: "python-processor" input: ls-test-topic0 output: ls-test-topic1 + errors: + on-failure: dead-letter configuration: className: example.FailProcessor \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/python/example.py b/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/python/example.py index 72e3cb8fe..a885190b5 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/python/example.py +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor-with-dlq/python/example.py @@ -22,5 +22,5 @@ class FailProcessor(Processor): def process(self, record): logging.info("Processing record" + str(record)) - from langstream import InvalidRecordException - raise InvalidRecordException("record was not ok:" + str(record)) \ No newline at end of file + from langstream import InvalidRecordError + raise InvalidRecordError("record was not ok:" + str(record)) \ No newline at end of file diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml index 78e861788..496f32c57 100644 --- a/langstream-runtime/langstream-runtime-impl/pom.xml +++ b/langstream-runtime/langstream-runtime-impl/pom.xml @@ -485,7 +485,7 @@ exec - ${project.build.directory}/python + ${project.basedir}/src/main/python poetry run @@ -501,7 +501,7 @@ exec - ${project.build.directory}/python + ${project.basedir}/src/main/python poetry ${skipPythonPackage} diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java index f96c6d7dc..e6e97e96d 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java @@ -925,7 +925,8 @@ private static void runProcessorAgent( new AgentProcessor.SourceRecordAndResult( sourceRecord, List.of(), - permanentFailureException)); + permanentFailureException, + result.errorType())); } else { // in case the source does not throw an exception we mark // the record as "skipped" diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java index 00c8a5a7e..e15bacfac 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/TopicConsumerSource.java @@ -49,12 +49,10 @@ public void commit(List records) throws Exception { @Override public void permanentFailure(Record record, Exception error, ErrorTypes errorType) { - // DLQ - log.error("Permanent failure on record {}", record, error); + errorType = errorType == null ? ErrorTypes.INTERNAL_ERROR : errorType; + log.error("Permanent {} failure on record {}", errorType, record, error); MutableRecord recordWithError = MutableRecord.recordToMutableRecord(record, false); String sourceTopic = record.origin(); - - errorType = errorType == null ? ErrorTypes.INTERNAL_ERROR : errorType; recordWithError.setProperty( SystemHeaders.ERROR_HANDLING_ERROR_TYPE.getKey(), errorType.toString()); diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py index 78984aad6..83883ccc3 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py @@ -38,5 +38,5 @@ "SimpleRecord", "AvroValue", "AgentContext", - "InvalidRecordError" + "InvalidRecordError", ] diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/util.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/util.py index 719c7abf7..28dbc9a31 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/util.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/util.py @@ -72,4 +72,4 @@ class AvroValue(object): class InvalidRecordError(Exception): - pass \ No newline at end of file + pass diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py index 78f6604b6..baf6ede33 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py @@ -47,7 +47,7 @@ ) from langstream_grpc.proto.agent_pb2_grpc import AgentServiceServicer from .api import Source, Sink, Processor, Record, Agent, AgentContext, TopicProducer -from .util import SimpleRecord, AvroValue, InvalidRecordError +from .util import SimpleRecord, AvroValue class RecordWithId(SimpleRecord): @@ -161,7 +161,7 @@ async def handle_read_requests(self, context, read_records): "permanent_failure", record, RuntimeError(failure.error_message), - failure.error_type + failure.error_type, ) request = await context.read() @@ -207,13 +207,12 @@ async def process(self, requests: AsyncIterable[ProcessorRequest], _): yield ProcessorResponse(schema=schema) grpc_result.records.append(grpc_record) yield ProcessorResponse(results=[grpc_result]) - except InvalidRecordError as invalid_record_error: - grpc_result.error = str(invalid_record_error) - grpc_result.error_type = "INVALID_RECORD" - yield ProcessorResponse(results=[grpc_result]) except Exception as e: + if e.__class__.__name__ == "InvalidRecordError": + grpc_result.error_type = "INVALID_RECORD" + else: + grpc_result.error_type = "INTERNAL_ERROR" grpc_result.error = str(e) - grpc_result.error_type = "INTERNAL_ERROR" yield ProcessorResponse(results=[grpc_result]) async def write(self, requests: AsyncIterable[SinkRequest], context): diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.py index 14b8238ff..2daa20301 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.py @@ -1,3 +1,19 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: agent.proto @@ -7,6 +23,7 @@ from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder + # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -15,44 +32,46 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61gent.proto\x1a\x1bgoogle/protobuf/empty.proto\"!\n\x0cInfoResponse\x12\x11\n\tjson_info\x18\x01 \x01(\t\"\xa3\x02\n\x05Value\x12\x11\n\tschema_id\x18\x01 \x01(\x05\x12\x15\n\x0b\x62ytes_value\x18\x02 \x01(\x0cH\x00\x12\x17\n\rboolean_value\x18\x03 \x01(\x08H\x00\x12\x16\n\x0cstring_value\x18\x04 \x01(\tH\x00\x12\x14\n\nbyte_value\x18\x05 \x01(\x05H\x00\x12\x15\n\x0bshort_value\x18\x06 \x01(\x05H\x00\x12\x13\n\tint_value\x18\x07 \x01(\x05H\x00\x12\x14\n\nlong_value\x18\x08 \x01(\x03H\x00\x12\x15\n\x0b\x66loat_value\x18\t \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\n \x01(\x01H\x00\x12\x14\n\njson_value\x18\x0b \x01(\tH\x00\x12\x14\n\navro_value\x18\x0c \x01(\x0cH\x00\x42\x0c\n\ntype_oneof\"-\n\x06Header\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\x05value\x18\x02 \x01(\x0b\x32\x06.Value\"*\n\x06Schema\x12\x11\n\tschema_id\x18\x01 \x01(\x05\x12\r\n\x05value\x18\x02 \x01(\x0c\"\xb3\x01\n\x06Record\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x18\n\x03key\x18\x02 \x01(\x0b\x32\x06.ValueH\x00\x88\x01\x01\x12\x1a\n\x05value\x18\x03 \x01(\x0b\x32\x06.ValueH\x01\x88\x01\x01\x12\x18\n\x07headers\x18\x04 \x03(\x0b\x32\x07.Header\x12\x0e\n\x06origin\x18\x05 \x01(\t\x12\x16\n\ttimestamp\x18\x06 \x01(\x03H\x02\x88\x01\x01\x42\x06\n\x04_keyB\x08\n\x06_valueB\x0c\n\n_timestamp\"K\n\x18TopicProducerWriteResult\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"X\n\x15TopicProducerResponse\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x17\n\x06schema\x18\x02 \x01(\x0b\x32\x07.Schema\x12\x17\n\x06record\x18\x03 \x01(\x0b\x32\x07.Record\"P\n\x10PermanentFailure\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nerror_type\x18\x03 \x01(\t\"X\n\rSourceRequest\x12\x19\n\x11\x63ommitted_records\x18\x01 \x03(\x03\x12,\n\x11permanent_failure\x18\x02 \x01(\x0b\x32\x11.PermanentFailure\"C\n\x0eSourceResponse\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12\x18\n\x07records\x18\x02 \x03(\x0b\x32\x07.Record\"E\n\x10ProcessorRequest\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12\x18\n\x07records\x18\x02 \x03(\x0b\x32\x07.Record\"O\n\x11ProcessorResponse\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12!\n\x07results\x18\x02 \x03(\x0b\x32\x10.ProcessorResult\"\x84\x01\n\x0fProcessorResult\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x07records\x18\x03 \x03(\x0b\x32\x07.Record\x12\x17\n\nerror_type\x18\x04 \x01(\tH\x01\x88\x01\x01\x42\x08\n\x06_errorB\r\n\x0b_error_type\"?\n\x0bSinkRequest\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12\x17\n\x06record\x18\x02 \x01(\x0b\x32\x07.Record\"?\n\x0cSinkResponse\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error2\xaf\x02\n\x0c\x41gentService\x12\x35\n\nagent_info\x12\x16.google.protobuf.Empty\x1a\r.InfoResponse\"\x00\x12-\n\x04read\x12\x0e.SourceRequest\x1a\x0f.SourceResponse\"\x00(\x01\x30\x01\x12\x36\n\x07process\x12\x11.ProcessorRequest\x1a\x12.ProcessorResponse\"\x00(\x01\x30\x01\x12*\n\x05write\x12\x0c.SinkRequest\x1a\r.SinkResponse\"\x00(\x01\x30\x01\x12U\n\x1aget_topic_producer_records\x12\x19.TopicProducerWriteResult\x1a\x16.TopicProducerResponse\"\x00(\x01\x30\x01\x42\x1d\n\x19\x61i.langstream.agents.grpcP\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x0b\x61gent.proto\x1a\x1bgoogle/protobuf/empty.proto"!\n\x0cInfoResponse\x12\x11\n\tjson_info\x18\x01 \x01(\t"\xa3\x02\n\x05Value\x12\x11\n\tschema_id\x18\x01 \x01(\x05\x12\x15\n\x0b\x62ytes_value\x18\x02 \x01(\x0cH\x00\x12\x17\n\rboolean_value\x18\x03 \x01(\x08H\x00\x12\x16\n\x0cstring_value\x18\x04 \x01(\tH\x00\x12\x14\n\nbyte_value\x18\x05 \x01(\x05H\x00\x12\x15\n\x0bshort_value\x18\x06 \x01(\x05H\x00\x12\x13\n\tint_value\x18\x07 \x01(\x05H\x00\x12\x14\n\nlong_value\x18\x08 \x01(\x03H\x00\x12\x15\n\x0b\x66loat_value\x18\t \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\n \x01(\x01H\x00\x12\x14\n\njson_value\x18\x0b \x01(\tH\x00\x12\x14\n\navro_value\x18\x0c \x01(\x0cH\x00\x42\x0c\n\ntype_oneof"-\n\x06Header\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\x05value\x18\x02 \x01(\x0b\x32\x06.Value"*\n\x06Schema\x12\x11\n\tschema_id\x18\x01 \x01(\x05\x12\r\n\x05value\x18\x02 \x01(\x0c"\xb3\x01\n\x06Record\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x18\n\x03key\x18\x02 \x01(\x0b\x32\x06.ValueH\x00\x88\x01\x01\x12\x1a\n\x05value\x18\x03 \x01(\x0b\x32\x06.ValueH\x01\x88\x01\x01\x12\x18\n\x07headers\x18\x04 \x03(\x0b\x32\x07.Header\x12\x0e\n\x06origin\x18\x05 \x01(\t\x12\x16\n\ttimestamp\x18\x06 \x01(\x03H\x02\x88\x01\x01\x42\x06\n\x04_keyB\x08\n\x06_valueB\x0c\n\n_timestamp"K\n\x18TopicProducerWriteResult\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error"X\n\x15TopicProducerResponse\x12\r\n\x05topic\x18\x01 \x01(\t\x12\x17\n\x06schema\x18\x02 \x01(\x0b\x32\x07.Schema\x12\x17\n\x06record\x18\x03 \x01(\x0b\x32\x07.Record"P\n\x10PermanentFailure\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nerror_type\x18\x03 \x01(\t"X\n\rSourceRequest\x12\x19\n\x11\x63ommitted_records\x18\x01 \x03(\x03\x12,\n\x11permanent_failure\x18\x02 \x01(\x0b\x32\x11.PermanentFailure"C\n\x0eSourceResponse\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12\x18\n\x07records\x18\x02 \x03(\x0b\x32\x07.Record"E\n\x10ProcessorRequest\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12\x18\n\x07records\x18\x02 \x03(\x0b\x32\x07.Record"O\n\x11ProcessorResponse\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12!\n\x07results\x18\x02 \x03(\x0b\x32\x10.ProcessorResult"\x84\x01\n\x0fProcessorResult\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x07records\x18\x03 \x03(\x0b\x32\x07.Record\x12\x17\n\nerror_type\x18\x04 \x01(\tH\x01\x88\x01\x01\x42\x08\n\x06_errorB\r\n\x0b_error_type"?\n\x0bSinkRequest\x12\x17\n\x06schema\x18\x01 \x01(\x0b\x32\x07.Schema\x12\x17\n\x06record\x18\x02 \x01(\x0b\x32\x07.Record"?\n\x0cSinkResponse\x12\x11\n\trecord_id\x18\x01 \x01(\x03\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error2\xaf\x02\n\x0c\x41gentService\x12\x35\n\nagent_info\x12\x16.google.protobuf.Empty\x1a\r.InfoResponse"\x00\x12-\n\x04read\x12\x0e.SourceRequest\x1a\x0f.SourceResponse"\x00(\x01\x30\x01\x12\x36\n\x07process\x12\x11.ProcessorRequest\x1a\x12.ProcessorResponse"\x00(\x01\x30\x01\x12*\n\x05write\x12\x0c.SinkRequest\x1a\r.SinkResponse"\x00(\x01\x30\x01\x12U\n\x1aget_topic_producer_records\x12\x19.TopicProducerWriteResult\x1a\x16.TopicProducerResponse"\x00(\x01\x30\x01\x42\x1d\n\x19\x61i.langstream.agents.grpcP\x01\x62\x06proto3' +) _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_pb2', _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "agent_pb2", _globals) if _descriptor._USE_C_DESCRIPTORS == False: - _globals['DESCRIPTOR']._options = None - _globals['DESCRIPTOR']._serialized_options = b'\n\031ai.langstream.agents.grpcP\001' - _globals['_INFORESPONSE']._serialized_start=44 - _globals['_INFORESPONSE']._serialized_end=77 - _globals['_VALUE']._serialized_start=80 - _globals['_VALUE']._serialized_end=371 - _globals['_HEADER']._serialized_start=373 - _globals['_HEADER']._serialized_end=418 - _globals['_SCHEMA']._serialized_start=420 - _globals['_SCHEMA']._serialized_end=462 - _globals['_RECORD']._serialized_start=465 - _globals['_RECORD']._serialized_end=644 - _globals['_TOPICPRODUCERWRITERESULT']._serialized_start=646 - _globals['_TOPICPRODUCERWRITERESULT']._serialized_end=721 - _globals['_TOPICPRODUCERRESPONSE']._serialized_start=723 - _globals['_TOPICPRODUCERRESPONSE']._serialized_end=811 - _globals['_PERMANENTFAILURE']._serialized_start=813 - _globals['_PERMANENTFAILURE']._serialized_end=893 - _globals['_SOURCEREQUEST']._serialized_start=895 - _globals['_SOURCEREQUEST']._serialized_end=983 - _globals['_SOURCERESPONSE']._serialized_start=985 - _globals['_SOURCERESPONSE']._serialized_end=1052 - _globals['_PROCESSORREQUEST']._serialized_start=1054 - _globals['_PROCESSORREQUEST']._serialized_end=1123 - _globals['_PROCESSORRESPONSE']._serialized_start=1125 - _globals['_PROCESSORRESPONSE']._serialized_end=1204 - _globals['_PROCESSORRESULT']._serialized_start=1207 - _globals['_PROCESSORRESULT']._serialized_end=1339 - _globals['_SINKREQUEST']._serialized_start=1341 - _globals['_SINKREQUEST']._serialized_end=1404 - _globals['_SINKRESPONSE']._serialized_start=1406 - _globals['_SINKRESPONSE']._serialized_end=1469 - _globals['_AGENTSERVICE']._serialized_start=1472 - _globals['_AGENTSERVICE']._serialized_end=1775 + _globals["DESCRIPTOR"]._options = None + _globals["DESCRIPTOR"]._serialized_options = b"\n\031ai.langstream.agents.grpcP\001" + _globals["_INFORESPONSE"]._serialized_start = 44 + _globals["_INFORESPONSE"]._serialized_end = 77 + _globals["_VALUE"]._serialized_start = 80 + _globals["_VALUE"]._serialized_end = 371 + _globals["_HEADER"]._serialized_start = 373 + _globals["_HEADER"]._serialized_end = 418 + _globals["_SCHEMA"]._serialized_start = 420 + _globals["_SCHEMA"]._serialized_end = 462 + _globals["_RECORD"]._serialized_start = 465 + _globals["_RECORD"]._serialized_end = 644 + _globals["_TOPICPRODUCERWRITERESULT"]._serialized_start = 646 + _globals["_TOPICPRODUCERWRITERESULT"]._serialized_end = 721 + _globals["_TOPICPRODUCERRESPONSE"]._serialized_start = 723 + _globals["_TOPICPRODUCERRESPONSE"]._serialized_end = 811 + _globals["_PERMANENTFAILURE"]._serialized_start = 813 + _globals["_PERMANENTFAILURE"]._serialized_end = 893 + _globals["_SOURCEREQUEST"]._serialized_start = 895 + _globals["_SOURCEREQUEST"]._serialized_end = 983 + _globals["_SOURCERESPONSE"]._serialized_start = 985 + _globals["_SOURCERESPONSE"]._serialized_end = 1052 + _globals["_PROCESSORREQUEST"]._serialized_start = 1054 + _globals["_PROCESSORREQUEST"]._serialized_end = 1123 + _globals["_PROCESSORRESPONSE"]._serialized_start = 1125 + _globals["_PROCESSORRESPONSE"]._serialized_end = 1204 + _globals["_PROCESSORRESULT"]._serialized_start = 1207 + _globals["_PROCESSORRESULT"]._serialized_end = 1339 + _globals["_SINKREQUEST"]._serialized_start = 1341 + _globals["_SINKREQUEST"]._serialized_end = 1404 + _globals["_SINKRESPONSE"]._serialized_start = 1406 + _globals["_SINKRESPONSE"]._serialized_end = 1469 + _globals["_AGENTSERVICE"]._serialized_start = 1472 + _globals["_AGENTSERVICE"]._serialized_end = 1775 # @@protoc_insertion_point(module_scope) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.pyi b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.pyi index 26142ed1b..22d3e499a 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.pyi +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2.pyi @@ -2,7 +2,13 @@ from google.protobuf import empty_pb2 as _empty_pb2 from google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message -from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union +from typing import ( + ClassVar as _ClassVar, + Iterable as _Iterable, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, +) DESCRIPTOR: _descriptor.FileDescriptor @@ -13,7 +19,20 @@ class InfoResponse(_message.Message): def __init__(self, json_info: _Optional[str] = ...) -> None: ... class Value(_message.Message): - __slots__ = ("schema_id", "bytes_value", "boolean_value", "string_value", "byte_value", "short_value", "int_value", "long_value", "float_value", "double_value", "json_value", "avro_value") + __slots__ = ( + "schema_id", + "bytes_value", + "boolean_value", + "string_value", + "byte_value", + "short_value", + "int_value", + "long_value", + "float_value", + "double_value", + "json_value", + "avro_value", + ) SCHEMA_ID_FIELD_NUMBER: _ClassVar[int] BYTES_VALUE_FIELD_NUMBER: _ClassVar[int] BOOLEAN_VALUE_FIELD_NUMBER: _ClassVar[int] @@ -38,7 +57,21 @@ class Value(_message.Message): double_value: float json_value: str avro_value: bytes - def __init__(self, schema_id: _Optional[int] = ..., bytes_value: _Optional[bytes] = ..., boolean_value: bool = ..., string_value: _Optional[str] = ..., byte_value: _Optional[int] = ..., short_value: _Optional[int] = ..., int_value: _Optional[int] = ..., long_value: _Optional[int] = ..., float_value: _Optional[float] = ..., double_value: _Optional[float] = ..., json_value: _Optional[str] = ..., avro_value: _Optional[bytes] = ...) -> None: ... + def __init__( + self, + schema_id: _Optional[int] = ..., + bytes_value: _Optional[bytes] = ..., + boolean_value: bool = ..., + string_value: _Optional[str] = ..., + byte_value: _Optional[int] = ..., + short_value: _Optional[int] = ..., + int_value: _Optional[int] = ..., + long_value: _Optional[int] = ..., + float_value: _Optional[float] = ..., + double_value: _Optional[float] = ..., + json_value: _Optional[str] = ..., + avro_value: _Optional[bytes] = ..., + ) -> None: ... class Header(_message.Message): __slots__ = ("name", "value") @@ -46,7 +79,11 @@ class Header(_message.Message): VALUE_FIELD_NUMBER: _ClassVar[int] name: str value: Value - def __init__(self, name: _Optional[str] = ..., value: _Optional[_Union[Value, _Mapping]] = ...) -> None: ... + def __init__( + self, + name: _Optional[str] = ..., + value: _Optional[_Union[Value, _Mapping]] = ..., + ) -> None: ... class Schema(_message.Message): __slots__ = ("schema_id", "value") @@ -54,7 +91,9 @@ class Schema(_message.Message): VALUE_FIELD_NUMBER: _ClassVar[int] schema_id: int value: bytes - def __init__(self, schema_id: _Optional[int] = ..., value: _Optional[bytes] = ...) -> None: ... + def __init__( + self, schema_id: _Optional[int] = ..., value: _Optional[bytes] = ... + ) -> None: ... class Record(_message.Message): __slots__ = ("record_id", "key", "value", "headers", "origin", "timestamp") @@ -70,7 +109,15 @@ class Record(_message.Message): headers: _containers.RepeatedCompositeFieldContainer[Header] origin: str timestamp: int - def __init__(self, record_id: _Optional[int] = ..., key: _Optional[_Union[Value, _Mapping]] = ..., value: _Optional[_Union[Value, _Mapping]] = ..., headers: _Optional[_Iterable[_Union[Header, _Mapping]]] = ..., origin: _Optional[str] = ..., timestamp: _Optional[int] = ...) -> None: ... + def __init__( + self, + record_id: _Optional[int] = ..., + key: _Optional[_Union[Value, _Mapping]] = ..., + value: _Optional[_Union[Value, _Mapping]] = ..., + headers: _Optional[_Iterable[_Union[Header, _Mapping]]] = ..., + origin: _Optional[str] = ..., + timestamp: _Optional[int] = ..., + ) -> None: ... class TopicProducerWriteResult(_message.Message): __slots__ = ("record_id", "error") @@ -78,7 +125,9 @@ class TopicProducerWriteResult(_message.Message): ERROR_FIELD_NUMBER: _ClassVar[int] record_id: int error: str - def __init__(self, record_id: _Optional[int] = ..., error: _Optional[str] = ...) -> None: ... + def __init__( + self, record_id: _Optional[int] = ..., error: _Optional[str] = ... + ) -> None: ... class TopicProducerResponse(_message.Message): __slots__ = ("topic", "schema", "record") @@ -88,7 +137,12 @@ class TopicProducerResponse(_message.Message): topic: str schema: Schema record: Record - def __init__(self, topic: _Optional[str] = ..., schema: _Optional[_Union[Schema, _Mapping]] = ..., record: _Optional[_Union[Record, _Mapping]] = ...) -> None: ... + def __init__( + self, + topic: _Optional[str] = ..., + schema: _Optional[_Union[Schema, _Mapping]] = ..., + record: _Optional[_Union[Record, _Mapping]] = ..., + ) -> None: ... class PermanentFailure(_message.Message): __slots__ = ("record_id", "error_message", "error_type") @@ -98,7 +152,12 @@ class PermanentFailure(_message.Message): record_id: int error_message: str error_type: str - def __init__(self, record_id: _Optional[int] = ..., error_message: _Optional[str] = ..., error_type: _Optional[str] = ...) -> None: ... + def __init__( + self, + record_id: _Optional[int] = ..., + error_message: _Optional[str] = ..., + error_type: _Optional[str] = ..., + ) -> None: ... class SourceRequest(_message.Message): __slots__ = ("committed_records", "permanent_failure") @@ -106,7 +165,11 @@ class SourceRequest(_message.Message): PERMANENT_FAILURE_FIELD_NUMBER: _ClassVar[int] committed_records: _containers.RepeatedScalarFieldContainer[int] permanent_failure: PermanentFailure - def __init__(self, committed_records: _Optional[_Iterable[int]] = ..., permanent_failure: _Optional[_Union[PermanentFailure, _Mapping]] = ...) -> None: ... + def __init__( + self, + committed_records: _Optional[_Iterable[int]] = ..., + permanent_failure: _Optional[_Union[PermanentFailure, _Mapping]] = ..., + ) -> None: ... class SourceResponse(_message.Message): __slots__ = ("schema", "records") @@ -114,7 +177,11 @@ class SourceResponse(_message.Message): RECORDS_FIELD_NUMBER: _ClassVar[int] schema: Schema records: _containers.RepeatedCompositeFieldContainer[Record] - def __init__(self, schema: _Optional[_Union[Schema, _Mapping]] = ..., records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ...) -> None: ... + def __init__( + self, + schema: _Optional[_Union[Schema, _Mapping]] = ..., + records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ..., + ) -> None: ... class ProcessorRequest(_message.Message): __slots__ = ("schema", "records") @@ -122,7 +189,11 @@ class ProcessorRequest(_message.Message): RECORDS_FIELD_NUMBER: _ClassVar[int] schema: Schema records: _containers.RepeatedCompositeFieldContainer[Record] - def __init__(self, schema: _Optional[_Union[Schema, _Mapping]] = ..., records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ...) -> None: ... + def __init__( + self, + schema: _Optional[_Union[Schema, _Mapping]] = ..., + records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ..., + ) -> None: ... class ProcessorResponse(_message.Message): __slots__ = ("schema", "results") @@ -130,7 +201,11 @@ class ProcessorResponse(_message.Message): RESULTS_FIELD_NUMBER: _ClassVar[int] schema: Schema results: _containers.RepeatedCompositeFieldContainer[ProcessorResult] - def __init__(self, schema: _Optional[_Union[Schema, _Mapping]] = ..., results: _Optional[_Iterable[_Union[ProcessorResult, _Mapping]]] = ...) -> None: ... + def __init__( + self, + schema: _Optional[_Union[Schema, _Mapping]] = ..., + results: _Optional[_Iterable[_Union[ProcessorResult, _Mapping]]] = ..., + ) -> None: ... class ProcessorResult(_message.Message): __slots__ = ("record_id", "error", "records", "error_type") @@ -142,7 +217,13 @@ class ProcessorResult(_message.Message): error: str records: _containers.RepeatedCompositeFieldContainer[Record] error_type: str - def __init__(self, record_id: _Optional[int] = ..., error: _Optional[str] = ..., records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ..., error_type: _Optional[str] = ...) -> None: ... + def __init__( + self, + record_id: _Optional[int] = ..., + error: _Optional[str] = ..., + records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ..., + error_type: _Optional[str] = ..., + ) -> None: ... class SinkRequest(_message.Message): __slots__ = ("schema", "record") @@ -150,7 +231,11 @@ class SinkRequest(_message.Message): RECORD_FIELD_NUMBER: _ClassVar[int] schema: Schema record: Record - def __init__(self, schema: _Optional[_Union[Schema, _Mapping]] = ..., record: _Optional[_Union[Record, _Mapping]] = ...) -> None: ... + def __init__( + self, + schema: _Optional[_Union[Schema, _Mapping]] = ..., + record: _Optional[_Union[Record, _Mapping]] = ..., + ) -> None: ... class SinkResponse(_message.Message): __slots__ = ("record_id", "error") @@ -158,4 +243,6 @@ class SinkResponse(_message.Message): ERROR_FIELD_NUMBER: _ClassVar[int] record_id: int error: str - def __init__(self, record_id: _Optional[int] = ..., error: _Optional[str] = ...) -> None: ... + def __init__( + self, record_id: _Optional[int] = ..., error: _Optional[str] = ... + ) -> None: ... diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2_grpc.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2_grpc.py index 7472aefa9..e3874da85 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2_grpc.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/proto/agent_pb2_grpc.py @@ -1,3 +1,19 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc @@ -6,7 +22,6 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 - class AgentServiceStub(object): """Missing associated documentation comment in .proto file.""" @@ -17,30 +32,30 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.agent_info = channel.unary_unary( - '/AgentService/agent_info', - request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, - response_deserializer=agent__pb2.InfoResponse.FromString, - ) + "/AgentService/agent_info", + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=agent__pb2.InfoResponse.FromString, + ) self.read = channel.stream_stream( - '/AgentService/read', - request_serializer=agent__pb2.SourceRequest.SerializeToString, - response_deserializer=agent__pb2.SourceResponse.FromString, - ) + "/AgentService/read", + request_serializer=agent__pb2.SourceRequest.SerializeToString, + response_deserializer=agent__pb2.SourceResponse.FromString, + ) self.process = channel.stream_stream( - '/AgentService/process', - request_serializer=agent__pb2.ProcessorRequest.SerializeToString, - response_deserializer=agent__pb2.ProcessorResponse.FromString, - ) + "/AgentService/process", + request_serializer=agent__pb2.ProcessorRequest.SerializeToString, + response_deserializer=agent__pb2.ProcessorResponse.FromString, + ) self.write = channel.stream_stream( - '/AgentService/write', - request_serializer=agent__pb2.SinkRequest.SerializeToString, - response_deserializer=agent__pb2.SinkResponse.FromString, - ) + "/AgentService/write", + request_serializer=agent__pb2.SinkRequest.SerializeToString, + response_deserializer=agent__pb2.SinkResponse.FromString, + ) self.get_topic_producer_records = channel.stream_stream( - '/AgentService/get_topic_producer_records', - request_serializer=agent__pb2.TopicProducerWriteResult.SerializeToString, - response_deserializer=agent__pb2.TopicProducerResponse.FromString, - ) + "/AgentService/get_topic_producer_records", + request_serializer=agent__pb2.TopicProducerWriteResult.SerializeToString, + response_deserializer=agent__pb2.TopicProducerResponse.FromString, + ) class AgentServiceServicer(object): @@ -49,152 +64,213 @@ class AgentServiceServicer(object): def agent_info(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def read(self, request_iterator, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def process(self, request_iterator, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def write(self, request_iterator, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def get_topic_producer_records(self, request_iterator, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def add_AgentServiceServicer_to_server(servicer, server): rpc_method_handlers = { - 'agent_info': grpc.unary_unary_rpc_method_handler( - servicer.agent_info, - request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, - response_serializer=agent__pb2.InfoResponse.SerializeToString, - ), - 'read': grpc.stream_stream_rpc_method_handler( - servicer.read, - request_deserializer=agent__pb2.SourceRequest.FromString, - response_serializer=agent__pb2.SourceResponse.SerializeToString, - ), - 'process': grpc.stream_stream_rpc_method_handler( - servicer.process, - request_deserializer=agent__pb2.ProcessorRequest.FromString, - response_serializer=agent__pb2.ProcessorResponse.SerializeToString, - ), - 'write': grpc.stream_stream_rpc_method_handler( - servicer.write, - request_deserializer=agent__pb2.SinkRequest.FromString, - response_serializer=agent__pb2.SinkResponse.SerializeToString, - ), - 'get_topic_producer_records': grpc.stream_stream_rpc_method_handler( - servicer.get_topic_producer_records, - request_deserializer=agent__pb2.TopicProducerWriteResult.FromString, - response_serializer=agent__pb2.TopicProducerResponse.SerializeToString, - ), + "agent_info": grpc.unary_unary_rpc_method_handler( + servicer.agent_info, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=agent__pb2.InfoResponse.SerializeToString, + ), + "read": grpc.stream_stream_rpc_method_handler( + servicer.read, + request_deserializer=agent__pb2.SourceRequest.FromString, + response_serializer=agent__pb2.SourceResponse.SerializeToString, + ), + "process": grpc.stream_stream_rpc_method_handler( + servicer.process, + request_deserializer=agent__pb2.ProcessorRequest.FromString, + response_serializer=agent__pb2.ProcessorResponse.SerializeToString, + ), + "write": grpc.stream_stream_rpc_method_handler( + servicer.write, + request_deserializer=agent__pb2.SinkRequest.FromString, + response_serializer=agent__pb2.SinkResponse.SerializeToString, + ), + "get_topic_producer_records": grpc.stream_stream_rpc_method_handler( + servicer.get_topic_producer_records, + request_deserializer=agent__pb2.TopicProducerWriteResult.FromString, + response_serializer=agent__pb2.TopicProducerResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'AgentService', rpc_method_handlers) + "AgentService", rpc_method_handlers + ) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. +# This class is part of an EXPERIMENTAL API. class AgentService(object): """Missing associated documentation comment in .proto file.""" @staticmethod - def agent_info(request, + def agent_info( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/AgentService/agent_info', + "/AgentService/agent_info", google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, agent__pb2.InfoResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def read(request_iterator, + def read( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/AgentService/read', + "/AgentService/read", agent__pb2.SourceRequest.SerializeToString, agent__pb2.SourceResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def process(request_iterator, + def process( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/AgentService/process', + "/AgentService/process", agent__pb2.ProcessorRequest.SerializeToString, agent__pb2.ProcessorResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def write(request_iterator, + def write( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/AgentService/write', + "/AgentService/write", agent__pb2.SinkRequest.SerializeToString, agent__pb2.SinkResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def get_topic_producer_records(request_iterator, + def get_topic_producer_records( + request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.stream_stream( + request_iterator, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/AgentService/get_topic_producer_records', + "/AgentService/get_topic_producer_records", agent__pb2.TopicProducerWriteResult.SerializeToString, agent__pb2.TopicProducerResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py index 09d78d481..43c2aef3a 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py @@ -186,6 +186,7 @@ async def test_failing_record(): assert response.results[0].error == "failure" assert response.results[0].error_type == "INTERNAL_ERROR" + async def test_failing_record_bad_record(): async with ServerAndStub( "langstream_grpc.tests.test_grpc_processor.MyFailingProcessorForBadRecord" @@ -300,9 +301,11 @@ class MyFailingProcessor(Processor): def process(self, record: Record) -> List[RecordType]: raise Exception("failure") + class MyFailingProcessorForBadRecord(Processor): def process(self, record: Record) -> List[RecordType]: from langstream_grpc.util import InvalidRecordError + raise InvalidRecordError("this record is invalid") diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/util.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/util.py index d70754d54..28dbc9a31 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/util.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/util.py @@ -72,4 +72,4 @@ class AvroValue(object): class InvalidRecordError(Exception): - pass + pass diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/scripts/generate-grpc-code.sh b/langstream-runtime/langstream-runtime-impl/src/main/python/scripts/generate-grpc-code.sh index 2ce72fc48..651a22c9d 100755 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/scripts/generate-grpc-code.sh +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/scripts/generate-grpc-code.sh @@ -1,4 +1,20 @@ #!/bin/bash +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + poetry install grpc_proto_dir=../../../../../langstream-agents/langstream-agent-grpc/src/main/proto/langstream_grpc/proto out_dir=./langstream_grpc/proto diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ErrorHandlingTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ErrorHandlingTest.java index a2dcd1c9d..0bf522ff6 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ErrorHandlingTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/ErrorHandlingTest.java @@ -37,6 +37,10 @@ @Slf4j class ErrorHandlingTest extends AbstractGenericStreamingApplicationRunner { + public ErrorHandlingTest() { + super("pulsar"); + } + @Test public void testDiscardErrors() throws Exception { String inputTopic = "input-topic-" + UUID.randomUUID(); @@ -53,9 +57,6 @@ public void testDiscardErrors() throws Exception { topics: - name: "%s" creation-mode: create-if-not-exists - options: - # we want to read more than one record at a time - consumer.max.poll.records: 10 - name: "%s" creation-mode: create-if-not-exists errors: @@ -139,27 +140,51 @@ public void testDeadLetter() throws Exception { .key(null) .headers( List.of( + SimpleRecord.SimpleHeader.of( + "langstream-error-type", + "INTERNAL_ERROR"), SimpleRecord.SimpleHeader.of( "cause-class", "ai.langstream.mockagents.MockProcessorAgentsCodeProvider$InjectedFailure"), + SimpleRecord.SimpleHeader.of( + "langstream-error-cause-class", + "ai.langstream.mockagents.MockProcessorAgentsCodeProvider$InjectedFailure"), SimpleRecord.SimpleHeader.of( "error-class", "ai.langstream.runtime.agent.AgentRunner$PermanentFailureException"), + SimpleRecord.SimpleHeader.of( + "langstream-error-class", + "ai.langstream.runtime.agent.AgentRunner$PermanentFailureException"), SimpleRecord.SimpleHeader.of( "source-topic", - "persistent://public/default/" - + inputTopic), + "persistent://public/default/input"), + SimpleRecord.SimpleHeader.of( + "langstream-error-source-topic", + "persistent://public/default/input"), SimpleRecord.SimpleHeader.of( "cause-msg", "Failing on content: fail-me-" + i), + SimpleRecord.SimpleHeader.of( + "langstream-error-cause-message", + "Failing on content: fail-me-" + i), SimpleRecord.SimpleHeader.of( "root-cause-class", "ai.langstream.mockagents.MockProcessorAgentsCodeProvider$InjectedFailure"), + SimpleRecord.SimpleHeader.of( + "langstream-error-root-cause-class", + "ai.langstream.mockagents.MockProcessorAgentsCodeProvider$InjectedFailure"), SimpleRecord.SimpleHeader.of( "root-cause-msg", "Failing on content: fail-me-" + i), + SimpleRecord.SimpleHeader.of( + "langstream-error-root-cause-message", + "Failing on content: fail-me-" + i), SimpleRecord.SimpleHeader.of( "error-msg", + "ai.langstream.mockagents.MockProcessorAgentsCodeProvider$InjectedFailure: Failing on content: fail-me-" + + i), + SimpleRecord.SimpleHeader.of( + "langstream-error-message", "ai.langstream.mockagents.MockProcessorAgentsCodeProvider$InjectedFailure: Failing on content: fail-me-" + i))) .build()); @@ -189,9 +214,6 @@ public void testFailOnErrors() throws Exception { topics: - name: "%s" creation-mode: create-if-not-exists - options: - # we want to read more than one record at a time - consumer.max.poll.records: 10 - name: "%s" creation-mode: create-if-not-exists errors: @@ -258,9 +280,6 @@ public void testDiscardErrorsOnSink() throws Exception { topics: - name: "%s" creation-mode: create-if-not-exists - options: - # we want to read more than one record at a time - consumer.max.poll.records: 10 errors: on-failure: fail retries: 5 @@ -314,9 +333,6 @@ public void testFailOnErrorsOnSink() throws Exception { topics: - name: "%s" creation-mode: create-if-not-exists - options: - # we want to read more than one record at a time - consumer.max.poll.records: 10 errors: on-failure: skip retries: 5 @@ -383,7 +399,6 @@ public void testDeadLetterOnSink() throws Exception { input: "%s" errors: on-failure: dead-letter - retries: 3 configuration: fail-on-content: "fail-me" """ @@ -395,7 +410,7 @@ public void testDeadLetterOnSink() throws Exception { TopicConsumer consumerDeadletter = createConsumer(inputTopic + "-deadletter")) { List expectedMessages = new ArrayList<>(); - List expectedMessagesDeadletter = new ArrayList<>(); + List expectedMessagesDeadletter = new ArrayList<>(); for (int i = 0; i < 10; i++) { sendMessage(producer, "fail-me-" + i); sendMessage(producer, "keep-me-" + i); @@ -403,9 +418,9 @@ public void testDeadLetterOnSink() throws Exception { expectedMessagesDeadletter.add("fail-me-" + i); } - executeAgentRunners(applicationRuntime); + executeAgentRunners(applicationRuntime, 20); - waitForMessages(consumerDeadletter, expectedMessagesDeadletter); + waitForMessagesInAnyOrder(consumerDeadletter, expectedMessagesDeadletter); Awaitility.await() .untilAsserted(