Skip to content

Commit

Permalink
Set gRPC message limit to 2GB
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Dec 12, 2023
1 parent e908495 commit 9d91f25
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ public void start() throws Exception {
}
blockingStub =
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
asyncStub = AgentServiceGrpc.newStub(channel).withWaitForReady();
asyncStub =
AgentServiceGrpc.newStub(channel)
.withWaitForReady()
.withMaxInboundMessageSize(Integer.MAX_VALUE)
.withMaxOutboundMessageSize(Integer.MAX_VALUE);

topicProducerWriteResults = new CompletableFuture<>();
topicProducerWriteResults.complete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ void testEmpty() throws Exception {
assertProcessSuccessful(processor, SimpleRecord.builder().build());
}

@Test
void testBigRecord() throws Exception {
assertProcessSuccessful(
processor, SimpleRecord.builder().value("a".repeat(1_000_000)).build());
}

@Test
void testFailingRecord() throws Exception {
Record inputRecord = SimpleRecord.builder().origin("failing-origin").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,12 @@ def get_topic_producer(self) -> TopicProducer:
class AgentServer(object):
def __init__(self, target: str):
self.target = target
self.grpc_server = grpc.aio.server()
self.grpc_server = grpc.aio.server(
options=[
("grpc.max_send_message_length", 0x7FFFFFFF),
("grpc.max_receive_message_length", 0x7FFFFFFF),
]
)
self.port = self.grpc_server.add_insecure_port(target)
self.agent = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ async def __aenter__(self):
self.server = AgentServer("[::]:0")
await self.server.init(json.dumps(self.config), json.dumps(self.context))
await self.server.start()
self.channel = grpc.aio.insecure_channel("localhost:%d" % self.server.port)
self.channel = grpc.aio.insecure_channel(
"localhost:%d" % self.server.port,
options=[
("grpc.max_send_message_length", 0x7FFFFFFF),
("grpc.max_receive_message_length", 0x7FFFFFFF),
],
)
self.stub = AgentServiceStub(channel=self.channel)
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,25 @@ async def test_future_record(klass):
assert response.results[0].records[0].value.string_value == "test"


async def test_big_record():
async with ServerAndStub(
"langstream_grpc.tests.test_grpc_processor.MyProcessor"
) as server_and_stub:
long_string = "a" * 10_000_000
response: ProcessorResponse
async for response in server_and_stub.stub.process(
[
ProcessorRequest(
records=[GrpcRecord(value=Value(string_value=long_string))]
)
]
):
assert len(response.results) == 1
assert response.results[0].HasField("error") is False
assert len(response.results[0].records) == 1
assert response.results[0].records[0].value.string_value == long_string


async def test_info():
async with ServerAndStub(
"langstream_grpc.tests.test_grpc_processor.MyProcessor"
Expand Down

0 comments on commit 9d91f25

Please sign in to comment.