This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a861f8eeef6 CAMEL-18858: camel-kamelet - Create a copy of the exchange 
when the kamelet is acting as source, so the exchange is faked as being created 
directly by the consumer itself and therefore originates from the user route, 
making the kamelet behave just like any other regular Camel component. (#13310)
a861f8eeef6 is described below

commit a861f8eeef69cf5b144feef19982108e43b5a5bf
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 27 06:35:49 2024 +0100

    CAMEL-18858: camel-kamelet - Create a copy of the exchange when the kamelet 
is acting as source, so the exchange is faked as being created directly by the 
consumer itself and therefore originates from the user route, making the 
kamelet behave just like any other regular Camel component. (#13310)
---
 .../camel/component/kamelet/KameletProducer.java   | 45 +++++++++++++---------
 .../camel/component/kamelet/KameletReifier.java    |  7 +++-
 .../org/apache/camel/support/ExchangeHelper.java   |  1 +
 3 files changed, 33 insertions(+), 20 deletions(-)

diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 1e2d2960647..68f6a734f77 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
+import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
@@ -40,6 +41,7 @@ final class KameletProducer extends DefaultAsyncProducer 
implements RouteIdAware
     private final long timeout;
     private final boolean sink;
     private String routeId;
+    boolean registerKamelets;
 
     public KameletProducer(KameletEndpoint endpoint, String key) {
         super(endpoint);
@@ -51,23 +53,6 @@ final class KameletProducer extends DefaultAsyncProducer 
implements RouteIdAware
         this.sink = 
getEndpoint().getEndpointKey().startsWith("kamelet://sink");
     }
 
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        if (consumer == null || stateCounter != component.getStateCounter()) {
-            stateCounter = component.getStateCounter();
-            consumer = component.getConsumer(key, block, timeout);
-        }
-        if (consumer == null) {
-            if (endpoint.isFailIfNoConsumers()) {
-                throw new KameletConsumerNotAvailableException("No consumers 
available on endpoint: " + endpoint, exchange);
-            } else {
-                LOG.debug("message ignored, no consumers available on 
endpoint: {}", endpoint);
-            }
-        } else {
-            consumer.getProcessor().process(exchange);
-        }
-    }
-
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
@@ -111,8 +96,22 @@ final class KameletProducer extends DefaultAsyncProducer 
implements RouteIdAware
                         }
                     }
                 }
-                // kamelet producer that calls its kamelet consumer to process 
the incoming exchange
-                return consumer.getAsyncProcessor().process(exchange, 
callback);
+                if (registerKamelets) {
+                    // kamelets are first-class registered as route (as old 
behavior)
+                    return consumer.getAsyncProcessor().process(exchange, 
callback);
+                } else {
+                    // kamelet producer that calls its kamelet consumer to 
process the incoming exchange
+                    // create exchange copy to let a new lifecycle originate 
from the calling route (not the kamelet route)
+                    final Exchange copy = 
ExchangeHelper.createCorrelatedCopy(exchange, true, true);
+                    // fake copy as being created by the consumer
+                    
copy.getExchangeExtension().setFromEndpoint(consumer.getEndpoint());
+                    
copy.getExchangeExtension().setFromRouteId(consumer.getRouteId());
+                    return consumer.getAsyncProcessor().process(copy, doneSync 
-> {
+                        // copy result back after processing is done
+                        ExchangeHelper.copyResults(exchange, copy);
+                        callback.done(doneSync);
+                    });
+                }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -140,4 +139,12 @@ final class KameletProducer extends DefaultAsyncProducer 
implements RouteIdAware
         return key;
     }
 
+    @Override
+    protected void doInit() throws Exception {
+        ManagementStrategy ms = 
getEndpoint().getCamelContext().getManagementStrategy();
+        if (ms != null && ms.getManagementAgent() != null) {
+            registerKamelets = 
ms.getManagementAgent().getRegisterRoutesCreateByKamelet();
+        }
+    }
+
 }
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
index 6218ca99170..0252e6f92d8 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
@@ -20,6 +20,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.KameletDefinition;
 import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.support.PluginHelper;
 
 public class KameletReifier extends ProcessorReifier<KameletDefinition> {
 
@@ -34,6 +35,10 @@ public class KameletReifier extends 
ProcessorReifier<KameletDefinition> {
             // use an empty noop processor, as there should be a single 
processor
             processor = new NoopProcessor();
         }
-        return new KameletProcessor(camelContext, 
parseString(definition.getName()), processor);
+        // wrap in uow
+        Processor target = new KameletProcessor(camelContext, 
parseString(definition.getName()), processor);
+        target = PluginHelper.getInternalProcessorFactory(camelContext)
+                .addUnitOfWorkProcessorAdvice(camelContext, target, null);
+        return target;
     }
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java 
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index a74eb830b2f..2cdf7071450 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -374,6 +374,7 @@ public final class ExchangeHelper {
         resultExtension.setNotifyEvent(sourceExtension.isNotifyEvent());
         
resultExtension.setRedeliveryExhausted(sourceExtension.isRedeliveryExhausted());
         
resultExtension.setErrorHandlerHandled(sourceExtension.getErrorHandlerHandled());
+        resultExtension.setFailureHandled(sourceExtension.isFailureHandled());
 
         result.setException(source.getException());
     }

Reply via email to