From 2f02b068205db98123248cbe519ae63c7f06bccb Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Thu, 21 Sep 2023 12:27:14 +0200 Subject: [PATCH] Limit the number of read records to prevent OOME (#463) --- .../ai/langstream/agents/grpc/GrpcAgentSource.java | 10 +++++----- .../ai/langstream/agents/grpc/PythonGrpcServer.java | 5 +---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentSource.java index f14fa597a..d2fad5fcd 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentSource.java @@ -21,18 +21,18 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import lombok.extern.slf4j.Slf4j; @Slf4j public class GrpcAgentSource extends AbstractGrpcAgent implements AgentSource { - private static final int MAX_RECORDS_PER_READ = 10_000; + private static final int MAX_RECORDS_PER_READ = 1000; + public static final int READ_BUFER_SIZE = 1000; private StreamObserver request; private final StreamObserver responseObserver; - - // TODO: use a bounded queue ? backpressure ? - private final ConcurrentLinkedQueue readRecords = new ConcurrentLinkedQueue<>(); + private final LinkedBlockingQueue readRecords = + new LinkedBlockingQueue<>(READ_BUFER_SIZE); public GrpcAgentSource() { super(); diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java index d6b91bb27..6948ec11b 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java @@ -73,10 +73,7 @@ public ManagedChannel start() throws Exception { processBuilder.environment().put("NLTK_DATA", "/app/nltk_data"); pythonProcess = processBuilder.start(); ManagedChannel channel = - ManagedChannelBuilder.forAddress("localhost", port) - .directExecutor() - .usePlaintext() - .build(); + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); AgentServiceGrpc.AgentServiceBlockingStub stub = AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS); for (int i = 0; ; i++) {