From 1c4e3688e8fa20bd3fe8b5dff9fc50f0e2d86acf Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Wed, 13 Dec 2023 00:05:19 +0100 Subject: [PATCH] Set gRPC message limit to 2GB (#746) --- .../agents/grpc/AbstractGrpcAgent.java | 6 +++++- .../python/langstream_grpc/grpc_service.py | 7 ++++++- .../langstream_grpc/tests/server_and_stub.py | 8 +++++++- .../tests/test_grpc_processor.py | 19 +++++++++++++++++++ 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java index acdf2fe26..c21d5dcff 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java @@ -100,7 +100,11 @@ public void start() throws Exception { } blockingStub = AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS); - asyncStub = AgentServiceGrpc.newStub(channel).withWaitForReady(); + asyncStub = + AgentServiceGrpc.newStub(channel) + .withWaitForReady() + .withMaxInboundMessageSize(Integer.MAX_VALUE) + .withMaxOutboundMessageSize(Integer.MAX_VALUE); topicProducerWriteResults = new CompletableFuture<>(); topicProducerWriteResults.complete( diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py index d1e9a28fa..057699a50 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py @@ -415,7 +415,12 @@ def get_topic_producer(self) -> TopicProducer: class AgentServer(object): def __init__(self, target: str): self.target = target - self.grpc_server = grpc.aio.server() + self.grpc_server = grpc.aio.server( + options=[ + ("grpc.max_send_message_length", 0x7FFFFFFF), + ("grpc.max_receive_message_length", 0x7FFFFFFF), + ] + ) self.port = self.grpc_server.add_insecure_port(target) self.agent = None diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py index ea294cbbd..4ff959665 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py @@ -36,7 +36,13 @@ async def __aenter__(self): self.server = AgentServer("[::]:0") await self.server.init(json.dumps(self.config), json.dumps(self.context)) await self.server.start() - self.channel = grpc.aio.insecure_channel("localhost:%d" % self.server.port) + self.channel = grpc.aio.insecure_channel( + "localhost:%d" % self.server.port, + options=[ + ("grpc.max_send_message_length", 0x7FFFFFFF), + ("grpc.max_receive_message_length", 0x7FFFFFFF), + ], + ) self.stub = AgentServiceStub(channel=self.channel) return self diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py index 3692bc432..82eefae20 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py @@ -201,6 +201,25 @@ async def test_future_record(klass): assert response.results[0].records[0].value.string_value == "test" +async def test_big_record(): + async with ServerAndStub( + "langstream_grpc.tests.test_grpc_processor.MyProcessor" + ) as server_and_stub: + long_string = "a" * 10_000_000 + response: ProcessorResponse + async for response in server_and_stub.stub.process( + [ + ProcessorRequest( + records=[GrpcRecord(value=Value(string_value=long_string))] + ) + ] + ): + assert len(response.results) == 1 + assert response.results[0].HasField("error") is False + assert len(response.results[0].records) == 1 + assert response.results[0].records[0].value.string_value == long_string + + async def test_info(): async with ServerAndStub( "langstream_grpc.tests.test_grpc_processor.MyProcessor"