This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 83fa854 CAMEL-14644: camel-core - Optimize dynamic EIPs to only normalize uri once 83fa854 is described below commit 83fa854f634b99d9e15e4ba39571f4ce63e5cee1 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Mar 2 14:20:27 2020 +0100 CAMEL-14644: camel-core - Optimize dynamic EIPs to only normalize uri once --- .../main/java/org/apache/camel/CamelContext.java | 8 ++- .../org/apache/camel/ExtendedCamelContext.java | 45 +++++++++++++++ .../apache/camel/spi/NormalizedEndpointUri.java} | 29 +++------- .../camel/impl/engine/AbstractCamelContext.java | 57 +++++++++++++++--- .../org/apache/camel/impl/engine/EndpointKey.java | 8 ++- .../java/org/apache/camel/processor/Enricher.java | 51 ++++++++++------ .../org/apache/camel/processor/PollEnricher.java | 51 ++++++++++------ .../camel/processor/RecipientListProcessor.java | 64 ++++++++++++++------- .../org/apache/camel/processor/RoutingSlip.java | 42 +++++++++++--- .../camel/processor/SendDynamicProcessor.java | 67 +++++++++++++--------- .../org/apache/camel/processor/SendProcessor.java | 8 +-- .../camel/impl/engine/DefaultCamelContextTest.java | 4 +- .../apache/camel/support/CamelContextHelper.java | 31 ++++++++++ .../org/apache/camel/support/EndpointHelper.java | 19 +++--- .../org/apache/camel/support/ExchangeHelper.java | 7 +++ .../org/apache/camel/support/NormalizedUri.java} | 29 ++++------ 16 files changed, 359 insertions(+), 161 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index 62e5c50..cc63828 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -16,7 +16,11 @@ */ package org.apache.camel; -import java.util.*; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.camel.spi.CamelContextNameStrategy; import org.apache.camel.spi.ClassResolver; @@ -434,8 +438,6 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration { * * @param uri the URI of the endpoint * @return the endpoint - * - * @see #getPrototypeEndpoint(String) */ Endpoint getEndpoint(String uri); diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 970d466..60c5458 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -46,6 +46,7 @@ import org.apache.camel.spi.ManagementMBeanAssembler; import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.ModelToXMLDumper; import org.apache.camel.spi.NodeIdFactory; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; import org.apache.camel.spi.ProcessorFactory; @@ -132,6 +133,50 @@ public interface ExtendedCamelContext extends CamelContext { Endpoint getPrototypeEndpoint(String uri); /** + * Resolves the given name to an {@link Endpoint} of the specified type (scope is prototype). + * If the name has a singleton endpoint registered, then the singleton is returned. + * Otherwise, a new {@link Endpoint} is created. + * + * The endpoint is NOT registered in the {@link org.apache.camel.spi.EndpointRegistry} as its prototype scoped, + * and therefore expected to be short lived and discarded after use (you must stop and shutdown the + * endpoint when no longer in use). + * + * @param uri the URI of the endpoint + * @return the endpoint + * + * @see #getEndpoint(String) + */ + Endpoint getPrototypeEndpoint(NormalizedEndpointUri uri); + + /** + * Is the given endpoint already registered in the {@link org.apache.camel.spi.EndpointRegistry} + * + * @param uri the URI of the endpoint + * @return the registered endpoint or <tt>null</tt> if not registered + */ + Endpoint hasEndpoint(NormalizedEndpointUri uri); + + /** + * Resolves the given name to an {@link Endpoint} of the specified type. + * If the name has a singleton endpoint registered, then the singleton is returned. + * Otherwise, a new {@link Endpoint} is created and registered in the {@link org.apache.camel.spi.EndpointRegistry}. + * + * @param uri the URI of the endpoint + * @return the endpoint + * + * @see #getPrototypeEndpoint(String) + */ + Endpoint getEndpoint(NormalizedEndpointUri uri); + + /** + * Normalizes the given uri. + * + * @param uri the uri + * @return a normalized uri + */ + NormalizedEndpointUri normalizeUri(String uri); + + /** * Returns the order in which the route inputs was started. * <p/> * The order may not be according to the startupOrder defined on the route. diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java b/core/camel-api/src/main/java/org/apache/camel/spi/NormalizedEndpointUri.java similarity index 55% copy from core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java copy to core/camel-api/src/main/java/org/apache/camel/spi/NormalizedEndpointUri.java index 8981d44..99cae35 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/NormalizedEndpointUri.java @@ -14,32 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl.engine; - -import org.apache.camel.ValueHolder; -import org.apache.camel.util.StringHelper; +package org.apache.camel.spi; /** - * Key used in {@link DefaultEndpointRegistry} in {@link AbstractCamelContext}, - * to ensure a consistent lookup. + * An Uri which has been normalized. + * <p/> + * This is intended for internal optimizations or third party EIP or component implementations + * that can be optimized to pre normalize endpoints under certain use-cases. */ -public final class EndpointKey extends ValueHolder<String> { - - public EndpointKey(String uri) { - this(uri, false); - } +public interface NormalizedEndpointUri { /** - * Optimized when the uri is already normalized. + * Gets the normalized uri */ - public EndpointKey(String uri, boolean normalized) { - super(normalized ? uri : AbstractCamelContext.normalizeEndpointUri(uri)); - StringHelper.notEmpty(uri, "uri"); - } - - @Override - public String toString() { - return get(); - } + String getUri(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index bd95fb1..1094d8f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -119,6 +119,7 @@ import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.ModelToXMLDumper; import org.apache.camel.spi.NodeIdFactory; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; import org.apache.camel.spi.ProcessorFactory; @@ -149,6 +150,7 @@ import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.EndpointHelper; import org.apache.camel.support.EventHelper; import org.apache.camel.support.LRUCacheFactory; +import org.apache.camel.support.NormalizedUri; import org.apache.camel.support.OrderedComparator; import org.apache.camel.support.ProcessorEndpoint; import org.apache.camel.support.jsse.SSLContextParameters; @@ -682,6 +684,20 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext } @Override + public Endpoint hasEndpoint(NormalizedEndpointUri uri) { + if (endpoints.isEmpty()) { + return null; + } + EndpointKey key; + if (uri instanceof EndpointKey) { + key = (EndpointKey) uri; + } else { + key = new EndpointKey(uri.getUri(), true); + } + return endpoints.get(key); + } + + @Override public Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception { Endpoint oldEndpoint; @@ -736,16 +752,37 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext } @Override + public NormalizedEndpointUri normalizeUri(String uri) { + try { + uri = resolvePropertyPlaceholders(uri); + uri = normalizeEndpointUri(uri); + return new NormalizedUri(uri); + } catch (Exception e) { + throw new ResolveEndpointFailedException(uri, e); + } + } + + @Override public Endpoint getEndpoint(String uri) { - return doGetEndpoint(uri, false); + return doGetEndpoint(uri, false, false); + } + + @Override + public Endpoint getEndpoint(NormalizedEndpointUri uri) { + return doGetEndpoint(uri.getUri(), true, false); } @Override public Endpoint getPrototypeEndpoint(String uri) { - return doGetEndpoint(uri, true); + return doGetEndpoint(uri, false, true); + } + + @Override + public Endpoint getPrototypeEndpoint(NormalizedEndpointUri uri) { + return doGetEndpoint(uri.getUri(), true, true); } - protected Endpoint doGetEndpoint(String uri, boolean prototype) { + protected Endpoint doGetEndpoint(String uri, boolean normalized, boolean prototype) { // ensure CamelContext are initialized before we can get an endpoint init(); @@ -755,17 +792,21 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext // in case path has property placeholders then try to let property // component resolve those - try { - uri = resolvePropertyPlaceholders(uri); - } catch (Exception e) { - throw new ResolveEndpointFailedException(uri, e); + if (!normalized) { + try { + uri = resolvePropertyPlaceholders(uri); + } catch (Exception e) { + throw new ResolveEndpointFailedException(uri, e); + } } final String rawUri = uri; // normalize uri so we can do endpoint hits with minor mistakes and // parameters is not in the same order - uri = normalizeEndpointUri(uri); + if (!normalized) { + uri = normalizeEndpointUri(uri); + } LOG.trace("Getting endpoint with raw uri: {}, normalized uri: {}", rawUri, uri); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java index 8981d44..46be51c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java @@ -17,13 +17,14 @@ package org.apache.camel.impl.engine; import org.apache.camel.ValueHolder; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.util.StringHelper; /** * Key used in {@link DefaultEndpointRegistry} in {@link AbstractCamelContext}, * to ensure a consistent lookup. */ -public final class EndpointKey extends ValueHolder<String> { +public final class EndpointKey extends ValueHolder<String> implements NormalizedEndpointUri { public EndpointKey(String uri) { this(uri, false); @@ -38,6 +39,11 @@ public final class EndpointKey extends ValueHolder<String> { } @Override + public String getUri() { + return get(); + } + + @Override public String toString() { return get(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java index d61550c..d29a42b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java @@ -27,12 +27,14 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -180,6 +182,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA boolean prototype = cacheSize < 0; try { recipient = expression.evaluate(exchange, Object.class); + recipient = prepareRecipient(exchange, recipient); Endpoint existing = getExistingEndpoint(exchange, recipient); if (existing == null) { endpoint = resolveEndpoint(exchange, recipient, prototype); @@ -329,37 +332,47 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA return true; } - protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof Endpoint) { - return (Endpoint) recipient; + protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) { + return recipient; } else if (recipient instanceof String) { + // trim strings as end users might have added spaces between separators recipient = ((String) recipient).trim(); } if (recipient != null) { - // convert to a string type we can work with - String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - return exchange.getContext().hasEndpoint(uri); + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + String uri; + if (recipient instanceof String) { + uri = (String) recipient; + } else { + // convert to a string type we can work with + uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + // optimize and normalize endpoint + return ecc.normalizeUri(uri); } return null; } - protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof String) { - recipient = ((String) recipient).trim(); - } else if (recipient instanceof Endpoint) { + protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { + if (recipient instanceof Endpoint) { return (Endpoint) recipient; - } else if (recipient != null) { - // convert to a string type we can work with - recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); } - if (recipient != null) { - return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); - } else { - return null; + if (recipient instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient; + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + return ecc.hasEndpoint(nu); + } else { + String uri = recipient.toString(); + return exchange.getContext().hasEndpoint(uri); + } } + return null; + } + + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { + return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java index de1fea9..a608b90 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -25,6 +25,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.PollingConsumer; @@ -33,6 +34,7 @@ import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; @@ -213,6 +215,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout boolean prototype = cacheSize < 0; try { recipient = expression.evaluate(exchange, Object.class); + recipient = prepareRecipient(exchange, recipient); Endpoint existing = getExistingEndpoint(exchange, recipient); if (existing == null) { endpoint = resolveEndpoint(exchange, recipient, prototype); @@ -345,37 +348,47 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout return true; } - protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof Endpoint) { - return (Endpoint) recipient; + protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) { + return recipient; } else if (recipient instanceof String) { + // trim strings as end users might have added spaces between separators recipient = ((String) recipient).trim(); } if (recipient != null) { - // convert to a string type we can work with - String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - return exchange.getContext().hasEndpoint(uri); + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + String uri; + if (recipient instanceof String) { + uri = (String) recipient; + } else { + // convert to a string type we can work with + uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + // optimize and normalize endpoint + return ecc.normalizeUri(uri); } return null; } - protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof String) { - recipient = ((String) recipient).trim(); - } else if (recipient instanceof Endpoint) { + protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { + if (recipient instanceof Endpoint) { return (Endpoint) recipient; - } else if (recipient != null) { - // convert to a string type we can work with - recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); } - if (recipient != null) { - return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); - } else { - return null; + if (recipient instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient; + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + return ecc.hasEndpoint(nu); + } else { + String uri = recipient.toString(); + return exchange.getContext().hasEndpoint(uri); + } } + return null; + } + + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { + return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 3eb6352..66a22d3 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -31,10 +31,12 @@ import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -210,6 +212,7 @@ public class RecipientListProcessor extends MulticastProcessor { Producer producer; ExchangePattern pattern; try { + recipient = prepareRecipient(exchange, recipient); Endpoint existing = getExistingEndpoint(exchange, recipient); if (existing == null) { endpoint = resolveEndpoint(exchange, recipient, prototype); @@ -291,41 +294,62 @@ public class RecipientListProcessor extends MulticastProcessor { return answer; } - protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof Endpoint) { - return (Endpoint) recipient; + protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) { + return recipient; } else if (recipient instanceof String) { + // trim strings as end users might have added spaces between separators recipient = ((String) recipient).trim(); } if (recipient != null) { - // convert to a string type we can work with - String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - return exchange.getContext().hasEndpoint(uri); + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + String uri; + if (recipient instanceof String) { + uri = (String) recipient; + } else { + // convert to a string type we can work with + uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + // optimize and normalize endpoint + return ecc.normalizeUri(uri); } return null; } - protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { - // trim strings as end users might have added spaces between separators - if (recipient instanceof String) { - recipient = ((String) recipient).trim(); + protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { + if (recipient instanceof Endpoint) { + return (Endpoint) recipient; } if (recipient != null) { - return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); - } else { - return null; + if (recipient instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient; + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + return ecc.hasEndpoint(nu); + } else { + String uri = recipient.toString().trim(); + return exchange.getContext().hasEndpoint(uri); + } } + return null; + } + + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { + return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); } - protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof String) { - String s = ((String) recipient).trim(); - // see if exchangePattern is a parameter in the url - s = URISupport.normalizeUri(s); + protected ExchangePattern resolveExchangePattern(Object recipient) { + String s = null; + + if (recipient instanceof NormalizedEndpointUri) { + s = ((NormalizedEndpointUri) recipient).getUri(); + } else if (recipient instanceof String) { + // trim strings as end users might have added spaces between separators + s = ((String) recipient).trim(); + } + if (s != null) { return EndpointHelper.resolveExchangePatternFromUrl(s); } + return null; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java index e34480a..d88d9c7 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -24,6 +24,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; @@ -32,6 +33,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteIdAware; @@ -246,6 +248,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA Endpoint endpoint; try { Object recipient = iter.next(exchange); + recipient = prepareRecipient(exchange, recipient); Endpoint existing = getExistingEndpoint(exchange, recipient); if (existing == null) { endpoint = resolveEndpoint(exchange, recipient, prototype); @@ -315,17 +318,41 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA return true; } - protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof Endpoint) { - return (Endpoint) recipient; + protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) { + return recipient; } else if (recipient instanceof String) { + // trim strings as end users might have added spaces between separators recipient = ((String) recipient).trim(); } if (recipient != null) { - // convert to a string type we can work with - String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - return exchange.getContext().hasEndpoint(uri); + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + String uri; + if (recipient instanceof String) { + uri = (String) recipient; + } else { + // convert to a string type we can work with + uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + // optimize and normalize endpoint + return ecc.normalizeUri(uri); + } + return null; + } + + protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { + if (recipient instanceof Endpoint) { + return (Endpoint) recipient; + } + if (recipient != null) { + if (recipient instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient; + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + return ecc.hasEndpoint(nu); + } else { + String uri = recipient.toString(); + return exchange.getContext().hasEndpoint(uri); + } } return null; } @@ -448,6 +475,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA boolean prototype = cacheSize < 0; try { Object recipient = iter.next(ex); + recipient = prepareRecipient(exchange, recipient); Endpoint existing = getExistingEndpoint(exchange, recipient); if (existing == null) { nextEndpoint = resolveEndpoint(exchange, recipient, prototype); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index 11fa8ef..61e8d7c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.ResolveEndpointFailedException; @@ -30,6 +31,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.SendDynamicAware; @@ -136,6 +138,15 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa } } Object targetRecipient = staticUri != null ? staticUri : recipient; + targetRecipient = prepareRecipient(exchange, targetRecipient); + if (targetRecipient == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); + } + // no endpoint to send to, so ignore + callback.done(true); + return true; + } Endpoint existing = getExistingEndpoint(exchange, targetRecipient); if (existing == null) { endpoint = resolveEndpoint(exchange, targetRecipient, prototype); @@ -144,14 +155,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa // we have an existing endpoint then its not a prototype scope prototype = false; } - if (endpoint == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); - } - // no endpoint to send to, so ignore - callback.done(true); - return true; - } destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri()); } catch (Throwable e) { if (isIgnoreInvalidEndpoint()) { @@ -238,37 +241,47 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa return ExchangeHelper.resolveScheme(uri); } - protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof Endpoint) { - return (Endpoint) recipient; + protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) { + return recipient; } else if (recipient instanceof String) { + // trim strings as end users might have added spaces between separators recipient = ((String) recipient).trim(); } if (recipient != null) { - // convert to a string type we can work with - String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - return exchange.getContext().hasEndpoint(uri); + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + String uri; + if (recipient instanceof String) { + uri = (String) recipient; + } else { + // convert to a string type we can work with + uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + // optimize and normalize endpoint + return ecc.normalizeUri(uri); } return null; } - protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof String) { - recipient = ((String) recipient).trim(); - } else if (recipient instanceof Endpoint) { + protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { + if (recipient instanceof Endpoint) { return (Endpoint) recipient; - } else if (recipient != null) { - // convert to a string type we can work with - recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); } - if (recipient != null) { - return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); - } else { - return null; + if (recipient instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient; + ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); + return ecc.hasEndpoint(nu); + } else { + String uri = recipient.toString(); + return exchange.getContext().hasEndpoint(uri); + } } + return null; + } + + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { + return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient); } protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, Endpoint endpoint) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java index 9c5d26e..1abbc08 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -71,12 +71,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E this.destination = destination; this.camelContext = (ExtendedCamelContext) destination.getCamelContext(); this.pattern = pattern; - try { - this.destinationExchangePattern = null; - this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri()); - } catch (URISyntaxException e) { - throw RuntimeCamelException.wrapRuntimeCamelException(e); - } + this.destinationExchangePattern = null; + this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri()); ObjectHelper.notNull(this.camelContext, "camelContext"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java index 9a9c5a4..01a29db 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java @@ -119,7 +119,7 @@ public class DefaultCamelContextTest extends TestSupport { assertNotNull(endpoint); try { - ctx.getEndpoint(null); + ctx.getEndpoint((String) null); fail("Should have thrown exception"); } catch (IllegalArgumentException e) { // expected @@ -280,7 +280,7 @@ public class DefaultCamelContextTest extends TestSupport { assertEquals(1, map.size()); try { - ctx.hasEndpoint(null); + ctx.hasEndpoint((String) null); fail("Should have thrown exception"); } catch (ResolveEndpointFailedException e) { // expected diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java index 1983d9a..08f1b46 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java @@ -30,6 +30,7 @@ import org.apache.camel.NamedNode; import org.apache.camel.NoSuchBeanException; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.RouteStartupOrder; import org.apache.camel.util.ObjectHelper; @@ -63,6 +64,21 @@ public final class CamelContextHelper { } /** + * Returns the mandatory endpoint for the given URI or the + * {@link org.apache.camel.NoSuchEndpointException} is thrown + */ + public static Endpoint getMandatoryEndpoint(CamelContext camelContext, NormalizedEndpointUri uri) + throws NoSuchEndpointException { + ExtendedCamelContext ecc = (ExtendedCamelContext) camelContext; + Endpoint endpoint = ecc.getEndpoint(uri); + if (endpoint == null) { + throw new NoSuchEndpointException(uri.getUri()); + } else { + return endpoint; + } + } + + /** * Returns the mandatory endpoint (prototype scope) for the given URI or the * {@link org.apache.camel.NoSuchEndpointException} is thrown */ @@ -78,6 +94,21 @@ public final class CamelContextHelper { } /** + * Returns the mandatory endpoint (prototype scope) for the given URI or the + * {@link org.apache.camel.NoSuchEndpointException} is thrown + */ + public static Endpoint getMandatoryPrototypeEndpoint(CamelContext camelContext, NormalizedEndpointUri uri) + throws NoSuchEndpointException { + ExtendedCamelContext ecc = (ExtendedCamelContext) camelContext; + Endpoint endpoint = ecc.getPrototypeEndpoint(uri); + if (endpoint == null) { + throw new NoSuchEndpointException(uri.getUri()); + } else { + return endpoint; + } + } + + /** * Returns the mandatory endpoint for the given URI and type or the * {@link org.apache.camel.NoSuchEndpointException} is thrown */ diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java index 675cd85..c41edd7 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java @@ -40,6 +40,7 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.apache.camel.util.StringHelper.after; /** @@ -378,17 +379,15 @@ public final class EndpointHelper { * * @param url the url * @return the exchange pattern, or <tt>null</tt> if the url has no <tt>exchangePattern</tt> configured. - * @throws URISyntaxException is thrown if uri is invalid */ - public static ExchangePattern resolveExchangePatternFromUrl(String url) throws URISyntaxException { - int idx = url.indexOf("?"); - if (idx > 0) { - url = url.substring(idx + 1); - } - Map<String, Object> parameters = URISupport.parseQuery(url, true); - String pattern = (String) parameters.get("exchangePattern"); - if (pattern != null) { - return ExchangePattern.asEnum(pattern); + public static ExchangePattern resolveExchangePatternFromUrl(String url) { + // optimize to use simple string contains check + if (url.contains("exchangePattern=InOnly")) { + return ExchangePattern.InOnly; + } else if (url.contains("exchangePattern=InOut")) { + return ExchangePattern.InOut; + } else if (url.contains("exchangePattern=InOptionalOut")) { + return ExchangePattern.InOptionalOut; } return null; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index e208448..87b7655 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -49,6 +49,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConversionException; import org.apache.camel.TypeConverter; import org.apache.camel.WrappedFile; +import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.util.IOHelper; @@ -95,6 +96,9 @@ public final class ExchangeHelper { Endpoint endpoint; if (value instanceof Endpoint) { endpoint = (Endpoint) value; + } else if (value instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) value; + endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), nu); } else { String uri = value.toString().trim(); endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri); @@ -119,6 +123,9 @@ public final class ExchangeHelper { Endpoint endpoint; if (value instanceof Endpoint) { endpoint = (Endpoint) value; + } else if (value instanceof NormalizedEndpointUri) { + NormalizedEndpointUri nu = (NormalizedEndpointUri) value; + endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), nu); } else { String uri = value.toString().trim(); endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), uri); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java b/core/camel-support/src/main/java/org/apache/camel/support/NormalizedUri.java similarity index 57% copy from core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java copy to core/camel-support/src/main/java/org/apache/camel/support/NormalizedUri.java index 8981d44..e774a26 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/NormalizedUri.java @@ -14,32 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl.engine; +package org.apache.camel.support; -import org.apache.camel.ValueHolder; -import org.apache.camel.util.StringHelper; +import org.apache.camel.spi.NormalizedEndpointUri; -/** - * Key used in {@link DefaultEndpointRegistry} in {@link AbstractCamelContext}, - * to ensure a consistent lookup. - */ -public final class EndpointKey extends ValueHolder<String> { +public final class NormalizedUri implements NormalizedEndpointUri { + + private final String uri; - public EndpointKey(String uri) { - this(uri, false); + public NormalizedUri(String uri) { + this.uri = uri; } - /** - * Optimized when the uri is already normalized. - */ - public EndpointKey(String uri, boolean normalized) { - super(normalized ? uri : AbstractCamelContext.normalizeEndpointUri(uri)); - StringHelper.notEmpty(uri, "uri"); + @Override + public String getUri() { + return uri; } @Override public String toString() { - return get(); + return uri; } - }