This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a861f8eeef6 CAMEL-18858: camel-kamelet - Create a copy of exchange when kamelet is acting as source, so the exchange is faked to be created directly by the consumer itself, so it originate from the user route, and make the kamelet as it was just like any other regular Camel component. (#13310) a861f8eeef6 is described below commit a861f8eeef69cf5b144feef19982108e43b5a5bf Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 27 06:35:49 2024 +0100 CAMEL-18858: camel-kamelet - Create a copy of exchange when kamelet is acting as source, so the exchange is faked to be created directly by the consumer itself, so it originate from the user route, and make the kamelet as it was just like any other regular Camel component. (#13310) --- .../camel/component/kamelet/KameletProducer.java | 45 +++++++++++++--------- .../camel/component/kamelet/KameletReifier.java | 7 +++- .../org/apache/camel/support/ExchangeHelper.java | 1 + 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java index 1e2d2960647..68f6a734f77 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java @@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Route; +import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -40,6 +41,7 @@ final class KameletProducer extends DefaultAsyncProducer implements RouteIdAware private final long timeout; private final boolean sink; private String routeId; + boolean registerKamelets; public KameletProducer(KameletEndpoint endpoint, String key) { super(endpoint); @@ -51,23 +53,6 @@ final class KameletProducer extends DefaultAsyncProducer implements RouteIdAware this.sink = getEndpoint().getEndpointKey().startsWith("kamelet://sink"); } - @Override - public void process(Exchange exchange) throws Exception { - if (consumer == null || stateCounter != component.getStateCounter()) { - stateCounter = component.getStateCounter(); - consumer = component.getConsumer(key, block, timeout); - } - if (consumer == null) { - if (endpoint.isFailIfNoConsumers()) { - throw new KameletConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); - } else { - LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint); - } - } else { - consumer.getProcessor().process(exchange); - } - } - @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { @@ -111,8 +96,22 @@ final class KameletProducer extends DefaultAsyncProducer implements RouteIdAware } } } - // kamelet producer that calls its kamelet consumer to process the incoming exchange - return consumer.getAsyncProcessor().process(exchange, callback); + if (registerKamelets) { + // kamelets are first-class registered as route (as old behavior) + return consumer.getAsyncProcessor().process(exchange, callback); + } else { + // kamelet producer that calls its kamelet consumer to process the incoming exchange + // create exchange copy to let a new lifecycle originate from the calling route (not the kamelet route) + final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true, true); + // fake copy as being created by the consumer + copy.getExchangeExtension().setFromEndpoint(consumer.getEndpoint()); + copy.getExchangeExtension().setFromRouteId(consumer.getRouteId()); + return consumer.getAsyncProcessor().process(copy, doneSync -> { + // copy result back after processing is done + ExchangeHelper.copyResults(exchange, copy); + callback.done(doneSync); + }); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -140,4 +139,12 @@ final class KameletProducer extends DefaultAsyncProducer implements RouteIdAware return key; } + @Override + protected void doInit() throws Exception { + ManagementStrategy ms = getEndpoint().getCamelContext().getManagementStrategy(); + if (ms != null && ms.getManagementAgent() != null) { + registerKamelets = ms.getManagementAgent().getRegisterRoutesCreateByKamelet(); + } + } + } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java index 6218ca99170..0252e6f92d8 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java @@ -20,6 +20,7 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.KameletDefinition; import org.apache.camel.reifier.ProcessorReifier; +import org.apache.camel.support.PluginHelper; public class KameletReifier extends ProcessorReifier<KameletDefinition> { @@ -34,6 +35,10 @@ public class KameletReifier extends ProcessorReifier<KameletDefinition> { // use an empty noop processor, as there should be a single processor processor = new NoopProcessor(); } - return new KameletProcessor(camelContext, parseString(definition.getName()), processor); + // wrap in uow + Processor target = new KameletProcessor(camelContext, parseString(definition.getName()), processor); + target = PluginHelper.getInternalProcessorFactory(camelContext) + .addUnitOfWorkProcessorAdvice(camelContext, target, null); + return target; } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index a74eb830b2f..2cdf7071450 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -374,6 +374,7 @@ public final class ExchangeHelper { resultExtension.setNotifyEvent(sourceExtension.isNotifyEvent()); resultExtension.setRedeliveryExhausted(sourceExtension.isRedeliveryExhausted()); resultExtension.setErrorHandlerHandled(sourceExtension.getErrorHandlerHandled()); + resultExtension.setFailureHandled(sourceExtension.isFailureHandled()); result.setException(source.getException()); }