Skip to content

Commit

Permalink
Limit the number of read records to prevent OOME (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Sep 21, 2023
1 parent e736ca8 commit 2f02b06
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GrpcAgentSource extends AbstractGrpcAgent implements AgentSource {

private static final int MAX_RECORDS_PER_READ = 10_000;
private static final int MAX_RECORDS_PER_READ = 1000;
public static final int READ_BUFER_SIZE = 1000;
private StreamObserver<SourceRequest> request;
private final StreamObserver<SourceResponse> responseObserver;

// TODO: use a bounded queue ? backpressure ?
private final ConcurrentLinkedQueue<Record> readRecords = new ConcurrentLinkedQueue<>();
private final LinkedBlockingQueue<Record> readRecords =
new LinkedBlockingQueue<>(READ_BUFER_SIZE);

public GrpcAgentSource() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ public ManagedChannel start() throws Exception {
processBuilder.environment().put("NLTK_DATA", "/app/nltk_data");
pythonProcess = processBuilder.start();
ManagedChannel channel =
ManagedChannelBuilder.forAddress("localhost", port)
.directExecutor()
.usePlaintext()
.build();
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
AgentServiceGrpc.AgentServiceBlockingStub stub =
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
for (int i = 0; ; i++) {
Expand Down

0 comments on commit 2f02b06

Please sign in to comment.