This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8db83a9b2250a817df69fb7c43adf19f4c2cea77 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 5 09:38:46 2021 +0200 CAMEL-16450: camel-core - Optimize WireTap only use dynamic send processor when being dynamic. --- .../org/apache/camel/processor/SendProcessor.java | 2 +- .../apache/camel/processor/WireTapProcessor.java | 27 ++++++++++++----- .../java/org/apache/camel/reifier/SendReifier.java | 2 +- .../org/apache/camel/reifier/WireTapReifier.java | 35 ++++++++++++++++++---- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java index e76b002..f156ddd 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -70,10 +70,10 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E ObjectHelper.notNull(destination, "destination"); this.destination = destination; this.camelContext = (ExtendedCamelContext) destination.getCamelContext(); + ObjectHelper.notNull(this.camelContext, "camelContext"); this.pattern = pattern; this.destinationExchangePattern = null; this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri()); - ObjectHelper.notNull(this.camelContext, "camelContext"); } @Override diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 00111ba..e72526d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -61,7 +61,7 @@ public class WireTapProcessor extends AsyncProcessorSupport private String id; private String routeId; private CamelContext camelContext; - private final SendDynamicProcessor dynamicProcessor; + private final SendDynamicProcessor dynamicSendProcessor; // is only used for reporting statistics private final String uri; private final boolean dynamicUri; private final Processor processor; @@ -78,10 +78,11 @@ public class WireTapProcessor extends AsyncProcessorSupport private boolean copy; private Processor onPrepare; - public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern, + public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri, + ExchangePattern exchangePattern, ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) { - this.dynamicProcessor = dynamicProcessor; - this.uri = dynamicProcessor.getUri(); + this.dynamicSendProcessor = dynamicSendProcessor; + this.uri = uri; this.processor = processor; this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor); this.exchangePattern = exchangePattern; @@ -148,7 +149,11 @@ public class WireTapProcessor extends AsyncProcessorSupport } public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { - return dynamicProcessor.getEndpointUtilizationStatistics(); + if (dynamicSendProcessor != null) { + return dynamicSendProcessor.getEndpointUtilizationStatistics(); + } else { + return null; + } } @Override @@ -305,11 +310,19 @@ public class WireTapProcessor extends AsyncProcessorSupport } public int getCacheSize() { - return dynamicProcessor.getCacheSize(); + if (dynamicSendProcessor != null) { + return dynamicSendProcessor.getCacheSize(); + } else { + return 0; + } } public boolean isIgnoreInvalidEndpoint() { - return dynamicProcessor.isIgnoreInvalidEndpoint(); + if (dynamicSendProcessor != null) { + return dynamicSendProcessor.isIgnoreInvalidEndpoint(); + } else { + return false; + } } public boolean isDynamicUri() { diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java index c8cd1e0..f5a3051 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java @@ -40,7 +40,7 @@ public class SendReifier extends ProcessorReifier<SendDefinition<?>> { public Endpoint resolveEndpoint() { if (definition.getEndpoint() == null) { if (definition.getEndpointProducerBuilder() == null) { - return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), (String) null); + return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), null); } else { return definition.getEndpointProducerBuilder().resolve(camelContext); } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 79c1d55..3d0c214 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -19,6 +19,7 @@ package org.apache.camel.reifier; import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncProcessor; +import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.ExtendedCamelContext; @@ -28,7 +29,11 @@ import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.SetHeaderDefinition; import org.apache.camel.model.WireTapDefinition; import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.WireTapProcessor; +import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.support.LanguageSupport; +import org.apache.camel.util.StringHelper; public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { @@ -45,11 +50,30 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { // must use InOnly for WireTap definition.setPattern(ExchangePattern.InOnly.name()); - // create the send dynamic producer to send to the wire tapped endpoint - SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor(); + // optimize to only use dynamic processor if really needed + String uri; + if (definition.getEndpointProducerBuilder() != null) { + uri = definition.getEndpointProducerBuilder().getUri(); + } else { + uri = StringHelper.notEmpty(definition.getUri(), "uri", this); + } + + SendDynamicProcessor dynamicSendProcessor = null; + SendProcessor sendProcessor = null; + boolean simple = LanguageSupport.hasSimpleFunction(definition.getUri()); + boolean dynamic = parseBoolean(definition.getDynamicUri(), true); + if (dynamic && simple) { + // dynamic so we need the dynamic send processor + dynamicSendProcessor = (SendDynamicProcessor) super.createProcessor(); + } else { + // static so we can use a plain send processor + Endpoint endpoint = CamelContextHelper.resolveEndpoint(camelContext, uri, null); + sendProcessor = new SendProcessor(endpoint); + } // create error handler we need to use for processing the wire tapped - Processor childProcessor = wrapInErrorHandler(dynamicTo); + Processor producer = dynamicSendProcessor != null ? dynamicSendProcessor : sendProcessor; + Processor childProcessor = wrapInErrorHandler(producer); // and wrap in unit of work AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory() @@ -59,10 +83,9 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { boolean isCopy = parseBoolean(definition.getCopy(), true); WireTapProcessor answer = new WireTapProcessor( - dynamicTo, target, + dynamicSendProcessor, target, uri, parse(ExchangePattern.class, definition.getPattern()), - threadPool, shutdownThreadPool, - parseBoolean(definition.getDynamicUri(), true)); + threadPool, shutdownThreadPool, dynamic); answer.setCopy(isCopy); Processor newExchangeProcessor = definition.getNewExchangeProcessor(); String ref = parseString(definition.getNewExchangeProcessorRef());