From e43cc82a1dfeb7101ea3bd4520f4345df5024ee6 Mon Sep 17 00:00:00 2001 From: Nicolas Filotto Date: Fri, 15 Nov 2024 08:59:17 +0100 Subject: [PATCH] CAMEL-20199: Remove synchronized blocks from components R to S --- .../ReactiveStreamsCamelSubscriber.java | 57 +++++-- .../streams/ReactiveStreamsComponent.java | 45 +++--- .../streams/engine/DelayedMonoPublisher.java | 32 +++- .../reactor/engine/ReactorCamelProcessor.java | 69 ++++---- .../component/rocketmq/RocketMQProducer.java | 5 +- .../rxjava/engine/RxJavaCamelProcessor.java | 69 ++++---- .../internal/SalesforceSession.java | 136 ++++++++-------- .../streaming/SubscriptionHelper.java | 86 +++++----- .../scheduler/SchedulerComponent.java | 66 ++++---- .../camel/component/seda/QueueReference.java | 55 +++++-- .../camel/component/seda/SedaComponent.java | 107 +++++++------ .../camel/component/seda/SedaEndpoint.java | 123 ++++++++------- .../component/servicenow/auth/OAuthToken.java | 102 ++++++------ .../camel/component/sjms/SjmsComponent.java | 19 ++- .../camel/component/sjms/SjmsProducer.java | 5 +- .../consumer/EndpointMessageListener.java | 17 +- .../SimpleMessageListenerContainer.java | 41 +++-- .../sjms/reply/MessageSelectorCreator.java | 52 +++--- .../sjms/reply/QueueReplyManager.java | 5 +- .../splunk/SplunkConnectionFactory.java | 148 +++++++++--------- .../component/splunk/SplunkEndpoint.java | 21 ++- .../splunk/support/SplunkDataWriter.java | 30 +++- .../splunk/support/SubmitDataWriter.java | 19 ++- .../springrabbit/EndpointMessageListener.java | 16 +- .../SpringSecurityAuthorizationPolicy.java | 24 ++- .../spring/ws/SpringWebserviceProducer.java | 12 +- .../camel/component/event/EventEndpoint.java | 22 ++- .../spi/TransactionErrorHandlerReifier.java | 43 ++--- .../stax/StAXJAXBIteratorExpression.java | 19 +-- .../component/stream/StreamConsumer.java | 62 +++++--- .../component/stream/StreamProducer.java | 6 +- 31 files changed, 900 insertions(+), 613 deletions(-) diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java index ebf827f47ab18..444fb42ef675e 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java @@ -18,6 +18,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; import org.reactivestreams.Subscriber; @@ -37,6 +39,7 @@ public class ReactiveStreamsCamelSubscriber implements Subscriber, Clo */ private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE; + private final Lock lock = new ReentrantLock(); private final String name; private ReactiveStreamsConsumer consumer; @@ -52,22 +55,33 @@ public ReactiveStreamsCamelSubscriber(String name) { } public void attachConsumer(ReactiveStreamsConsumer consumer) { - synchronized (this) { + lock.lock(); + try { if (this.consumer != null) { throw new IllegalStateException("A consumer is already attached to the stream '" + name + "'"); } this.consumer = consumer; + } finally { + lock.unlock(); } refill(); } - public synchronized ReactiveStreamsConsumer getConsumer() { - return consumer; + public ReactiveStreamsConsumer getConsumer() { + lock.lock(); + try { + return consumer; + } finally { + lock.unlock(); + } } public void detachConsumer() { - synchronized (this) { + lock.lock(); + try { this.consumer = null; + } finally { + lock.unlock(); } } @@ -78,12 +92,15 @@ public void onSubscribe(Subscription subscription) { } boolean allowed = true; - synchronized (this) { + lock.lock(); + try { if (this.subscription != null) { allowed = false; } else { this.subscription = subscription; } + } finally { + lock.unlock(); } if (!allowed) { @@ -101,7 +118,8 @@ public void onNext(Exchange exchange) { } ReactiveStreamsConsumer target; - synchronized (this) { + lock.lock(); + try { if (requested < UNBOUNDED_REQUESTS) { // When there are UNBOUNDED_REQUESTS, they remain constant requested--; @@ -110,12 +128,17 @@ public void onNext(Exchange exchange) { if (target != null) { inflightCount++; } + } finally { + lock.unlock(); } if (target != null) { target.process(exchange, doneSync -> { - synchronized (this) { + lock.lock(); + try { inflightCount--; + } finally { + lock.unlock(); } refill(); @@ -129,7 +152,8 @@ public void onNext(Exchange exchange) { protected void refill() { Long toBeRequested = null; Subscription subs = null; - synchronized (this) { + lock.lock(); + try { if (consumer != null && this.subscription != null) { Integer consMax = consumer.getEndpoint().getMaxInflightExchanges(); long max = (consMax != null && consMax > 0) ? consMax.longValue() : UNBOUNDED_REQUESTS; @@ -144,6 +168,8 @@ protected void refill() { } } } + } finally { + lock.unlock(); } if (toBeRequested != null) { @@ -160,9 +186,12 @@ public void onError(Throwable throwable) { LOG.error("Error in reactive stream '{}'", name, throwable); ReactiveStreamsConsumer consumer; - synchronized (this) { + lock.lock(); + try { consumer = this.consumer; this.subscription = null; + } finally { + lock.unlock(); } if (consumer != null) { @@ -176,9 +205,12 @@ public void onComplete() { LOG.info("Reactive stream '{}' completed", name); ReactiveStreamsConsumer consumer; - synchronized (this) { + lock.lock(); + try { consumer = this.consumer; this.subscription = null; + } finally { + lock.unlock(); } if (consumer != null) { @@ -189,8 +221,11 @@ public void onComplete() { @Override public void close() throws IOException { Subscription subscription; - synchronized (this) { + lock.lock(); + try { subscription = this.subscription; + } finally { + lock.unlock(); } if (subscription != null) { diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java index c4524e6028b83..640b9d5f16d93 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java @@ -161,29 +161,34 @@ public void setThreadPoolMaxSize(int threadPoolMaxSize) { * * @return the reactive streams service */ - public synchronized CamelReactiveStreamsService getReactiveStreamsService() { - if (reactiveStreamsEngineConfiguration == null) { - reactiveStreamsEngineConfiguration = new ReactiveStreamsEngineConfiguration(); - reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize); - reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize); - reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName); - } + public CamelReactiveStreamsService getReactiveStreamsService() { + lock.lock(); + try { + if (reactiveStreamsEngineConfiguration == null) { + reactiveStreamsEngineConfiguration = new ReactiveStreamsEngineConfiguration(); + reactiveStreamsEngineConfiguration.setThreadPoolMaxSize(threadPoolMaxSize); + reactiveStreamsEngineConfiguration.setThreadPoolMinSize(threadPoolMinSize); + reactiveStreamsEngineConfiguration.setThreadPoolName(threadPoolName); + } - if (service == null) { - this.service = ReactiveStreamsHelper.resolveReactiveStreamsService( - getCamelContext(), - this.serviceType, - this.reactiveStreamsEngineConfiguration); - - try { - // Start the service and add it to the Camel context to expose managed attributes - getCamelContext().addService(service, true, true); - } catch (Exception e) { - throw new RuntimeCamelException(e); + if (service == null) { + this.service = ReactiveStreamsHelper.resolveReactiveStreamsService( + getCamelContext(), + this.serviceType, + this.reactiveStreamsEngineConfiguration); + + try { + // Start the service and add it to the Camel context to expose managed attributes + getCamelContext().addService(service, true, true); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } } - } - return service; + return service; + } finally { + lock.unlock(); + } } // **************************************** diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java index dd8c4929c7d59..e9615048c2097 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java @@ -22,6 +22,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -126,6 +128,7 @@ private final class MonoSubscription implements Subscription { private volatile boolean requested; private final Subscriber subscriber; + private final Lock lock = new ReentrantLock(); private MonoSubscription(Subscriber subscriber) { this.subscriber = subscriber; @@ -133,21 +136,30 @@ private MonoSubscription(Subscriber subscriber) { @Override public void request(long l) { - synchronized (this) { + lock.lock(); + try { if (terminated) { // just ignore the request return; } + } finally { + lock.unlock(); } if (l <= 0) { subscriber.onError(new IllegalArgumentException("3.9")); - synchronized (this) { + lock.lock(); + try { terminated = true; + } finally { + lock.unlock(); } } else { - synchronized (this) { + lock.lock(); + try { requested = true; + } finally { + lock.unlock(); } } @@ -155,12 +167,15 @@ public void request(long l) { } public void flush() { - synchronized (this) { + lock.lock(); + try { if (!isReady()) { return; } terminated = true; + } finally { + lock.unlock(); } if (data != null) { @@ -180,8 +195,13 @@ public boolean isReady() { } @Override - public synchronized void cancel() { - terminated = true; + public void cancel() { + lock.lock(); + try { + terminated = true; + } finally { + lock.unlock(); + } } } } diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java index c33100a813d7a..47733ed920d8c 100644 --- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java +++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorCamelProcessor.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy; @@ -40,6 +42,7 @@ final class ReactorCamelProcessor implements Closeable { private final AtomicReference> camelSink; private final ReactorStreamsService service; + private final Lock lock = new ReentrantLock(); private ReactiveStreamsProducer camelProducer; ReactorCamelProcessor(ReactorStreamsService service, String name) { @@ -64,41 +67,51 @@ Publisher getPublisher() { return publisher; } - synchronized void attach(ReactiveStreamsProducer producer) { - Objects.requireNonNull(producer, "producer cannot be null, use the detach method"); + void attach(ReactiveStreamsProducer producer) { + lock.lock(); + try { + Objects.requireNonNull(producer, "producer cannot be null, use the detach method"); - if (this.camelProducer != null) { - throw new IllegalStateException("A producer is already attached to the stream '" + name + "'"); - } - - if (this.camelProducer != producer) { // this condition is always true - detach(); - - ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy(); - Flux flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE); - - if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) { - // signal item emitted for non-dropped items only - flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted); - } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) { - // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping - // No exception is reported back to the exchanges - flux = flux.handle(this::onItemEmitted).onBackpressureLatest(); - } else { - // Default strategy is BUFFER - flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted); + if (this.camelProducer != null) { + throw new IllegalStateException("A producer is already attached to the stream '" + name + "'"); } - flux.subscribe(this.publisher); + if (this.camelProducer != producer) { // this condition is always true + detach(); + + ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy(); + Flux flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE); + + if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) { + // signal item emitted for non-dropped items only + flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted); + } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) { + // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping + // No exception is reported back to the exchanges + flux = flux.handle(this::onItemEmitted).onBackpressureLatest(); + } else { + // Default strategy is BUFFER + flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure) + .handle(this::onItemEmitted); + } + + flux.subscribe(this.publisher); - camelProducer = producer; + camelProducer = producer; + } + } finally { + lock.unlock(); } } - synchronized void detach() { - - this.camelProducer = null; - this.camelSink.set(null); + void detach() { + lock.lock(); + try { + this.camelProducer = null; + this.camelSink.set(null); + } finally { + lock.unlock(); + } } void send(Exchange exchange) { diff --git a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java index a07006f769580..defd5ea0ebdef 100644 --- a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java +++ b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java @@ -136,7 +136,8 @@ public void onException(Throwable e) { protected void initReplyManager() { if (!started.get()) { - synchronized (this) { + lock.lock(); + try { if (started.get()) { return; } @@ -160,6 +161,8 @@ protected void initReplyManager() { } } started.set(true); + } finally { + lock.unlock(); } } } diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java index 42a29c022c4ca..ef6135968b17e 100644 --- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java +++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; @@ -39,6 +41,7 @@ final class RxJavaCamelProcessor implements Closeable { private final RxJavaStreamsService service; private final AtomicReference> camelEmitter; private final FlowableProcessor publisher; + private final Lock lock = new ReentrantLock(); private ReactiveStreamsProducer camelProducer; RxJavaCamelProcessor(RxJavaStreamsService service, String name) { @@ -58,40 +61,50 @@ Publisher getPublisher() { return publisher; } - synchronized void attach(ReactiveStreamsProducer producer) { - Objects.requireNonNull(producer, "producer cannot be null, use the detach method"); + void attach(ReactiveStreamsProducer producer) { + lock.lock(); + try { + Objects.requireNonNull(producer, "producer cannot be null, use the detach method"); - if (this.camelProducer != null) { - throw new IllegalStateException("A producer is already attached to the stream '" + name + "'"); - } - - if (this.camelProducer != producer) { - detach(); - - ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy(); - Flowable flow = Flowable.create(camelEmitter::set, BackpressureStrategy.MISSING); - - if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) { - flow.onBackpressureDrop(this::onBackPressure) - .doAfterNext(this::onItemEmitted) - .subscribe(this.publisher); - } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) { - flow.doAfterNext(this::onItemEmitted) - .onBackpressureLatest() - .subscribe(this.publisher); - } else { - flow.doAfterNext(this::onItemEmitted) - .onBackpressureBuffer() - .subscribe(this.publisher); + if (this.camelProducer != null) { + throw new IllegalStateException("A producer is already attached to the stream '" + name + "'"); } - camelProducer = producer; + if (this.camelProducer != producer) { + detach(); + + ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy(); + Flowable flow = Flowable.create(camelEmitter::set, BackpressureStrategy.MISSING); + + if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) { + flow.onBackpressureDrop(this::onBackPressure) + .doAfterNext(this::onItemEmitted) + .subscribe(this.publisher); + } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) { + flow.doAfterNext(this::onItemEmitted) + .onBackpressureLatest() + .subscribe(this.publisher); + } else { + flow.doAfterNext(this::onItemEmitted) + .onBackpressureBuffer() + .subscribe(this.publisher); + } + + camelProducer = producer; + } + } finally { + lock.unlock(); } } - synchronized void detach() { - this.camelProducer = null; - this.camelEmitter.set(null); + void detach() { + lock.lock(); + try { + this.camelProducer = null; + this.camelEmitter.set(null); + } finally { + lock.unlock(); + } } void send(Exchange exchange) { diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java index 60262a5b9c54f..12c7c9dddafe1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java @@ -157,40 +157,44 @@ public void attemptLoginUntilSuccessful(long backoffIncrement, long maxBackoff) } } - public synchronized String login(String oldToken) throws SalesforceException { - - // check if we need a new session - // this way there's always a single valid session - if (accessToken == null || accessToken.equals(oldToken)) { + public String login(String oldToken) throws SalesforceException { + lock.lock(); + try { + // check if we need a new session + // this way there's always a single valid session + if (accessToken == null || accessToken.equals(oldToken)) { - // try revoking the old access token before creating a new one - accessToken = oldToken; - if (accessToken != null) { - try { - logout(); - } catch (SalesforceException e) { - LOG.warn("Error revoking old access token: {}", e.getMessage(), e); + // try revoking the old access token before creating a new one + accessToken = oldToken; + if (accessToken != null) { + try { + logout(); + } catch (SalesforceException e) { + LOG.warn("Error revoking old access token: {}", e.getMessage(), e); + } + accessToken = null; } - accessToken = null; - } - // login to Salesforce and get session id - final Request loginPost = getLoginRequest(null); - try { + // login to Salesforce and get session id + final Request loginPost = getLoginRequest(null); + try { - final ContentResponse loginResponse = loginPost.send(); - parseLoginResponse(loginResponse, loginResponse.getContentAsString()); + final ContentResponse loginResponse = loginPost.send(); + parseLoginResponse(loginResponse, loginResponse.getContentAsString()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SalesforceException("Login error: interrupted", e); - } catch (TimeoutException e) { - throw new SalesforceException("Login request timeout: " + e.getMessage(), e); - } catch (ExecutionException e) { - throw new SalesforceException("Unexpected login error: " + e.getCause().getMessage(), e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SalesforceException("Login error: interrupted", e); + } catch (TimeoutException e) { + throw new SalesforceException("Login request timeout: " + e.getMessage(), e); + } catch (ExecutionException e) { + throw new SalesforceException("Unexpected login error: " + e.getCause().getMessage(), e.getCause()); + } } + return accessToken; + } finally { + lock.unlock(); } - return accessToken; } /** @@ -301,11 +305,11 @@ String generateJwtAssertion() { * Parses login response, allows SalesforceSecurityHandler to parse a login request for a failed authentication * conversation. */ - public synchronized void parseLoginResponse(ContentResponse loginResponse, String responseContent) + public void parseLoginResponse(ContentResponse loginResponse, String responseContent) throws SalesforceException { - final int responseStatus = loginResponse.getStatus(); - + lock.lock(); try { + final int responseStatus = loginResponse.getStatus(); switch (responseStatus) { case HttpStatus.OK_200: // parse the response to get token @@ -352,47 +356,55 @@ public synchronized void parseLoginResponse(ContentResponse loginResponse, Strin } catch (IOException e) { String msg = "Login error: response parse exception " + e.getMessage(); throw new SalesforceException(msg, e); + } finally { + lock.unlock(); } } - public synchronized void logout() throws SalesforceException { - if (accessToken == null) { - return; - } - + public void logout() throws SalesforceException { + lock.lock(); try { - String logoutUrl = (instanceUrl == null ? config.getLoginUrl() : instanceUrl) + OAUTH2_REVOKE_PATH + accessToken; - final Request logoutGet = httpClient.newRequest(logoutUrl).timeout(timeout, TimeUnit.MILLISECONDS); - final ContentResponse logoutResponse = logoutGet.send(); + if (accessToken == null) { + return; + } - final int statusCode = logoutResponse.getStatus(); + try { + String logoutUrl + = (instanceUrl == null ? config.getLoginUrl() : instanceUrl) + OAUTH2_REVOKE_PATH + accessToken; + final Request logoutGet = httpClient.newRequest(logoutUrl).timeout(timeout, TimeUnit.MILLISECONDS); + final ContentResponse logoutResponse = logoutGet.send(); - if (statusCode == HttpStatus.OK_200) { - LOG.debug("Logout successful"); - } else { - LOG.debug("Failed to revoke OAuth token. This is expected if the token is invalid or already expired"); - } + final int statusCode = logoutResponse.getStatus(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SalesforceException("Interrupted while logging out", e); - } catch (ExecutionException e) { - final Throwable ex = e.getCause(); - throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex); - } catch (TimeoutException e) { - throw new SalesforceException("Logout request TIMEOUT!", e); - } finally { - // reset session - accessToken = null; - instanceUrl = null; - // notify all session listeners about logout - for (SalesforceSessionListener listener : listeners) { - try { - listener.onLogout(); - } catch (Exception t) { - LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage()); + if (statusCode == HttpStatus.OK_200) { + LOG.debug("Logout successful"); + } else { + LOG.debug("Failed to revoke OAuth token. This is expected if the token is invalid or already expired"); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SalesforceException("Interrupted while logging out", e); + } catch (ExecutionException e) { + final Throwable ex = e.getCause(); + throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex); + } catch (TimeoutException e) { + throw new SalesforceException("Logout request TIMEOUT!", e); + } finally { + // reset session + accessToken = null; + instanceUrl = null; + // notify all session listeners about logout + for (SalesforceSessionListener listener : listeners) { + try { + listener.onLogout(); + } catch (Exception t) { + LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage()); + } } } + } finally { + lock.unlock(); } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 0ffda9096feec..894fcb16820e2 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -413,25 +413,30 @@ protected HttpCookieStore getHttpCookieStore() { return client; } - public synchronized void subscribe(StreamingApiConsumer consumer) { - // create subscription for consumer - final String channelName = getChannelName(consumer.getTopicName()); - channelToConsumers.computeIfAbsent(channelName, key -> ConcurrentHashMap.newKeySet()).add(consumer); - channelsToSubscribe.add(channelName); - - setReplayIdIfAbsent(consumer.getEndpoint()); - - // channel message listener - LOG.info("Subscribing to channel {}...", channelName); - var messageListener = consumerToListener.computeIfAbsent(consumer, key -> (channel, message) -> { - LOG.debug("Received Message: {}", message); - // convert CometD message to Camel Message - consumer.processMessage(channel, message); - }); - - // subscribe asynchronously - final ClientSessionChannel clientChannel = client.getChannel(channelName); - clientChannel.subscribe(messageListener); + public void subscribe(StreamingApiConsumer consumer) { + lock.lock(); + try { + // create subscription for consumer + final String channelName = getChannelName(consumer.getTopicName()); + channelToConsumers.computeIfAbsent(channelName, key -> ConcurrentHashMap.newKeySet()).add(consumer); + channelsToSubscribe.add(channelName); + + setReplayIdIfAbsent(consumer.getEndpoint()); + + // channel message listener + LOG.info("Subscribing to channel {}...", channelName); + var messageListener = consumerToListener.computeIfAbsent(consumer, key -> (channel, message) -> { + LOG.debug("Received Message: {}", message); + // convert CometD message to Camel Message + consumer.processMessage(channel, message); + }); + + // subscribe asynchronously + final ClientSessionChannel clientChannel = client.getChannel(channelName); + clientChannel.subscribe(messageListener); + } finally { + lock.unlock(); + } } private static boolean isTemporaryError(Message message) { @@ -506,26 +511,31 @@ static String getChannelName(final String topicName) { return channelName.toString(); } - public synchronized void unsubscribe(StreamingApiConsumer consumer) { - // channel name - final String channelName = getChannelName(consumer.getTopicName()); - - // unsubscribe from channel - var consumers = channelToConsumers.get(channelName); - if (consumers != null) { - consumers.remove(consumer); - if (consumers.isEmpty()) { - channelToConsumers.remove(channelName); + public void unsubscribe(StreamingApiConsumer consumer) { + lock.lock(); + try { + // channel name + final String channelName = getChannelName(consumer.getTopicName()); + + // unsubscribe from channel + var consumers = channelToConsumers.get(channelName); + if (consumers != null) { + consumers.remove(consumer); + if (consumers.isEmpty()) { + channelToConsumers.remove(channelName); + } } - } - final ClientSessionChannel.MessageListener listener = consumerToListener.remove(consumer); - if (listener != null) { - LOG.debug("Unsubscribing from channel {}...", channelName); - final ClientSessionChannel clientChannel = client.getChannel(channelName); - // if there are other listeners on this channel, an unsubscribe message will not be sent, - // so we're not going to listen for and expect an unsub response. Just unsub and move on. - clientChannel.unsubscribe(listener); - clientChannel.release(); + final ClientSessionChannel.MessageListener listener = consumerToListener.remove(consumer); + if (listener != null) { + LOG.debug("Unsubscribing from channel {}...", channelName); + final ClientSessionChannel clientChannel = client.getChannel(channelName); + // if there are other listeners on this channel, an unsubscribe message will not be sent, + // so we're not going to listen for and expect an unsub response. Just unsub and move on. + clientChannel.unsubscribe(listener); + clientChannel.release(); + } + } finally { + lock.unlock(); } } diff --git a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java index 4c23a7e6bb490..52cdbe76311c5 100644 --- a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java +++ b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerComponent.java @@ -17,8 +17,8 @@ package org.apache.camel.component.scheduler; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -29,8 +29,7 @@ @org.apache.camel.spi.annotations.Component("scheduler") public class SchedulerComponent extends HealthCheckComponent { - private final Map executors = new HashMap<>(); - private final Map refCounts = new HashMap<>(); + private final Map executors = new ConcurrentHashMap<>(); @Metadata private boolean includeMetadata; @@ -75,53 +74,46 @@ public void setPoolSize(int poolSize) { protected ScheduledExecutorService addConsumer(SchedulerConsumer consumer) { String name = consumer.getEndpoint().getName(); - int poolSize = consumer.getEndpoint().getPoolSize(); - - ScheduledExecutorService answer; - synchronized (executors) { - answer = executors.get(name); - if (answer == null) { - answer = getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, "scheduler://" + name, - poolSize); - executors.put(name, answer); - // store new reference counter - refCounts.put(name, new AtomicInteger(1)); - } else { - // increase reference counter - AtomicInteger counter = refCounts.get(name); - if (counter != null) { - counter.incrementAndGet(); - } + return executors.compute(name, (k, v) -> { + if (v == null) { + int poolSize = consumer.getEndpoint().getPoolSize(); + return new ScheduledExecutorServiceHolder( + getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, "scheduler://" + name, + poolSize)); } - } - return answer; + v.refCount.incrementAndGet(); + return v; + }).executorService; } protected void removeConsumer(SchedulerConsumer consumer) { String name = consumer.getEndpoint().getName(); - synchronized (executors) { - // decrease reference counter - AtomicInteger counter = refCounts.get(name); - if (counter != null && counter.decrementAndGet() <= 0) { - refCounts.remove(name); - // remove scheduler as its no longer in use - ScheduledExecutorService scheduler = executors.remove(name); - if (scheduler != null) { - getCamelContext().getExecutorServiceManager().shutdown(scheduler); - } + executors.computeIfPresent(name, (k, v) -> { + if (v.refCount.decrementAndGet() == 0) { + getCamelContext().getExecutorServiceManager().shutdown(v.executorService); + return null; } - } + return v; + }); } @Override protected void doStop() throws Exception { - Collection collection = executors.values(); - for (ScheduledExecutorService scheduler : collection) { - getCamelContext().getExecutorServiceManager().shutdown(scheduler); + Collection collection = executors.values(); + for (ScheduledExecutorServiceHolder holder : collection) { + getCamelContext().getExecutorServiceManager().shutdown(holder.executorService); } executors.clear(); - refCounts.clear(); } + private static class ScheduledExecutorServiceHolder { + private final ScheduledExecutorService executorService; + private final AtomicInteger refCount; + + ScheduledExecutorServiceHolder(ScheduledExecutorService executorService) { + this.executorService = executorService; + this.refCount = new AtomicInteger(1); + } + } } diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java index 61dd592225941..4efa264ba3400 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/QueueReference.java @@ -19,6 +19,8 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; @@ -34,7 +36,8 @@ public final class QueueReference { private Integer size; private Boolean multipleConsumers; - private List endpoints = new LinkedList<>(); + private final Lock lock = new ReentrantLock(); + private final List endpoints = new LinkedList<>(); QueueReference(BlockingQueue queue, Integer size, Boolean multipleConsumers) { this.queue = queue; @@ -42,27 +45,40 @@ public final class QueueReference { this.multipleConsumers = multipleConsumers; } - synchronized void addReference(SedaEndpoint endpoint) { - if (!endpoints.contains(endpoint)) { - endpoints.add(endpoint); - // update the multipleConsumers setting if need - if (endpoint.isMultipleConsumers()) { - multipleConsumers = true; + void addReference(SedaEndpoint endpoint) { + lock.lock(); + try { + if (!endpoints.contains(endpoint)) { + endpoints.add(endpoint); + // update the multipleConsumers setting if need + if (endpoint.isMultipleConsumers()) { + multipleConsumers = true; + } } + } finally { + lock.unlock(); } } - synchronized void removeReference(SedaEndpoint endpoint) { - if (endpoints.contains(endpoint)) { + void removeReference(SedaEndpoint endpoint) { + lock.lock(); + try { endpoints.remove(endpoint); + } finally { + lock.unlock(); } } /** * Gets the reference counter */ - public synchronized int getCount() { - return endpoints.size(); + public int getCount() { + lock.lock(); + try { + return endpoints.size(); + } finally { + lock.unlock(); + } } /** @@ -85,13 +101,18 @@ public BlockingQueue getQueue() { return queue; } - public synchronized boolean hasConsumers() { - for (SedaEndpoint endpoint : endpoints) { - if (!endpoint.getConsumers().isEmpty()) { - return true; + public boolean hasConsumers() { + lock.lock(); + try { + for (SedaEndpoint endpoint : endpoints) { + if (!endpoint.getConsumers().isEmpty()) { + return true; + } } - } - return false; + return false; + } finally { + lock.unlock(); + } } } diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java index cdf46a989efe2..05dfb0bb2f0c5 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -144,69 +144,78 @@ public void setDefaultPollTimeout(int defaultPollTimeout) { this.defaultPollTimeout = defaultPollTimeout; } - public synchronized QueueReference getOrCreateQueue( + public QueueReference getOrCreateQueue( SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) { + lock.lock(); + try { + String key = getQueueKey(endpoint.getEndpointUri()); - String key = getQueueKey(endpoint.getEndpointUri()); - - if (size == null) { - // there may be a custom size during startup - size = customSize.get(key); - } - - QueueReference ref = getQueues().get(key); - if (ref != null) { - // if the given size is not provided, we just use the existing queue as is - if (size != null && !size.equals(ref.getSize())) { - // there is already a queue, so make sure the size matches - throw new IllegalArgumentException( - "Cannot use existing queue " + key + " as the existing queue size " - + (ref.getSize() != null ? ref.getSize() : SedaConstants.QUEUE_SIZE) - + " does not match given queue size " + size); + if (size == null) { + // there may be a custom size during startup + size = customSize.get(key); } - // add the reference before returning queue - ref.addReference(endpoint); - if (log.isDebugEnabled()) { - log.debug("Reusing existing queue {} with size {} and reference count {}", key, size, ref.getCount()); + QueueReference ref = getQueues().get(key); + if (ref != null) { + // if the given size is not provided, we just use the existing queue as is + if (size != null && !size.equals(ref.getSize())) { + // there is already a queue, so make sure the size matches + throw new IllegalArgumentException( + "Cannot use existing queue " + key + " as the existing queue size " + + (ref.getSize() != null ? ref.getSize() : SedaConstants.QUEUE_SIZE) + + " does not match given queue size " + size); + } + // add the reference before returning queue + ref.addReference(endpoint); + + if (log.isDebugEnabled()) { + log.debug("Reusing existing queue {} with size {} and reference count {}", key, size, ref.getCount()); + } + return ref; } - return ref; - } - // create queue - BlockingQueue queue; - BlockingQueueFactory queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory; - if (size != null && size > 0) { - queue = queueFactory.create(size); - } else { - if (getQueueSize() > 0) { - size = getQueueSize(); - queue = queueFactory.create(getQueueSize()); + // create queue + BlockingQueue queue; + BlockingQueueFactory queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory; + if (size != null && size > 0) { + queue = queueFactory.create(size); } else { - queue = queueFactory.create(); + if (getQueueSize() > 0) { + size = getQueueSize(); + queue = queueFactory.create(getQueueSize()); + } else { + queue = queueFactory.create(); + } } - } - log.debug("Created queue {} with size {}", key, size); + log.debug("Created queue {} with size {}", key, size); - // create and add a new reference queue - ref = new QueueReference(queue, size, multipleConsumers); - ref.addReference(endpoint); - getQueues().put(key, ref); + // create and add a new reference queue + ref = new QueueReference(queue, size, multipleConsumers); + ref.addReference(endpoint); + getQueues().put(key, ref); - return ref; + return ref; + } finally { + lock.unlock(); + } } - public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue queue) { - String key = getQueueKey(endpoint.getEndpointUri()); + public QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue queue) { + lock.lock(); + try { + String key = getQueueKey(endpoint.getEndpointUri()); - QueueReference ref = getQueues().get(key); - if (ref == null) { - ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers()); - ref.addReference(endpoint); - getQueues().put(key, ref); - } + QueueReference ref = getQueues().get(key); + if (ref == null) { + ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers()); + ref.addReference(endpoint); + getQueues().put(key, ref); + } - return ref; + return ref; + } finally { + lock.unlock(); + } } public Map getQueues() { diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index d03d320e6be73..8c4710edc0400 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -183,30 +183,35 @@ public PollingConsumer createPollingConsumer() throws Exception { return answer; } - public synchronized BlockingQueue getQueue() { - if (queue == null) { - // prefer to lookup queue from component, so if this endpoint is re-created or re-started - // then the existing queue from the component can be used, so new producers and consumers - // can use the already existing queue referenced from the component - if (getComponent() != null) { - // use null to indicate default size (= use what the existing queue has been configured with) - Integer size = (getSize() == Integer.MAX_VALUE || getSize() == SedaConstants.QUEUE_SIZE) ? null : getSize(); - QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory); - queue = ref.getQueue(); - String key = getComponent().getQueueKey(getEndpointUri()); - LOG.debug("Endpoint {} is using shared queue: {} with size: {}", this, key, - ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE); - // and set the size we are using - if (ref.getSize() != null) { - setSize(ref.getSize()); + public BlockingQueue getQueue() { + lock.lock(); + try { + if (queue == null) { + // prefer to lookup queue from component, so if this endpoint is re-created or re-started + // then the existing queue from the component can be used, so new producers and consumers + // can use the already existing queue referenced from the component + if (getComponent() != null) { + // use null to indicate default size (= use what the existing queue has been configured with) + Integer size = (getSize() == Integer.MAX_VALUE || getSize() == SedaConstants.QUEUE_SIZE) ? null : getSize(); + QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory); + queue = ref.getQueue(); + String key = getComponent().getQueueKey(getEndpointUri()); + LOG.debug("Endpoint {} is using shared queue: {} with size: {}", this, key, + ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE); + // and set the size we are using + if (ref.getSize() != null) { + setSize(ref.getSize()); + } + } else { + // fallback and create queue (as this endpoint has no component) + queue = createQueue(); + LOG.debug("Endpoint {} is using queue: {} with size: {}", this, getEndpointUri(), getSize()); } - } else { - // fallback and create queue (as this endpoint has no component) - queue = createQueue(); - LOG.debug("Endpoint {} is using queue: {} with size: {}", this, getEndpointUri(), getSize()); } + return queue; + } finally { + lock.unlock(); } - return queue; } protected BlockingQueue createQueue() { @@ -240,45 +245,55 @@ private QueueReference tryQueueRefInit() { return null; } - protected synchronized AsyncProcessor getConsumerMulticastProcessor() { - if (!multicastStarted && consumerMulticastProcessor != null) { - // only start it on-demand to avoid starting it during stopping - ServiceHelper.startService(consumerMulticastProcessor); - multicastStarted = true; + protected AsyncProcessor getConsumerMulticastProcessor() { + lock.lock(); + try { + if (!multicastStarted && consumerMulticastProcessor != null) { + // only start it on-demand to avoid starting it during stopping + ServiceHelper.startService(consumerMulticastProcessor); + multicastStarted = true; + } + return consumerMulticastProcessor; + } finally { + lock.unlock(); } - return consumerMulticastProcessor; } - protected synchronized void updateMulticastProcessor() throws Exception { - // only needed if we support multiple consumers - if (!isMultipleConsumersSupported()) { - return; - } - - // stop old before we create a new - if (consumerMulticastProcessor != null) { - ServiceHelper.stopService(consumerMulticastProcessor); - consumerMulticastProcessor = null; - } - - int size = getConsumers().size(); - if (size >= 1) { - if (multicastExecutor == null) { - // create multicast executor as we need it when we have more than 1 processor - multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, - URISupport.sanitizeUri(getEndpointUri()) + "(multicast)"); + protected void updateMulticastProcessor() throws Exception { + lock.lock(); + try { + // only needed if we support multiple consumers + if (!isMultipleConsumersSupported()) { + return; } - // create list of consumers to multicast to - List processors = new ArrayList<>(size); - for (SedaConsumer consumer : getConsumers()) { - processors.add(consumer.getProcessor()); + + // stop old before we create a new + if (consumerMulticastProcessor != null) { + ServiceHelper.stopService(consumerMulticastProcessor); + consumerMulticastProcessor = null; } - // create multicast processor - multicastStarted = false; - consumerMulticastProcessor = (AsyncProcessor) PluginHelper.getProcessorFactory(getCamelContext()) - .createProcessor(getCamelContext(), "MulticastProcessor", - new Object[] { processors, multicastExecutor, false }); + int size = getConsumers().size(); + if (size >= 1) { + if (multicastExecutor == null) { + // create multicast executor as we need it when we have more than 1 processor + multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, + URISupport.sanitizeUri(getEndpointUri()) + "(multicast)"); + } + // create list of consumers to multicast to + List processors = new ArrayList<>(size); + for (SedaConsumer consumer : getConsumers()) { + processors.add(consumer.getProcessor()); + } + // create multicast processor + multicastStarted = false; + + consumerMulticastProcessor = (AsyncProcessor) PluginHelper.getProcessorFactory(getCamelContext()) + .createProcessor(getCamelContext(), "MulticastProcessor", + new Object[] { processors, multicastExecutor, false }); + } + } finally { + lock.unlock(); } } diff --git a/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java b/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java index f317cd47015fb..74959b9815d36 100644 --- a/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java +++ b/components/camel-servicenow/camel-servicenow-component/src/main/java/org/apache/camel/component/servicenow/auth/OAuthToken.java @@ -17,6 +17,8 @@ package org.apache.camel.component.servicenow.auth; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.component.servicenow.ServiceNowConfiguration; import org.apache.cxf.jaxrs.client.WebClient; @@ -30,6 +32,7 @@ public class OAuthToken { private static final Logger LOGGER = LoggerFactory.getLogger(OAuthToken.class); + private final Lock lock = new ReentrantLock(); private final ServiceNowConfiguration configuration; private ClientAccessToken token; private String authString; @@ -42,54 +45,59 @@ public OAuthToken(ServiceNowConfiguration configuration) { this.expireAt = 0; } - private synchronized void getOrRefreshAccessToken() { - if (token == null) { - LOGGER.debug("Generate OAuth token"); - - token = OAuthClientUtils.getAccessToken( - WebClient.create(configuration.getOauthTokenUrl()), - new Consumer( - configuration.getOauthClientId(), - configuration.getOauthClientSecret()), - new ResourceOwnerGrant( - configuration.getUserName(), - configuration.getPassword()), - true); - - LOGGER.debug("OAuth token expires in {}s", token.getExpiresIn()); - - // Set expiration time related info in milliseconds - token.setIssuedAt(System.currentTimeMillis()); - token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), TimeUnit.SECONDS)); - - authString = token.toString(); - - if (token.getExpiresIn() > 0) { - expireAt = token.getIssuedAt() + token.getExpiresIn(); - } - } else if (expireAt > 0 && System.currentTimeMillis() >= expireAt) { - LOGGER.debug("OAuth token is expired, refresh it"); - - token = OAuthClientUtils.refreshAccessToken( - WebClient.create(configuration.getOauthTokenUrl()), - new Consumer( - configuration.getOauthClientId(), - configuration.getOauthClientSecret()), - token, - null, - false); - - LOGGER.debug("Refreshed OAuth token expires in {}s", token.getExpiresIn()); - - // Set expiration time related info in milliseconds - token.setIssuedAt(System.currentTimeMillis()); - token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), TimeUnit.SECONDS)); - - authString = token.toString(); - - if (token.getExpiresIn() > 0) { - expireAt = token.getIssuedAt() + token.getExpiresIn(); + private void getOrRefreshAccessToken() { + lock.lock(); + try { + if (token == null) { + LOGGER.debug("Generate OAuth token"); + + token = OAuthClientUtils.getAccessToken( + WebClient.create(configuration.getOauthTokenUrl()), + new Consumer( + configuration.getOauthClientId(), + configuration.getOauthClientSecret()), + new ResourceOwnerGrant( + configuration.getUserName(), + configuration.getPassword()), + true); + + LOGGER.debug("OAuth token expires in {}s", token.getExpiresIn()); + + // Set expiration time related info in milliseconds + token.setIssuedAt(System.currentTimeMillis()); + token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), TimeUnit.SECONDS)); + + authString = token.toString(); + + if (token.getExpiresIn() > 0) { + expireAt = token.getIssuedAt() + token.getExpiresIn(); + } + } else if (expireAt > 0 && System.currentTimeMillis() >= expireAt) { + LOGGER.debug("OAuth token is expired, refresh it"); + + token = OAuthClientUtils.refreshAccessToken( + WebClient.create(configuration.getOauthTokenUrl()), + new Consumer( + configuration.getOauthClientId(), + configuration.getOauthClientSecret()), + token, + null, + false); + + LOGGER.debug("Refreshed OAuth token expires in {}s", token.getExpiresIn()); + + // Set expiration time related info in milliseconds + token.setIssuedAt(System.currentTimeMillis()); + token.setExpiresIn(TimeUnit.MILLISECONDS.convert(token.getExpiresIn(), TimeUnit.SECONDS)); + + authString = token.toString(); + + if (token.getExpiresIn() > 0) { + expireAt = token.getIssuedAt() + token.getExpiresIn(); + } } + } finally { + lock.unlock(); } } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java index 585482ce5448f..ea285912b88c8 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java @@ -134,14 +134,19 @@ protected void doShutdown() throws Exception { super.doShutdown(); } - protected synchronized ExecutorService getAsyncStartStopExecutorService() { - if (asyncStartStopExecutorService == null) { - // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread - // for each task, and the thread pool will shrink when no more tasks running - asyncStartStopExecutorService - = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); + protected ExecutorService getAsyncStartStopExecutorService() { + lock.lock(); + try { + if (asyncStartStopExecutorService == null) { + // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread + // for each task, and the thread pool will shrink when no more tasks running + asyncStartStopExecutorService + = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); + } + return asyncStartStopExecutorService; + } finally { + lock.unlock(); } - return asyncStartStopExecutorService; } public void setConnectionFactory(ConnectionFactory connectionFactory) { diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index 9f71a770abdbe..6ff8154ea8adc 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -95,7 +95,8 @@ protected void doStop() throws Exception { protected void initReplyManager() { if (!started.get()) { - synchronized (this) { + lock.lock(); + try { if (started.get()) { return; } @@ -136,6 +137,8 @@ protected void initReplyManager() { Thread.currentThread().setContextClassLoader(oldClassLoader); } started.set(true); + } finally { + lock.unlock(); } } } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java index 3aa5eb353496f..63fe3dd1c33ee 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.sjms.consumer; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import jakarta.jms.Connection; import jakarta.jms.Destination; import jakarta.jms.JMSException; @@ -66,6 +69,7 @@ public class EndpointMessageListener implements SessionMessageListener { private boolean eagerLoadingOfProperties; private String eagerPoisonBody; private volatile SjmsTemplate template; + private final Lock lock = new ReentrantLock(); public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint endpoint, Processor processor) { this.consumer = consumer; @@ -73,11 +77,16 @@ public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint endpoint, Pro this.processor = AsyncProcessorConverterHelper.convert(processor); } - public synchronized SjmsTemplate getTemplate() { - if (template == null) { - template = endpoint.createInOnlyTemplate(); + public SjmsTemplate getTemplate() { + lock.lock(); + try { + if (template == null) { + template = endpoint.createInOnlyTemplate(); + } + return template; + } finally { + lock.unlock(); } - return template; } public void setTemplate(SjmsTemplate template) { diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java index 0a931703378ee..081e98e65a03a 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java @@ -19,6 +19,8 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; @@ -54,10 +56,10 @@ public class SimpleMessageListenerContainer extends ServiceSupport private String destinationName; private DestinationCreationStrategy destinationCreationStrategy; - private final Object connectionLock = new Object(); + private final Lock connectionLock = new ReentrantLock(); private Connection connection; private volatile boolean connectionStarted; - private final Object consumerLock = new Object(); + private final Lock consumerLock = new ReentrantLock(); private Set consumers; private Set sessions; private BackOffTimer.Task recoverTask; @@ -181,9 +183,12 @@ public void onException(JMSException exception) { } } - synchronized (this.connectionLock) { + connectionLock.lock(); + try { this.sessions = null; this.consumers = null; + } finally { + connectionLock.unlock(); } scheduleConnectionRecovery(); } @@ -240,7 +245,8 @@ protected void doStop() throws Exception { } protected void initConsumers() throws Exception { - synchronized (this.consumerLock) { + consumerLock.lock(); + try { if (consumers == null) { LOG.debug("Initializing {} concurrent consumers as JMS listener on destination: {}", concurrentConsumers, destinationName); @@ -254,6 +260,8 @@ protected void initConsumers() throws Exception { consumers.add(consumer); } } + } finally { + consumerLock.unlock(); } } @@ -266,7 +274,8 @@ protected MessageConsumer createMessageConsumer(Session session) throws Exceptio } protected void stopConsumers() { - synchronized (this.consumerLock) { + consumerLock.lock(); + try { if (consumers != null) { LOG.debug("Stopping JMS MessageConsumers"); for (MessageConsumer consumer : this.consumers) { @@ -279,11 +288,14 @@ protected void stopConsumers() { } } } + } finally { + consumerLock.unlock(); } } protected void createConnection() throws Exception { - synchronized (this.connectionLock) { + connectionLock.lock(); + try { if (this.connection == null) { Connection con = null; try { @@ -300,22 +312,28 @@ protected void createConnection() throws Exception { this.connection = con; LOG.debug("Created JMS Connection"); } + } finally { + connectionLock.unlock(); } } protected final void refreshConnection() throws Exception { - synchronized (this.connectionLock) { + connectionLock.lock(); + try { closeConnection(connection); this.connection = null; createConnection(); if (this.connectionStarted) { startConnection(); } + } finally { + connectionLock.unlock(); } } protected void startConnection() throws Exception { - synchronized (this.connectionLock) { + connectionLock.lock(); + try { this.connectionStarted = true; if (this.connection != null) { try { @@ -324,11 +342,14 @@ protected void startConnection() throws Exception { // ignore as it may already be started } } + } finally { + connectionLock.unlock(); } } protected void stopConnection() { - synchronized (this.connectionLock) { + connectionLock.lock(); + try { this.connectionStarted = false; if (this.connection != null) { try { @@ -337,6 +358,8 @@ protected void stopConnection() { LOG.debug("Error stopping connection. This exception is ignored.", e); } } + } finally { + connectionLock.unlock(); } } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java index f7aa5899f382b..59f8443fd9366 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/MessageSelectorCreator.java @@ -17,6 +17,8 @@ package org.apache.camel.component.sjms.reply; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.TimeoutMap; import org.slf4j.Logger; @@ -32,6 +34,7 @@ public class MessageSelectorCreator { protected static final Logger LOG = LoggerFactory.getLogger(MessageSelectorCreator.class); protected final TimeoutMap timeoutMap; protected final ConcurrentSkipListSet correlationIds; + private final Lock lock = new ReentrantLock(); protected volatile boolean dirty = true; protected StringBuilder expression; @@ -44,34 +47,39 @@ public MessageSelectorCreator(CorrelationTimeoutMap timeoutMap) { this.correlationIds = new ConcurrentSkipListSet<>(); } - public synchronized String get() { - if (!dirty) { - return expression.toString(); - } + public String get() { + lock.lock(); + try { + if (!dirty) { + return expression.toString(); + } - expression = new StringBuilder(256); + expression = new StringBuilder(256); - expression.append("JMSCorrelationID='"); - if (correlationIds.isEmpty()) { - // no id's so use a dummy to select nothing - expression.append("CamelDummyJmsMessageSelector'"); - } else { - boolean first = true; - for (String value : correlationIds) { - if (!first) { - expression.append(" OR JMSCorrelationID='"); - } - expression.append(value).append("'"); - if (first) { - first = false; + expression.append("JMSCorrelationID='"); + if (correlationIds.isEmpty()) { + // no id's so use a dummy to select nothing + expression.append("CamelDummyJmsMessageSelector'"); + } else { + boolean first = true; + for (String value : correlationIds) { + if (!first) { + expression.append(" OR JMSCorrelationID='"); + } + expression.append(value).append("'"); + if (first) { + first = false; + } } } - } - String answer = expression.toString(); + String answer = expression.toString(); - dirty = false; - return answer; + dirty = false; + return answer; + } finally { + lock.unlock(); + } } // Changes to live correlation-ids invalidate existing message selector diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java index 662f40fa6cccd..c92ba21e51613 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java @@ -73,12 +73,15 @@ private final class DestinationResolverDelegate implements DestinationCreationSt @Override public Destination createDestination(Session session, String destinationName, boolean topic) throws JMSException { - synchronized (QueueReplyManager.this) { + QueueReplyManager.this.lock.lock(); + try { // resolve the reply to destination if (destination == null) { destination = delegate.createDestination(session, destinationName, topic); setReplyTo(destination); } + } finally { + QueueReplyManager.this.lock.unlock(); } return destination; } diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java index 4f13a2d5ff31e..c9119d3b4b0f7 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConnectionFactory.java @@ -21,6 +21,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.splunk.HttpService; import com.splunk.SSLSecurityProtocol; @@ -45,6 +47,7 @@ public class SplunkConnectionFactory { private boolean useSunHttpsHandler; private SSLSecurityProtocol sslProtocol; private boolean validateCertificates; + private final Lock lock = new ReentrantLock(); public SplunkConnectionFactory(final String host, final int port, final String username, final String password) { this.host = host; @@ -113,81 +116,86 @@ public void setToken(String token) { this.token = token; } - public synchronized Service createService(CamelContext camelContext) { - final ServiceArgs args = new ServiceArgs(); - if (host != null) { - args.setHost(host); - } - if (port > 0) { - args.setPort(port); - } - if (scheme != null) { - args.setScheme(scheme); - } - if (app != null) { - args.setApp(app); - } - if (owner != null) { - args.setOwner(owner); - } - if (username != null) { - args.setUsername(username); - } - if (password != null && token == null) { - args.setPassword(password); - } - if (token != null) { - args.setToken(String.format("Bearer %s", token)); - args.remove("username"); - args.remove("password"); - } - // useful in cases where you want to bypass app. servers https handling - // (wls i'm looking at you) - if (isUseSunHttpsHandler()) { - String sunHandlerClassName = "sun.net.www.protocol.https.Handler"; - Class clazz - = camelContext.getClassResolver().resolveClass(sunHandlerClassName, URLStreamHandler.class); - if (clazz != null) { - URLStreamHandler handler = camelContext.getInjector().newInstance(clazz); - args.setHTTPSHandler(handler); - LOG.debug("using the URLStreamHandler {} for {}", handler, args); - } else { - LOG.warn("could not resolve and use the URLStreamHandler class '{}'", sunHandlerClassName); + public Service createService(CamelContext camelContext) { + lock.lock(); + try { + final ServiceArgs args = new ServiceArgs(); + if (host != null) { + args.setHost(host); } - } - - ExecutorService executor - = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "DefaultSplunkConnectionFactory"); - - Future future = executor.submit(new Callable() { - public Service call() throws Exception { - if (Service.DEFAULT_SCHEME.equals(getScheme())) { - LOG.debug("Https in use. Setting SSL protocol to {} and sertificate validation to %s", getSslProtocol(), - isValidateCertificates()); - HttpService.setValidateCertificates(isValidateCertificates()); - HttpService.setSslSecurityProtocol(getSslProtocol()); + if (port > 0) { + args.setPort(port); + } + if (scheme != null) { + args.setScheme(scheme); + } + if (app != null) { + args.setApp(app); + } + if (owner != null) { + args.setOwner(owner); + } + if (username != null) { + args.setUsername(username); + } + if (password != null && token == null) { + args.setPassword(password); + } + if (token != null) { + args.setToken(String.format("Bearer %s", token)); + args.remove("username"); + args.remove("password"); + } + // useful in cases where you want to bypass app. servers https handling + // (wls i'm looking at you) + if (isUseSunHttpsHandler()) { + String sunHandlerClassName = "sun.net.www.protocol.https.Handler"; + Class clazz + = camelContext.getClassResolver().resolveClass(sunHandlerClassName, URLStreamHandler.class); + if (clazz != null) { + URLStreamHandler handler = camelContext.getInjector().newInstance(clazz); + args.setHTTPSHandler(handler); + LOG.debug("using the URLStreamHandler {} for {}", handler, args); + } else { + LOG.warn("could not resolve and use the URLStreamHandler class '{}'", sunHandlerClassName); } - return Service.connect(args); } - }); - try { - Service service = null; - if (connectionTimeout > 0) { - service = future.get(connectionTimeout, TimeUnit.MILLISECONDS); - } else { - service = future.get(); + + ExecutorService executor + = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "DefaultSplunkConnectionFactory"); + + Future future = executor.submit(new Callable() { + public Service call() throws Exception { + if (Service.DEFAULT_SCHEME.equals(getScheme())) { + LOG.debug("Https in use. Setting SSL protocol to {} and sertificate validation to %s", getSslProtocol(), + isValidateCertificates()); + HttpService.setValidateCertificates(isValidateCertificates()); + HttpService.setSslSecurityProtocol(getSslProtocol()); + } + return Service.connect(args); + } + }); + try { + Service service = null; + if (connectionTimeout > 0) { + service = future.get(connectionTimeout, TimeUnit.MILLISECONDS); + } else { + service = future.get(); + } + LOG.info("Successfully connected to Splunk"); + return service; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("could not connect to Splunk Server @ %s:%d - %s", host, port, e.getMessage()), e); + } catch (Exception e) { + throw new RuntimeException( + String.format("could not connect to Splunk Server @ %s:%d - %s", host, port, e.getMessage()), e); + } finally { + camelContext.getExecutorServiceManager().shutdownNow(executor); } - LOG.info("Successfully connected to Splunk"); - return service; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format("could not connect to Splunk Server @ %s:%d - %s", host, port, e.getMessage()), e); - } catch (Exception e) { - throw new RuntimeException( - String.format("could not connect to Splunk Server @ %s:%d - %s", host, port, e.getMessage()), e); } finally { - camelContext.getExecutorServiceManager().shutdownNow(executor); + lock.unlock(); } } } diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java index ec90916e8b1d3..1fe381a91385a 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkEndpoint.java @@ -130,14 +130,19 @@ public SplunkConfiguration getConfiguration() { return configuration; } - public synchronized boolean reset(Exception e) { - boolean answer = false; - if (e instanceof RuntimeException && e.getCause() instanceof ConnectException - || e instanceof SocketException || e instanceof SSLException) { - LOG.warn("Got exception from Splunk. Service will be reset."); - this.service = null; - answer = true; + public boolean reset(Exception e) { + lock.lock(); + try { + boolean answer = false; + if (e instanceof RuntimeException && e.getCause() instanceof ConnectException + || e instanceof SocketException || e instanceof SSLException) { + LOG.warn("Got exception from Splunk. Service will be reset."); + this.service = null; + answer = true; + } + return answer; + } finally { + lock.unlock(); } - return answer; } } diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java index f131fa14d6daf..36b8d28aaf280 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java @@ -22,6 +22,8 @@ import java.io.Writer; import java.net.Socket; import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.splunk.Args; import com.splunk.Service; @@ -37,6 +39,7 @@ public abstract class SplunkDataWriter implements DataWriter { protected Args args; private boolean connected; private Socket socket; + protected final Lock lock = new ReentrantLock(); public SplunkDataWriter(SplunkEndpoint endpoint, Args args) { this.endpoint = endpoint; @@ -55,27 +58,36 @@ public void write(String event) throws Exception { doWrite(event + SplunkEvent.LINEBREAK); } - protected synchronized void doWrite(String event) throws IOException { - LOG.debug("writing event to splunk:{}", event); - OutputStream ostream = socket.getOutputStream(); - Writer writer = new OutputStreamWriter(ostream, StandardCharsets.UTF_8); - writer.write(event); - writer.flush(); + protected void doWrite(String event) throws IOException { + lock.lock(); + try { + LOG.debug("writing event to splunk:{}", event); + OutputStream ostream = socket.getOutputStream(); + Writer writer = new OutputStreamWriter(ostream, StandardCharsets.UTF_8); + writer.write(event); + writer.flush(); + } finally { + lock.unlock(); + } } @Override - public synchronized void start() { + public void start() { + lock.lock(); try { socket = createSocket(endpoint.getService()); connected = true; } catch (Exception e) { connected = false; throw new RuntimeException(e); + } finally { + lock.unlock(); } } @Override - public synchronized void stop() { + public void stop() { + lock.lock(); try { if (socket != null) { socket.close(); @@ -83,6 +95,8 @@ public synchronized void stop() { } } catch (Exception e) { throw new RuntimeException(e); + } finally { + lock.unlock(); } } diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java index f9587b080c1eb..1d773ed8400a6 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java @@ -33,13 +33,18 @@ public SubmitDataWriter(SplunkEndpoint endpoint, Args args) { } @Override - protected synchronized void doWrite(String event) throws IOException { - Index index = getIndex(); - if (index != null) { - index.submit(args, event); - } else { - Receiver receiver = endpoint.getService().getReceiver(); - receiver.submit(args, event); + protected void doWrite(String event) throws IOException { + lock.lock(); + try { + Index index = getIndex(); + if (index != null) { + index.submit(args, event); + } else { + Receiver receiver = endpoint.getService().getReceiver(); + receiver.submit(args, event); + } + } finally { + lock.unlock(); } } diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java index 2dda0bd9ec2f1..b16969f2b8e02 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java @@ -17,6 +17,8 @@ package org.apache.camel.component.springrabbit; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.rabbitmq.client.Channel; import org.apache.camel.AsyncCallback; @@ -47,6 +49,7 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { private RabbitTemplate template; private boolean disableReplyTo; private boolean async; + private final Lock lock = new ReentrantLock(); public EndpointMessageListener(SpringRabbitMQConsumer consumer, SpringRabbitMQEndpoint endpoint, Processor processor) { this.consumer = consumer; @@ -76,11 +79,16 @@ public void setDisableReplyTo(boolean disableReplyTo) { this.disableReplyTo = disableReplyTo; } - public synchronized RabbitTemplate getTemplate() { - if (template == null) { - template = endpoint.createInOnlyTemplate(); + public RabbitTemplate getTemplate() { + lock.lock(); + try { + if (template == null) { + template = endpoint.createInOnlyTemplate(); + } + return template; + } finally { + lock.unlock(); } - return template; } public void setTemplate(RabbitTemplate template) { diff --git a/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java b/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java index 52a53292e00a9..59548292cecbf 100644 --- a/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java +++ b/components/camel-spring-security/src/main/java/org/apache/camel/component/spring/security/SpringSecurityAuthorizationPolicy.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.spring.security; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import javax.security.auth.Subject; import org.apache.camel.CamelAuthorizationException; @@ -48,10 +51,11 @@ public class SpringSecurityAuthorizationPolicy extends IdentifiedType private static final Logger LOG = LoggerFactory.getLogger(SpringSecurityAuthorizationPolicy.class); private AuthorizationManager authorizationManager; private AuthenticationManager authenticationManager; - private AuthenticationAdapter authenticationAdapter; + private volatile AuthenticationAdapter authenticationAdapter; private ApplicationEventPublisher eventPublisher; private boolean alwaysReauthenticate; private boolean useThreadSecurityContext = true; + private final Lock lock = new ReentrantLock(); @Override public void beforeWrap(Route route, NamedNode definition) { @@ -145,16 +149,20 @@ private void publishEvent(ApplicationEvent event) { } public AuthenticationAdapter getAuthenticationAdapter() { - if (authenticationAdapter == null) { - synchronized (this) { - if (authenticationAdapter != null) { - return authenticationAdapter; - } else { - authenticationAdapter = new DefaultAuthenticationAdapter(); + AuthenticationAdapter adapter = authenticationAdapter; + if (adapter == null) { + lock.lock(); + try { + adapter = authenticationAdapter; + if (adapter == null) { + adapter = new DefaultAuthenticationAdapter(); + authenticationAdapter = adapter; } + } finally { + lock.unlock(); } } - return authenticationAdapter; + return adapter; } public void setAuthenticationAdapter(AuthenticationAdapter adapter) { diff --git a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java index 7ad8f5b78ec72..0c210e330b4fb 100644 --- a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java +++ b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceProducer.java @@ -21,6 +21,8 @@ import java.net.URI; import java.security.GeneralSecurityException; import java.util.Iterator; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -213,6 +215,7 @@ protected static final class AbstractHttpWebServiceMessageSenderDecorator extend private final AbstractHttpWebServiceMessageSender delegate; private final SpringWebserviceConfiguration configuration; private final CamelContext camelContext; + private final Lock lock = new ReentrantLock(); private SSLContext sslContext; @@ -235,14 +238,15 @@ public WebServiceConnection createConnection(URI uri) throws IOException { } if (configuration.getSslContextParameters() != null && connection instanceof HttpsURLConnection) { + lock.lock(); try { - synchronized (this) { - if (sslContext == null) { - sslContext = configuration.getSslContextParameters().createSSLContext(camelContext); - } + if (sslContext == null) { + sslContext = configuration.getSslContextParameters().createSSLContext(camelContext); } } catch (GeneralSecurityException e) { throw new RuntimeCamelException("Error creating SSLContext based on SSLContextParameters.", e); + } finally { + lock.unlock(); } ((HttpsURLConnection) connection).setSSLSocketFactory(sslContext.getSocketFactory()); diff --git a/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java b/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java index 1e8ef97b1d6bb..667035bf13558 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java +++ b/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java @@ -121,14 +121,24 @@ public EventComponent getComponent() { // Implementation methods // ------------------------------------------------------------------------- - public synchronized void consumerStarted(EventConsumer consumer) { - getComponent().consumerStarted(this); - getLoadBalancer().addProcessor(consumer.getAsyncProcessor()); + public void consumerStarted(EventConsumer consumer) { + lock.lock(); + try { + getComponent().consumerStarted(this); + getLoadBalancer().addProcessor(consumer.getAsyncProcessor()); + } finally { + lock.unlock(); + } } - public synchronized void consumerStopped(EventConsumer consumer) { - getComponent().consumerStopped(this); - getLoadBalancer().removeProcessor(consumer.getAsyncProcessor()); + public void consumerStopped(EventConsumer consumer) { + lock.lock(); + try { + getComponent().consumerStopped(this); + getLoadBalancer().removeProcessor(consumer.getAsyncProcessor()); + } finally { + lock.unlock(); + } } protected LoadBalancer createLoadBalancer() { diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java index 6d5199bcb1752..a45914ddc73ac 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerReifier.java @@ -191,29 +191,34 @@ private Predicate resolveRetryWhilePolicy(TransactionErrorHandlerDefinition defi return answer; } - protected synchronized ScheduledExecutorService getExecutorService( + protected ScheduledExecutorService getExecutorService( ScheduledExecutorService executorService, String executorServiceRef) { - if (executorService == null || executorService.isShutdown()) { - // camel context will shutdown the executor when it shutdown so no - // need to shut it down when stopping - if (executorServiceRef != null) { - executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); - if (executorService == null) { - ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); - ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); - executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); + lock.lock(); + try { + if (executorService == null || executorService.isShutdown()) { + // camel context will shutdown the executor when it shutdown so no + // need to shut it down when stopping + if (executorServiceRef != null) { + executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); + if (executorService == null) { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); + executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); + } + if (executorService == null) { + throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry."); + } + } else { + // no explicit configured thread pool, so leave it up to the + // error handler to decide if it need a default thread pool from + // CamelContext#getErrorHandlerExecutorService + executorService = null; } - if (executorService == null) { - throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry."); - } - } else { - // no explicit configured thread pool, so leave it up to the - // error handler to decide if it need a default thread pool from - // CamelContext#getErrorHandlerExecutorService - executorService = null; } + return executorService; + } finally { + lock.unlock(); } - return executorService; } } diff --git a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java index 40f265349b496..f9b785d8c78e3 100644 --- a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java +++ b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXJAXBIteratorExpression.java @@ -101,16 +101,17 @@ public StAXJAXBIteratorExpression(String handledName, boolean isNamespaceAware) } private static JAXBContext jaxbContext(Class handled) throws JAXBException { - if (JAX_CONTEXTS.containsKey(handled)) { - return JAX_CONTEXTS.get(handled); - } - - JAXBContext context; - synchronized (JAX_CONTEXTS) { - context = JAXBContext.newInstance(handled); - JAX_CONTEXTS.put(handled, context); + try { + return JAX_CONTEXTS.computeIfAbsent(handled, k -> { + try { + return JAXBContext.newInstance(handled); + } catch (JAXBException e) { + throw new RuntimeCamelException(e); + } + }); + } catch (RuntimeCamelException e) { + throw (JAXBException) e.getCause(); } - return context; } @Override diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java index 3ab6db20563c9..8739b8ec5a26f 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java @@ -335,43 +335,53 @@ private void readFromStreamLineMode() throws Exception { /** * Strategy method for processing the line */ - protected synchronized long processLine(String line, boolean last, long index) throws Exception { - if (endpoint.getGroupLines() > 0) { - // remember line - if (line != null) { - lines.add(line); - } + protected long processLine(String line, boolean last, long index) throws Exception { + lock.lock(); + try { + if (endpoint.getGroupLines() > 0) { + // remember line + if (line != null) { + lines.add(line); + } - // should we flush lines? - if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) { - // spit out lines as we hit the size, or it was the last - List copy = new ArrayList<>(lines); - Object body = endpoint.getGroupStrategy().groupLines(copy); - // remember to inc index when we create an exchange - Exchange exchange = createExchange(body, index++, last); + // should we flush lines? + if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) { + // spit out lines as we hit the size, or it was the last + List copy = new ArrayList<>(lines); + Object body = endpoint.getGroupStrategy().groupLines(copy); + // remember to inc index when we create an exchange + Exchange exchange = createExchange(body, index++, last); - // clear lines - lines.clear(); + // clear lines + lines.clear(); + getProcessor().process(exchange); + } + } else if (line != null) { + // single line + // remember to inc index when we create an exchange + Exchange exchange = createExchange(line, index++, last); getProcessor().process(exchange); } - } else if (line != null) { - // single line - // remember to inc index when we create an exchange - Exchange exchange = createExchange(line, index++, last); - getProcessor().process(exchange); - } - return index; + return index; + } finally { + lock.unlock(); + } } /** * Strategy method for processing the data */ - protected synchronized long processRaw(byte[] body, long index) throws Exception { - Exchange exchange = createExchange(body, index++, true); - getProcessor().process(exchange); - return index; + protected long processRaw(byte[] body, long index) throws Exception { + lock.lock(); + try { + Exchange exchange = createExchange(body, index++, true); + getProcessor().process(exchange); + return index; + } finally { + lock.unlock(); + } } /** diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java index 77b0e7d5298e2..9cc2328511bd6 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java @@ -69,14 +69,16 @@ protected void doStop() throws Exception { public boolean process(Exchange exchange, AsyncCallback callback) { try { delay(endpoint.getDelay()); - - synchronized (this) { + lock.lock(); + try { try { openStream(exchange); writeToStream(outputStream, exchange); } finally { closeStream(exchange, false); } + } finally { + lock.unlock(); } } catch (InterruptedException e) { exchange.setException(e);