From 42fce41b8af8edbe542fe2d4a73ea2cd201b9352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Thu, 23 Nov 2023 09:51:13 +0100 Subject: [PATCH] More control on event publishing --- thoth-core-reactor/build.gradle | 1 + .../ReactorKafkaEventPublisher.java | 5 +++-- .../eventsourcing/KafkaEventPublisherTest.java | 5 ----- .../src/test/resources/logback.xml | 18 ++++++++++++++++++ 4 files changed, 22 insertions(+), 7 deletions(-) create mode 100644 thoth-core-reactor/src/test/resources/logback.xml diff --git a/thoth-core-reactor/build.gradle b/thoth-core-reactor/build.gradle index 6059008e..780a4c6c 100644 --- a/thoth-core-reactor/build.gradle +++ b/thoth-core-reactor/build.gradle @@ -23,6 +23,7 @@ dependencies { testImplementation("org.scalatest:scalatest_$scalaVersion:3.0.8") testImplementation("org.testcontainers:kafka:1.18.0") testImplementation "org.testcontainers:junit-jupiter:1.18.0" + testImplementation "ch.qos.logback:logback-classic:1.4.8" } test { diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index 6334fdf5..ba4ce12e 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -92,7 +92,7 @@ public void start(EventStore eventStore, Concur eventStore.commitOrRollback(Option.of(e), tx); LOGGER.error("Error replaying non published events to kafka for " + topic, e); }) - .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval) + .retryWhen(Retry.backoff(10, restartInterval) .transientErrors(true) .maxBackoff(maxRestartInterval) .doBeforeRetry(ctx -> { @@ -101,6 +101,7 @@ public void start(EventStore eventStore, Concur ) .collectList() .map(__ -> Tuple.empty()) + .onErrorReturn(Tuple.empty()) .switchIfEmpty(Mono.just(Tuple.empty())); }) .concatMap(__ -> @@ -173,7 +174,7 @@ public CompletionStage publish(List> eve @Override public void close() throws IOException { - if (Objects.nonNull(killSwitch)) { + if (Objects.nonNull(killSwitch) && !killSwitch.isDisposed()) { try { this.killSwitch.dispose(); } catch (UnsupportedOperationException e) { diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java index bc272a27..ebf7d4f9 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java @@ -99,10 +99,7 @@ public void eventConsumption() throws IOException, InterruptedException { .receive() .map(ConsumerRecord::value) .map(KafkaEventPublisherTest::deserialize) - .doOnNext(elt -> System.out.println("next : "+elt)) .take(3) - .doOnNext(elt -> System.out.println("Group : " + elt)) - .doOnError(e -> e.printStackTrace()) .map(e -> { println(e); return e; @@ -165,8 +162,6 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc ) .receive() .map(ConsumerRecord::value) - .doOnNext(elt -> System.out.println(elt)) - .doOnError(e -> e.printStackTrace()) .take(6) .timeout(Duration.of(20, ChronoUnit.SECONDS)) .collectList() diff --git a/thoth-core-reactor/src/test/resources/logback.xml b/thoth-core-reactor/src/test/resources/logback.xml new file mode 100644 index 00000000..197b6c54 --- /dev/null +++ b/thoth-core-reactor/src/test/resources/logback.xml @@ -0,0 +1,18 @@ + + + + %logger{15} - %message%n%xException{10} + + + + + + + + + + + + + +