This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch sc-copy
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3e347b59c0d8da2101267680c724d0d23b1cc2d0
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Jun 13 10:17:37 2024 +0200

    20866: seda/distruptor and wiretap in copy mode should make deep copy of 
stream cached body so its thread-safe
---
 .../component/disruptor/DisruptorProducer.java     | 135 +++++++++++----------
 .../apache/camel/component/seda/SedaProducer.java  |  15 ++-
 .../apache/camel/processor/WireTapProcessor.java   |  25 ++--
 .../java/org/apache/camel/util/SensitiveUtils.java |   2 +-
 4 files changed, 103 insertions(+), 74 deletions(-)

diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
index 89348a8ed82..f55907945b2 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.disruptor;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -23,6 +24,7 @@ import com.lmax.disruptor.InsufficientCapacityException;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.StreamCache;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
@@ -39,12 +41,10 @@ public class DisruptorProducer extends DefaultAsyncProducer 
{
 
     private final WaitForTaskToComplete waitForTaskToComplete;
     private final long timeout;
-
     private final DisruptorEndpoint endpoint;
-    private boolean blockWhenFull;
+    private final boolean blockWhenFull;
 
-    public DisruptorProducer(
-                             final DisruptorEndpoint endpoint,
+    public DisruptorProducer(final DisruptorEndpoint endpoint,
                              final WaitForTaskToComplete waitForTaskToComplete,
                              final long timeout, boolean blockWhenFull) {
         super(endpoint);
@@ -76,67 +76,71 @@ public class DisruptorProducer extends DefaultAsyncProducer 
{
             wait = exchange.getProperty(Exchange.ASYNC_WAIT, 
WaitForTaskToComplete.class);
         }
 
-        if (wait == WaitForTaskToComplete.Always
-                || wait == WaitForTaskToComplete.IfReplyExpected && 
ExchangeHelper.isOutCapable(exchange)) {
+        try {
+            if (wait == WaitForTaskToComplete.Always
+                    || wait == WaitForTaskToComplete.IfReplyExpected && 
ExchangeHelper.isOutCapable(exchange)) {
 
-            // do not handover the completion as we wait for the copy to 
complete, and copy its result back when it done
-            final Exchange copy = prepareCopy(exchange, false);
+                // do not handover the completion as we wait for the copy to 
complete, and copy its result back when it done
+                final Exchange copy = prepareCopy(exchange, false);
 
-            // latch that waits until we are complete
-            final CountDownLatch latch = new CountDownLatch(1);
+                // latch that waits until we are complete
+                final CountDownLatch latch = new CountDownLatch(1);
 
-            // we should wait for the reply so install a on completion so we 
know when its complete
-            
copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch));
+                // we should wait for the reply so install a on completion so 
we know when its complete
+                
copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch));
 
-            doPublish(copy);
+                doPublish(copy);
 
-            if (timeout > 0) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Waiting for task to complete using timeout 
(ms): {} at [{}]", timeout,
-                            endpoint.getEndpointUri());
-                }
-                // lets see if we can get the task done before the timeout
-                boolean done = false;
-                try {
-                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    LOG.info("Interrupted while waiting for the task to 
complete");
-                    Thread.currentThread().interrupt();
-                }
-                if (!done) {
-                    // Remove timed out Exchange from disruptor endpoint.
-
-                    // We can't actually remove a published exchange from an 
active Disruptor.
-                    // Instead we prevent processing of the exchange by 
setting a Property on the exchange and the value
-                    // would be an AtomicBoolean. This is set by the Producer 
and the Consumer would look up that Property and
-                    // check the AtomicBoolean. If the AtomicBoolean says that 
we are good to proceed, it will process the
-                    // exchange. If false, it will simply disregard the 
exchange.
-                    // But since the Property map is a Concurrent one, maybe 
we don't need the AtomicBoolean. Check with Simon.
-                    // Also check the TimeoutHandler of the new Disruptor 
3.0.0, consider making the switch to the latest version.
-                    
exchange.setProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, true);
-
-                    exchange.setException(new 
ExchangeTimedOutException(exchange, timeout));
-
-                    // count down to indicate timeout
-                    latch.countDown();
+                if (timeout > 0) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Waiting for task to complete using timeout 
(ms): {} at [{}]", timeout,
+                                endpoint.getEndpointUri());
+                    }
+                    // lets see if we can get the task done before the timeout
+                    boolean done = false;
+                    try {
+                        done = latch.await(timeout, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted while waiting for the task to 
complete");
+                        Thread.currentThread().interrupt();
+                    }
+                    if (!done) {
+                        // Remove timed out Exchange from disruptor endpoint.
+
+                        // We can't actually remove a published exchange from 
an active Disruptor.
+                        // Instead we prevent processing of the exchange by 
setting a Property on the exchange and the value
+                        // would be an AtomicBoolean. This is set by the 
Producer and the Consumer would look up that Property and
+                        // check the AtomicBoolean. If the AtomicBoolean says 
that we are good to proceed, it will process the
+                        // exchange. If false, it will simply disregard the 
exchange.
+                        // But since the Property map is a Concurrent one, 
maybe we don't need the AtomicBoolean. Check with Simon.
+                        // Also check the TimeoutHandler of the new Disruptor 
3.0.0, consider making the switch to the latest version.
+                        
exchange.setProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, true);
+
+                        exchange.setException(new 
ExchangeTimedOutException(exchange, timeout));
+
+                        // count down to indicate timeout
+                        latch.countDown();
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Waiting for task to complete (blocking) at 
[{}]", endpoint.getEndpointUri());
+                    }
+                    // no timeout then wait until its done
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted while waiting for the task to 
complete");
+                        Thread.currentThread().interrupt();
+                    }
                 }
             } else {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Waiting for task to complete (blocking) at 
[{}]", endpoint.getEndpointUri());
-                }
-                // no timeout then wait until its done
-                try {
-                    latch.await();
-                } catch (InterruptedException e) {
-                    LOG.info("Interrupted while waiting for the task to 
complete");
-                    Thread.currentThread().interrupt();
-                }
+                // no wait, eg its a InOnly then just publish to the 
ringbuffer and return
+                // handover the completion so its the copy which performs 
that, as we do not wait
+                final Exchange copy = prepareCopy(exchange, true);
+                doPublish(copy);
             }
