Skip to content

Commit

Permalink
Changes following review
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Nov 14, 2023
1 parent 5aa62af commit 7d0bdcf
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -72,8 +73,8 @@ abstract class AbstractGrpcAgent extends AbstractAgentCode {
@Getter protected volatile boolean startFailedButDevelopmentMode = false;
protected AgentServiceGrpc.AgentServiceStub asyncStub;

AtomicReference<StreamObserver<TopicProducerWriteResult>> topicProducerWriteResults =
new AtomicReference<>();
protected CompletableFuture<StreamObserver<TopicProducerWriteResult>>
topicProducerWriteResults = CompletableFuture.completedFuture(null);

private final Map<String, TopicProducer> topicProducers = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -103,22 +104,27 @@ public void start() throws Exception {
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
asyncStub = AgentServiceGrpc.newStub(channel).withWaitForReady();

topicProducerWriteResults.set(
topicProducerWriteResults = new CompletableFuture<>();
topicProducerWriteResults.complete(
asyncStub.getTopicProducerRecords(
new StreamObserver<>() {
@Override
public void onNext(TopicProducerRecord topicProducerRecord) {
TopicProducer topicProducer =
topicProducers.computeIfAbsent(
topicProducerRecord.getTopic(),
topic ->
agentContext
.getTopicConnectionProvider()
.createProducer(
agentContext
.getGlobalAgentId(),
topic,
Map.of()));
topic -> {
TopicProducer tp =
agentContext
.getTopicConnectionProvider()
.createProducer(
agentContext
.getGlobalAgentId(),
topic,
Map.of());
tp.start();
return tp;
});
try {
topicProducer
.write(fromGrpc(topicProducerRecord.getRecord()))
Expand Down Expand Up @@ -149,20 +155,40 @@ public void onNext(TopicProducerRecord topicProducerRecord) {

@Override
public void onError(Throwable throwable) {
agentContext.criticalFailure(throwable);
if (!restarting.get()) {
agentContext.criticalFailure(
new RuntimeException(
"getTopicProducerRecords: gRPC server sent error: %s"
.formatted(throwable.getMessage()),
throwable));
} else {
log.info(
"getTopicProducerRecords: ignoring error during restart {}",
throwable + "");
}
}

@Override
public void onCompleted() {
agentContext.criticalFailure(
new RuntimeException("Unexpected completion"));
if (!restarting.get()) {
agentContext.criticalFailure(
new RuntimeException(
"getTopicProducerRecords: gRPC server completed the stream unexpectedly"));
} else {
log.info(
"getTopicProducerRecords: ignoring error server stop during restart");
}
}
}));
}

private synchronized void sendTopicProducerWriteResult(
TopicProducerWriteResult.Builder result) {
topicProducerWriteResults.get().onNext(result.build());
try {
topicProducerWriteResults.get().onNext(result.build());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -181,6 +207,16 @@ protected Map<String, Object> buildAdditionalInfo() {
}

protected synchronized void stopBeforeRestart() throws Exception {
restarting.set(true);
StreamObserver<TopicProducerWriteResult> topicProducerWriteResultStreamObserver =
topicProducerWriteResults.get();
if (topicProducerWriteResultStreamObserver != null) {
try {
topicProducerWriteResultStreamObserver.onCompleted();
} catch (IllegalStateException e) {
log.info("Ignoring error while stopping {}", e + "");
}
}
stopChannel(false);
}

Expand All @@ -199,7 +235,6 @@ public void stopChannel(boolean wait) throws Exception {

@Override
public synchronized void close() throws Exception {
topicProducerWriteResults.get().onCompleted();
stopBeforeRestart();
stopChannel(true);
for (TopicProducer topicProducer : topicProducers.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@Slf4j
public class AbstractGrpcAgentTest {

private Server server;
Expand Down Expand Up @@ -171,15 +173,17 @@ void testTopicProducerError() throws Exception {
TestAgentContext context = new TestAgentContextFailure();
startProcessor(context);
assertEquals(
"INTERNAL: test-error", context.failure.get(15, TimeUnit.SECONDS).getMessage());
"getTopicProducerRecords: gRPC server sent error: INTERNAL: test-error",
context.failure.get(15, TimeUnit.SECONDS).getMessage());
}

@Test
void testTopicProducerComplete() throws Exception {
TestAgentContextCompleting context = new TestAgentContextCompleting();
startProcessor(context);
assertEquals(
"Unexpected completion", context.failure.get(5, TimeUnit.SECONDS).getMessage());
"getTopicProducerRecords: gRPC server completed the stream unexpectedly",
context.failure.get(5, TimeUnit.SECONDS).getMessage());
}

private void startProcessor(AgentContext context) throws Exception {
Expand Down Expand Up @@ -240,6 +244,7 @@ public long getTotalIn() {

@Override
public void criticalFailure(Throwable error) {
log.info("TestAgentContext critical failure", error);
failure.complete(error);
}

Expand Down

0 comments on commit 7d0bdcf

Please sign in to comment.