This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch sc-copy in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3e347b59c0d8da2101267680c724d0d23b1cc2d0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jun 13 10:17:37 2024 +0200 20866: seda/disruptor and wiretap in copy mode should make deep copy of stream cached body so it's thread-safe --- .../component/disruptor/DisruptorProducer.java | 135 +++++++++++---------- .../apache/camel/component/seda/SedaProducer.java | 15 ++- .../apache/camel/processor/WireTapProcessor.java | 25 ++-- .../java/org/apache/camel/util/SensitiveUtils.java | 2 +- 4 files changed, 103 insertions(+), 74 deletions(-) diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java index 89348a8ed82..f55907945b2 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.disruptor; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -23,6 +24,7 @@ import com.lmax.disruptor.InsufficientCapacityException; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.StreamCache; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -39,12 +41,10 @@ public class DisruptorProducer extends DefaultAsyncProducer { private final WaitForTaskToComplete waitForTaskToComplete; private final long timeout; - private final DisruptorEndpoint endpoint; - private boolean blockWhenFull; + private final boolean blockWhenFull; - public DisruptorProducer( - final DisruptorEndpoint endpoint, + public DisruptorProducer(final DisruptorEndpoint endpoint, 
final WaitForTaskToComplete waitForTaskToComplete, final long timeout, boolean blockWhenFull) { super(endpoint); @@ -76,67 +76,71 @@ public class DisruptorProducer extends DefaultAsyncProducer { wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class); } - if (wait == WaitForTaskToComplete.Always - || wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange)) { + try { + if (wait == WaitForTaskToComplete.Always + || wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange)) { - // do not handover the completion as we wait for the copy to complete, and copy its result back when it done - final Exchange copy = prepareCopy(exchange, false); + // do not handover the completion as we wait for the copy to complete, and copy its result back when it done + final Exchange copy = prepareCopy(exchange, false); - // latch that waits until we are complete - final CountDownLatch latch = new CountDownLatch(1); + // latch that waits until we are complete + final CountDownLatch latch = new CountDownLatch(1); - // we should wait for the reply so install a on completion so we know when its complete - copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch)); + // we should wait for the reply so install a on completion so we know when its complete + copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch)); - doPublish(copy); + doPublish(copy); - if (timeout > 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, - endpoint.getEndpointUri()); - } - // lets see if we can get the task done before the timeout - boolean done = false; - try { - done = latch.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for the task to complete"); - Thread.currentThread().interrupt(); - } - if (!done) { - // Remove timed out Exchange from disruptor endpoint. 
- - // We can't actually remove a published exchange from an active Disruptor. - // Instead we prevent processing of the exchange by setting a Property on the exchange and the value - // would be an AtomicBoolean. This is set by the Producer and the Consumer would look up that Property and - // check the AtomicBoolean. If the AtomicBoolean says that we are good to proceed, it will process the - // exchange. If false, it will simply disregard the exchange. - // But since the Property map is a Concurrent one, maybe we don't need the AtomicBoolean. Check with Simon. - // Also check the TimeoutHandler of the new Disruptor 3.0.0, consider making the switch to the latest version. - exchange.setProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, true); - - exchange.setException(new ExchangeTimedOutException(exchange, timeout)); - - // count down to indicate timeout - latch.countDown(); + if (timeout > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for task to complete using timeout (ms): {} at [{}]", timeout, + endpoint.getEndpointUri()); + } + // lets see if we can get the task done before the timeout + boolean done = false; + try { + done = latch.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for the task to complete"); + Thread.currentThread().interrupt(); + } + if (!done) { + // Remove timed out Exchange from disruptor endpoint. + + // We can't actually remove a published exchange from an active Disruptor. + // Instead we prevent processing of the exchange by setting a Property on the exchange and the value + // would be an AtomicBoolean. This is set by the Producer and the Consumer would look up that Property and + // check the AtomicBoolean. If the AtomicBoolean says that we are good to proceed, it will process the + // exchange. If false, it will simply disregard the exchange. + // But since the Property map is a Concurrent one, maybe we don't need the AtomicBoolean. Check with Simon. 
+ // Also check the TimeoutHandler of the new Disruptor 3.0.0, consider making the switch to the latest version. + exchange.setProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, true); + + exchange.setException(new ExchangeTimedOutException(exchange, timeout)); + + // count down to indicate timeout + latch.countDown(); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri()); + } + // no timeout then wait until its done + try { + latch.await(); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for the task to complete"); + Thread.currentThread().interrupt(); + } } } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for task to complete (blocking) at [{}]", endpoint.getEndpointUri()); - } - // no timeout then wait until its done - try { - latch.await(); - } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for the task to complete"); - Thread.currentThread().interrupt(); - } + // no wait, eg its a InOnly then just publish to the ringbuffer and return + // handover the completion so its the copy which performs that, as we do not wait + final Exchange copy = prepareCopy(exchange, true); + doPublish(copy); } - } else { - // no wait, eg its a InOnly then just publish to the ringbuffer and return - // handover the completion so its the copy which performs that, as we do not wait - final Exchange copy = prepareCopy(exchange, true); - doPublish(copy); + } catch (Exception e) { + exchange.setException(e); } // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done @@ -197,11 +201,20 @@ public class DisruptorProducer extends DefaultAsyncProducer { } } - private Exchange prepareCopy(final Exchange exchange, final boolean handover) { + private Exchange prepareCopy(final Exchange exchange, final boolean copy) throws IOException { // use a new copy of the exchange to route async - final Exchange copy = 
ExchangeHelper.createCorrelatedCopy(exchange, handover); + final Exchange target = ExchangeHelper.createCorrelatedCopy(exchange, copy); // set a new from endpoint to be the disruptor - copy.getExchangeExtension().setFromEndpoint(endpoint); - return copy; + target.getExchangeExtension().setFromEndpoint(endpoint); + if (copy) { + // if the body is stream caching based we need to make a deep copy + if (target.getMessage().getBody() instanceof StreamCache sc) { + StreamCache newBody = sc.copy(target); + if (newBody != null) { + target.getMessage().setBody(newBody); + } + } + } + return target; } } diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java index e72b9edd943..77bd10b92c9 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.seda; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.StreamCache; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -106,7 +108,7 @@ public class SedaProducer extends DefaultAsyncProducer { try { // do not copy as we already did the copy addToQueue(copy, false); - } catch (SedaConsumerNotAvailableException e) { + } catch (SedaConsumerNotAvailableException | IOException e) { exchange.setException(e); callback.done(true); return true; @@ -146,7 +148,7 @@ public class SedaProducer extends DefaultAsyncProducer { // no 
wait, eg its a InOnly then just add to queue and return try { addToQueue(exchange, true); - } catch (SedaConsumerNotAvailableException e) { + } catch (SedaConsumerNotAvailableException | IOException e) { exchange.setException(e); callback.done(true); return true; @@ -187,7 +189,7 @@ public class SedaProducer extends DefaultAsyncProducer { * @param exchange the exchange to add to the queue * @param copy whether to create a copy of the exchange to use for adding to the queue */ - protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException { + protected void addToQueue(Exchange exchange, boolean copy) throws SedaConsumerNotAvailableException, IOException { BlockingQueue<Exchange> queue = null; QueueReference queueReference = endpoint.getQueueReference(); if (queueReference != null) { @@ -212,6 +214,13 @@ public class SedaProducer extends DefaultAsyncProducer { // handover the completion so its the copy which performs that, as we do not wait if (copy) { target = prepareCopy(exchange, true); + // if the body is stream caching based we need to make a deep copy + if (target.getMessage().getBody() instanceof StreamCache sc) { + StreamCache newBody = sc.copy(target); + if (newBody != null) { + target.getMessage().setBody(newBody); + } + } } LOG.trace("Adding Exchange to queue: {}", target); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 755724ad85c..a0222c09a97 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -247,22 +247,29 @@ public class WireTapProcessor extends AsyncProcessorSupport return answer; } - private Exchange configureCopyExchange(Exchange exchange) { + private Exchange configureCopyExchange(Exchange exchange) throws 
IOException { // must use a copy as we dont want it to cause side effects of the original exchange - Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); + Exchange target = processorExchangeFactory.createCorrelatedCopy(exchange, false); // should not be correlated, but we needed to copy without handover - copy.removeProperty(ExchangePropertyKey.CORRELATION_ID); + target.removeProperty(ExchangePropertyKey.CORRELATION_ID); // set MEP to InOnly as this wire tap is a fire and forget - copy.setPattern(ExchangePattern.InOnly); + target.setPattern(ExchangePattern.InOnly); // move OUT to IN if needed - if (copy.hasOut()) { - copy.setIn(copy.getOut()); - copy.setOut(null); + if (target.hasOut()) { + target.setIn(target.getOut()); + target.setOut(null); } // remove STREAM_CACHE_UNIT_OF_WORK property because this wire tap will // close its own created stream cache(s) - copy.removeProperty(ExchangePropertyKey.STREAM_CACHE_UNIT_OF_WORK); - return copy; + target.removeProperty(ExchangePropertyKey.STREAM_CACHE_UNIT_OF_WORK); + // if the body is stream caching based we need to make a deep copy + if (target.getMessage().getBody() instanceof StreamCache sc) { + StreamCache newBody = sc.copy(target); + if (newBody != null) { + target.getMessage().setBody(newBody); + } + } + return target; } private Exchange configureNewExchange(Exchange exchange) { diff --git a/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java b/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java index 9f0de97dac8..ad29dad2a23 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java @@ -180,7 +180,7 @@ public final class SensitiveUtils { + "|\\Qverificationcode\\E" + "|\\Qwebhookverifytoken\\E" + "|\\Qzookeeperpassword\\E" - // SENSITIVE-PATTERN: END + // SENSITIVE-PATTERN: END ; private SensitiveUtils() {