-        } else {
-            // no wait, eg its a InOnly then just publish to the ringbuffer 
and return
-            // handover the completion so its the copy which performs that, as 
we do not wait
-            final Exchange copy = prepareCopy(exchange, true);
-            doPublish(copy);
+        } catch (Exception e) {
+            exchange.setException(e);
         }
 
         // we use OnCompletion on the Exchange to callback and wait for the 
Exchange to be done
@@ -197,11 +201,20 @@ public class DisruptorProducer extends 
DefaultAsyncProducer {
         }
     }
 
-    private Exchange prepareCopy(final Exchange exchange, final boolean 
handover) {
+    private Exchange prepareCopy(final Exchange exchange, final boolean copy) 
throws IOException {
         // use a new copy of the exchange to route async
-        final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
handover);
+        final Exchange target = ExchangeHelper.createCorrelatedCopy(exchange, 
copy);
         // set a new from endpoint to be the disruptor
-        copy.getExchangeExtension().setFromEndpoint(endpoint);
-        return copy;
+        target.getExchangeExtension().setFromEndpoint(endpoint);
+        if (copy) {
+            // if the body is stream caching based we need to make a deep copy
+            if (target.getMessage().getBody() instanceof StreamCache sc) {
+                StreamCache newBody = sc.copy(target);
+                if (newBody != null) {
+                    target.getMessage().setBody(newBody);
+                }
+            }
+        }
+        return target;
     }
 }
diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index e72b9edd943..77bd10b92c9 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.seda;
 
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.StreamCache;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
@@ -106,7 +108,7 @@ public class SedaProducer extends DefaultAsyncProducer {
             try {
                 // do not copy as we already did the copy
                 addToQueue(copy, false);
-            } catch (SedaConsumerNotAvailableException e) {
+            } catch (SedaConsumerNotAvailableException | IOException e) {
                 exchange.setException(e);
                 callback.done(true);
                 return true;
@@ -146,7 +148,7 @@ public class SedaProducer extends DefaultAsyncProducer {
             // no wait, eg its a InOnly then just add to queue and return
             try {
                 addToQueue(exchange, true);
-            } catch (SedaConsumerNotAvailableException e) {
+            } catch (SedaConsumerNotAvailableException | IOException e) {
                 exchange.setException(e);
                 callback.done(true);
                 return true;
@@ -187,7 +189,7 @@ public class SedaProducer extends DefaultAsyncProducer {
      * @param exchange the exchange to add to the queue
      * @param copy     whether to create a copy of the exchange to use for 
adding to the queue
      */
-    protected void addToQueue(Exchange exchange, boolean copy) throws 
SedaConsumerNotAvailableException {
+    protected void addToQueue(Exchange exchange, boolean copy) throws 
SedaConsumerNotAvailableException, IOException {
         BlockingQueue<Exchange> queue = null;
         QueueReference queueReference = endpoint.getQueueReference();
         if (queueReference != null) {
@@ -212,6 +214,13 @@ public class SedaProducer extends DefaultAsyncProducer {
         // handover the completion so its the copy which performs that, as we 
do not wait
         if (copy) {
             target = prepareCopy(exchange, true);
+            // if the body is stream caching based we need to make a deep copy
+            if (target.getMessage().getBody() instanceof StreamCache sc) {
+                StreamCache newBody = sc.copy(target);
+                if (newBody != null) {
+                    target.getMessage().setBody(newBody);
+                }
+            }
         }
 
         LOG.trace("Adding Exchange to queue: {}", target);
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 755724ad85c..a0222c09a97 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -247,22 +247,29 @@ public class WireTapProcessor extends 
AsyncProcessorSupport
         return answer;
     }
 
-    private Exchange configureCopyExchange(Exchange exchange) {
+    private Exchange configureCopyExchange(Exchange exchange) throws 
IOException {
         // must use a copy as we dont want it to cause side effects of the 
original exchange
-        Exchange copy = 
processorExchangeFactory.createCorrelatedCopy(exchange, false);
+        Exchange target = 
processorExchangeFactory.createCorrelatedCopy(exchange, false);
         // should not be correlated, but we needed to copy without handover
-        copy.removeProperty(ExchangePropertyKey.CORRELATION_ID);
+        target.removeProperty(ExchangePropertyKey.CORRELATION_ID);
         // set MEP to InOnly as this wire tap is a fire and forget
-        copy.setPattern(ExchangePattern.InOnly);
+        target.setPattern(ExchangePattern.InOnly);
         // move OUT to IN if needed
-        if (copy.hasOut()) {
-            copy.setIn(copy.getOut());
-            copy.setOut(null);
+        if (target.hasOut()) {
+            target.setIn(target.getOut());
+            target.setOut(null);
         }
         // remove STREAM_CACHE_UNIT_OF_WORK property because this wire tap will
         // close its own created stream cache(s)
-        copy.removeProperty(ExchangePropertyKey.STREAM_CACHE_UNIT_OF_WORK);
-        return copy;
+        target.removeProperty(ExchangePropertyKey.STREAM_CACHE_UNIT_OF_WORK);
+        // if the body is stream caching based we need to make a deep copy
+        if (target.getMessage().getBody() instanceof StreamCache sc) {
+            StreamCache newBody = sc.copy(target);
+            if (newBody != null) {
+                target.getMessage().setBody(newBody);
+            }
+        }
+        return target;
     }
 
     private Exchange configureNewExchange(Exchange exchange) {
diff --git 
a/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java 
b/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java
index 9f0de97dac8..ad29dad2a23 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/SensitiveUtils.java
@@ -180,7 +180,7 @@ public final class SensitiveUtils {
                                                     + "|\\Qverificationcode\\E"
                                                     + 
"|\\Qwebhookverifytoken\\E"
                                                     + 
"|\\Qzookeeperpassword\\E"
-                                                    // SENSITIVE-PATTERN: END
+    // SENSITIVE-PATTERN: END
     ;
 
     private SensitiveUtils() {

Reply via email to