Skip to content

Commit

Permalink
CAMEL-20199: Remove synchronized blocks from components R to S
Browse files Browse the repository at this point in the history
  • Loading branch information
essobedo committed Nov 15, 2024
1 parent a4c35c9 commit e43cc82
Show file tree
Hide file tree
Showing 31 changed files with 900 additions and 613 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.reactivestreams.Subscriber;
Expand All @@ -37,6 +39,7 @@ public class ReactiveStreamsCamelSubscriber implements Subscriber<Exchange>, Clo
*/
private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE;

private final Lock lock = new ReentrantLock();
private final String name;

private ReactiveStreamsConsumer consumer;
Expand All @@ -52,22 +55,33 @@ public ReactiveStreamsCamelSubscriber(String name) {
}

public void attachConsumer(ReactiveStreamsConsumer consumer) {
synchronized (this) {
lock.lock();
try {
if (this.consumer != null) {
throw new IllegalStateException("A consumer is already attached to the stream '" + name + "'");
}
this.consumer = consumer;
} finally {
lock.unlock();
}
refill();
}

public synchronized ReactiveStreamsConsumer getConsumer() {
return consumer;
public ReactiveStreamsConsumer getConsumer() {
lock.lock();
try {
return consumer;
} finally {
lock.unlock();
}
}

public void detachConsumer() {
synchronized (this) {
lock.lock();
try {
this.consumer = null;
} finally {
lock.unlock();
}
}

Expand All @@ -78,12 +92,15 @@ public void onSubscribe(Subscription subscription) {
}

boolean allowed = true;
synchronized (this) {
lock.lock();
try {
if (this.subscription != null) {
allowed = false;
} else {
this.subscription = subscription;
}
} finally {
lock.unlock();
}

if (!allowed) {
Expand All @@ -101,7 +118,8 @@ public void onNext(Exchange exchange) {
}

ReactiveStreamsConsumer target;
synchronized (this) {
lock.lock();
try {
if (requested < UNBOUNDED_REQUESTS) {
// When there are UNBOUNDED_REQUESTS, they remain constant
requested--;
Expand All @@ -110,12 +128,17 @@ public void onNext(Exchange exchange) {
if (target != null) {
inflightCount++;
}
} finally {
lock.unlock();
}

if (target != null) {
target.process(exchange, doneSync -> {
synchronized (this) {
lock.lock();
try {
inflightCount--;
} finally {
lock.unlock();
}

refill();
Expand All @@ -129,7 +152,8 @@ public void onNext(Exchange exchange) {
protected void refill() {
Long toBeRequested = null;
Subscription subs = null;
synchronized (this) {
lock.lock();
try {
if (consumer != null && this.subscription != null) {
Integer consMax = consumer.getEndpoint().getMaxInflightExchanges();
long max = (consMax != null && consMax > 0) ? consMax.longValue() : UNBOUNDED_REQUESTS;
Expand All @@ -144,6 +168,8 @@ protected void refill() {
}
}
}
} finally {
lock.unlock();
}

if (toBeRequested != null) {
Expand All @@ -160,9 +186,12 @@ public void onError(Throwable throwable) {
LOG.error("Error in reactive stream '{}'", name, throwable);

ReactiveStreamsConsumer consumer;
synchronized (this) {
lock.lock();
try {
consumer = this.consumer;
this.subscription = null;
} finally {
lock.unlock();
}

if (consumer != null) {
Expand All @@ -176,9 +205,12 @@ public void onComplete() {
LOG.info("Reactive stream '{}' completed", name);

ReactiveStreamsConsumer consumer;
synchronized (this) {
lock.lock();
try {
consumer = this.consumer;
this.subscription = null;
} finally {
lock.unlock();
}

if (consumer != null) {
Expand All @@ -189,8 +221,11 @@ public void onComplete() {
@Override
public void close() throws IOException {
Subscription subscription;
synchronized (this) {
lock.lock();
try {
subscription = this.subscription;
} finally {
lock.unlock();
}

if (subscription != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,29 +161,34 @@ public void setThreadPoolMaxSize(int threadPoolMaxSize) {
*
* @return the reactive streams service
*/
public synchronized CamelReactiveStreamsService getReactiveStreamsService() {
if (reactiveStreamsEngineConfiguration == null) {
reactiveStreamsEngineConfiguration = new ReactiveStreamsEngineConfiguration();
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
}
public CamelReactiveStreamsService getReactiveStreamsService() {
lock.lock();
try {
if (reactiveStreamsEngineConfiguration == null) {
reactiveStreamsEngineConfiguration = new ReactiveStreamsEngineConfiguration();
reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize);
reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize);
reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName);
}

if (service == null) {
this.service = ReactiveStreamsHelper.resolveReactiveStreamsService(
getCamelContext(),
this.serviceType,
this.reactiveStreamsEngineConfiguration);

try {
// Start the service and add it to the Camel context to expose managed attributes
getCamelContext().addService(service, true, true);
} catch (Exception e) {
throw new RuntimeCamelException(e);
if (service == null) {
this.service = ReactiveStreamsHelper.resolveReactiveStreamsService(
getCamelContext(),
this.serviceType,
this.reactiveStreamsEngineConfiguration);

try {
// Start the service and add it to the Camel context to expose managed attributes
getCamelContext().addService(service, true, true);
} catch (Exception e) {
throw new RuntimeCamelException(e);
}
}
}

return service;
return service;
} finally {
lock.unlock();
}
}

// ****************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -126,41 +128,54 @@ private final class MonoSubscription implements Subscription {
private volatile boolean requested;

private final Subscriber<? super T> subscriber;
private final Lock lock = new ReentrantLock();

private MonoSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void request(long l) {
synchronized (this) {
lock.lock();
try {
if (terminated) {
// just ignore the request
return;
}
} finally {
lock.unlock();
}

if (l <= 0) {
subscriber.onError(new IllegalArgumentException("3.9"));
synchronized (this) {
lock.lock();
try {
terminated = true;
} finally {
lock.unlock();
}
} else {
synchronized (this) {
lock.lock();
try {
requested = true;
} finally {
lock.unlock();
}
}

flushCycle();
}

public void flush() {
synchronized (this) {
lock.lock();
try {
if (!isReady()) {
return;
}

terminated = true;
} finally {
lock.unlock();
}

if (data != null) {
Expand All @@ -180,8 +195,13 @@ public boolean isReady() {
}

@Override
public synchronized void cancel() {
terminated = true;
public void cancel() {
lock.lock();
try {
terminated = true;
} finally {
lock.unlock();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
Expand All @@ -40,6 +42,7 @@ final class ReactorCamelProcessor implements Closeable {
private final AtomicReference<FluxSink<Exchange>> camelSink;

private final ReactorStreamsService service;
private final Lock lock = new ReentrantLock();
private ReactiveStreamsProducer camelProducer;

ReactorCamelProcessor(ReactorStreamsService service, String name) {
Expand All @@ -64,41 +67,51 @@ Publisher<Exchange> getPublisher() {
return publisher;
}

synchronized void attach(ReactiveStreamsProducer producer) {
Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
void attach(ReactiveStreamsProducer producer) {
lock.lock();
try {
Objects.requireNonNull(producer, "producer cannot be null, use the detach method");

if (this.camelProducer != null) {
throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
}

if (this.camelProducer != producer) { // this condition is always true
detach();

ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
Flux<Exchange> flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE);

if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
// signal item emitted for non-dropped items only
flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
} else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
// Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
// No exception is reported back to the exchanges
flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
} else {
// Default strategy is BUFFER
flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted);
if (this.camelProducer != null) {
throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
}

flux.subscribe(this.publisher);
if (this.camelProducer != producer) { // this condition is always true
detach();

ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
Flux<Exchange> flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE);

if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
// signal item emitted for non-dropped items only
flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
} else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
// Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
// No exception is reported back to the exchanges
flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
} else {
// Default strategy is BUFFER
flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure)
.handle(this::onItemEmitted);
}

flux.subscribe(this.publisher);

camelProducer = producer;
camelProducer = producer;
}
} finally {
lock.unlock();
}
}

synchronized void detach() {

this.camelProducer = null;
this.camelSink.set(null);
void detach() {
lock.lock();
try {
this.camelProducer = null;
this.camelSink.set(null);
} finally {
lock.unlock();
}
}

void send(Exchange exchange) {
Expand Down
Loading

0 comments on commit e43cc82

Please sign in to comment.