This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 0c0f70c CAMEL-12462 (#2302) 0c0f70c is described below commit 0c0f70cdda4cccd27c5bb00f0dca4e103bf5ac5e Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 26 10:38:35 2018 +0200 CAMEL-12462 (#2302) * CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * CAMEL-12462: camel-http now supports optimised toD for all its lenient query parameters. * CAMEL-12462: camel-http now supports optimised toD for all its lenient query parameters. * CAMEL-12462: camel-http now supports optimised toD for all its lenient query parameters. * CAMEL-12462: camel-http4 now supports optimised toD for all its lenient query parameters. * CAMEL-12462: camel-undertow now supports optimised toD for all its lenient query parameters. * CAMEL-12462: camel-netty4-http now supports optimised toD for all its lenient query parameters. 
* CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * Polished doc * CAMEL-12462: Optimise toD to allow components to be SendDynamicAware in their producers to use static endpoint and dynamic parts provided via headers (pre processor) * CAMEL-12462: Fixed typo. Thanks to Dmitry Volodin for spotting this. * CAMEL-12462: Better name thanks to Alex for suggestion. --- camel-core/src/main/docs/eips/toD-eip.adoc | 106 ++++++++++++- camel-core/src/main/docs/eips/wireTap-eip.adoc | 3 +- .../mbean/ManagedSendDynamicProcessorMBean.java | 6 + .../api/management/mbean/ManagedWireTapMBean.java | 3 + .../mbean/ManagedSendDynamicProcessor.java | 8 + .../management/mbean/ManagedWireTapProcessor.java | 4 + .../apache/camel/model/ProcessorDefinition.java | 18 +++ .../apache/camel/model/ToDynamicDefinition.java | 20 +++ .../org/apache/camel/model/WireTapDefinition.java | 2 +- .../camel/processor/SendDynamicAwareResolver.java | 69 +++++++++ .../camel/processor/SendDynamicProcessor.java | 125 +++++++++++++++- .../apache/camel/processor/WireTapProcessor.java | 8 +- .../org/apache/camel/spi/SendDynamicAware.java | 117 +++++++++++++++ .../java/org/apache/camel/util/ExchangeHelper.java | 18 +++ .../apache/camel/component/bar/BarComponent.java | 33 +++++ .../apache/camel/component/bar/BarConstants.java | 25 ++++ .../apache/camel/component/bar/BarEndpoint.java | 65 ++++++++ .../apache/camel/component/bar/BarProducer.java | 44 ++++++ .../camel/component/bar/BarSendDynamicAware.java | 79 ++++++++++ .../ManagedSendDynamicProcessorTest.java | 5 +- .../camel/management/ManagedWireTapTest.java | 5 +- .../processor/ToDynamicSendDynamicAwareTest.java | 54 +++++++ .../services/org/apache/camel/send-dynamic/bar | 18 +++ 
.../camel-headersmap/src/main/docs/headersmap.adoc | 6 +- .../camel/http/common/HttpSendDynamicAware.java | 165 +++++++++++++++++++++ .../http/common/HttpSendDynamicPostProcessor.java | 34 +++++ .../http/common/HttpSendDynamicPreProcessor.java | 49 ++++++ .../services/org/apache/camel/send-dynamic/http | 18 +++ .../services/org/apache/camel/send-dynamic/https | 18 +++ .../component/http/HttpSendDynamicAwareTest.java | 96 ++++++++++++ .../http/handler/BasicValidationHandler.java | 9 +- .../http/handler/DrinkValidationHandler.java | 40 +++++ .../services/org/apache/camel/send-dynamic/http4 | 18 +++ .../services/org/apache/camel/send-dynamic/https4 | 18 +++ .../component/http4/HttpSendDynamicAwareTest.java | 89 +++++++++++ .../http4/handler/BasicValidationHandler.java | 10 +- .../http4/handler/DrinkValidationHandler.java | 34 +++++ .../services/org/apache/camel/send-dynamic/jetty | 17 +++ .../JettyHttpProducerSendDynamicAwareTest.java | 59 ++++++++ .../netty4/http/NettyHttpSendDynamicAware.java | 40 +++++ .../org/apache/camel/send-dynamic/netty4-http | 17 +++ .../netty4/http/NettyHttpSendDynamicAwareTest.java | 58 ++++++++ .../org/apache/camel/send-dynamic/undertow | 18 +++ .../undertow/UndertowSendDynamicAwareTest.java | 59 ++++++++ 44 files changed, 1692 insertions(+), 15 deletions(-) diff --git a/camel-core/src/main/docs/eips/toD-eip.adoc b/camel-core/src/main/docs/eips/toD-eip.adoc index 21b08b8..f788864 100644 --- a/camel-core/src/main/docs/eips/toD-eip.adoc +++ b/camel-core/src/main/docs/eips/toD-eip.adoc @@ -10,7 +10,7 @@ the endpoint. 
=== Options // eip options: START -The To D EIP supports 4 options which are listed below: +The To D EIP supports 5 options which are listed below: [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -19,6 +19,7 @@ The To D EIP supports 4 options which are listed below: | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint | | ExchangePattern | *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ConsumerCache which is used to cache and reuse producers. | | Integer | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean +| *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean |=== // eip options: END @@ -114,3 +115,106 @@ from("direct:start") You can concat as many languages as you want, just separate them with the plus sign + +=== Avoid creating endless dynamic endpoints which take up resources + +When using dynamically computed endpoints with `toD` then you may compute a lot of dynamic endpoints, +which results in an overhead of resources in use, by each dynamic endpoint uri, and its associated producer. + +For example HTTP based endpoints where you may have dynamic values in URI parameters when calling the HTTP service, such as: + +[source,java] +---- +from("direct:login") + .toD("http:myloginserver:8080/login?userid=${header.userName}"); +---- + +In the example above then the parameter `userid` is dynamically computed, and would result in one instance of endpoint and producer +for each different userid. To avoid having too many dynamic endpoints you can configure `toD` to reduce its cache size, for example: + +[source,java] +---- +from("direct:login") + .toD("http:myloginserver:8080/login?cacheSize=10&userid=${header.userName}"); +---- + +where the cache size is 10. 
*Important* this will only reduce the endpoint cache of the `toD` that has a chance +of being reused in case a message is routed with the same `userName` header. Therefore reducing the cache size +will not solve the _endless dynamic endpoints_ problem. Instead you should use static endpoints with `to` and +provide the dynamic parts in Camel message headers (if possible). + +The example above can be resolved via static endpoints, or from Camel 2.22 onwards via optimised components (see further below) + +==== Using static endpoints + +In the example above then the parameter `userid` is dynamically computed, and would result in one instance of endpoint and producer +for each different userid. To avoid having too many dynamic endpoints you can use a single static endpoint and use headers to provide the dynamic parts: + +[source,java] +---- +from("direct:login") + .setHeader(Exchange.HTTP_PATH, constant("/login")) + .setHeader(Exchange.HTTP_QUERY, simple("userid=${header.userName}")) + .toD("http:myloginserver:8080"); +---- + +If you are using Camel 2.22 onwards then you can use its optimised components for `toD` that can _solve_ this out of the box, +as documented next. + +=== Using optimised components + +But a better solution would be if the HTTP component could be optimised to handle the variations of dynamically computed endpoint uris. +This is now possible with *Camel 2.22* onwards, where the following components have been optimised for `toD`: + +- camel-http +- camel-http4 +- camel-jetty +- camel-netty4-http +- camel-undertow + +For the optimisation to work, then: + +1. The optimisation is detected and activated during startup of the Camel routes with `toD`'s. +2. The dynamic uri in `toD` must provide the component name as either static or resolved via property placeholders. +3. The supported components must be on the classpath. 
+ +The HTTP based components will be optimised to use the same hostname:port for each endpoint, and the dynamic values +for context-path and query parameters will be provided as headers: + +For example this route: + +[source,java] +---- +from("direct:login") + .toD("http:myloginserver:8080/login?userid=${header.userName}"); +---- + +will essentially be optimised to (pseudo route): + +[source,java] +---- +from("direct:login") + .setHeader(Exchange.HTTP_PATH, expression("/login")) + .setHeader(Exchange.HTTP_QUERY, expression("userid=${header.userName}")) + .toD("http:myloginserver:8080") + .removeHeader(Exchange.HTTP_PATH) + .removeHeader(Exchange.HTTP_QUERY); +---- + +Where _expression_ will be evaluated dynamically. Notice how the uri in `toD` is now static (`http:myloginserver:8080`). +This optimisation allows Camel to reuse the same endpoint and its associated producer for all dynamic variations. +This yields much lower resource overhead as the same http producer will be used for all the different variations of userid's. + +NOTE: When the optimised component is in use, then you cannot use the headers `Exchange.HTTP_PATH` and `Exchange.HTTP_QUERY` +to provide dynamic values to override the uri in `toD`. If you want to use these headers, then use the plain `to` DSL instead. +In other words these headers are used internally by `toD` to carry the dynamic details of the endpoint. + +In case of problems then you can turn on DEBUG logging level on `org.apache.camel.processor.SendDynamicProcessor` which will log +during startup if `toD` was optimised, or if there was a failure loading the optimised component, with a stacktrace logged. 
+ +[source,text] +---- +Detected SendDynamicAware component: http optimising toD: http:myloginserver:8080/login?userid=${header.userName} +---- + + diff --git a/camel-core/src/main/docs/eips/wireTap-eip.adoc b/camel-core/src/main/docs/eips/wireTap-eip.adoc index a5a9f52..275d930 100644 --- a/camel-core/src/main/docs/eips/wireTap-eip.adoc +++ b/camel-core/src/main/docs/eips/wireTap-eip.adoc @@ -18,7 +18,7 @@ at link:stream-caching.html[Stream caching]. === Options // eip options: START -The Wire Tap EIP supports 10 options which are listed below: +The Wire Tap EIP supports 11 options which are listed below: [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -33,6 +33,7 @@ The Wire Tap EIP supports 10 options which are listed below: | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint | | ExchangePattern | *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ConsumerCache which is used to cache and reuse producers. | | Integer | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean +| *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. 
| true | Boolean |=== // eip options: END diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java index d880980..d0f9265 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java @@ -35,6 +35,12 @@ public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean, @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") Boolean isIgnoreInvalidEndpoint(); + @ManagedAttribute(description = "Whether to allow components to optimise toD if they are SendDynamicAware") + Boolean isAllowOptimisedComponents(); + + @ManagedAttribute(description = "Whether an optimised component (SendDynamicAware) is in use") + Boolean isOptimised(); + @ManagedOperation(description = "Statistics of the endpoints which has been sent to") TabularData extendedInformation(); diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java index d86ede7..0958d99 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java @@ -35,6 +35,9 @@ public interface ManagedWireTapMBean extends ManagedProcessorMBean, ManagedExten @ManagedAttribute(description = "Uses a copy of the original exchange") Boolean isCopy(); + @ManagedAttribute(description = "Whether the uri is dynamic or static") + Boolean isDynamicUri(); + @ManagedAttribute(description = "Current size of inflight wire tapped exchanges.") Integer getTaskSize(); diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java index 73f4cf1..d40dd8d 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java @@ -95,6 +95,14 @@ public class ManagedSendDynamicProcessor extends ManagedProcessor implements Man return processor.isIgnoreInvalidEndpoint(); } + public Boolean isAllowOptimisedComponents() { + return processor.isAllowOptimisedComponents(); + } + + public Boolean isOptimised() { + return processor.getDynamicAware() != null; + } + @Override public TabularData extendedInformation() { try { diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java index 8000799..c90ff4c 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java @@ -91,6 +91,10 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed return processor.isCopy(); } + public Boolean isDynamicUri() { + return processor.isDynamicUri(); + } + public Integer getTaskSize() { return processor.getPendingExchangesSize(); } diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index c5885e4..005270e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -654,6 +654,24 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * Sends the exchange to the given dynamic endpoint 
* * @param uri the dynamic endpoint to send to (resolved using simple language by default) + * @param cacheSize sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers. + * + * @return the builder + */ + @SuppressWarnings("unchecked") + public Type toD(@AsEndpointUri String uri, int cacheSize) { + ToDynamicDefinition answer = new ToDynamicDefinition(); + answer.setUri(uri); + answer.setCacheSize(cacheSize); + addOutput(answer); + return (Type) this; + } + + /** + * Sends the exchange to the given dynamic endpoint + * + * @param uri the dynamic endpoint to send to (resolved using simple language by default) + * @param ignoreInvalidEndpoint ignore the invalidate endpoint exception when try to create a producer with that endpoint * @return the builder */ @SuppressWarnings("unchecked") diff --git a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java index e640b77..ca19278 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java @@ -60,6 +60,8 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> private Integer cacheSize; @XmlAttribute private Boolean ignoreInvalidEndpoint; + @XmlAttribute @Metadata(defaultValue = "true") + private Boolean allowOptimisedComponents; public ToDynamicDefinition() { } @@ -163,6 +165,16 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> return this; } + /** + * Whether to allow components to optimise toD if they are {@link org.apache.camel.spi.SendDynamicAware}. 
+ * + * @return the builder + */ + public ToDynamicDefinition allowOptimisedComponents(boolean allowOptimisedComponents) { + setAllowOptimisedComponents(allowOptimisedComponents); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -201,6 +213,14 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; } + public Boolean getAllowOptimisedComponents() { + return allowOptimisedComponents; + } + + public void setAllowOptimisedComponents(Boolean allowOptimisedComponents) { + this.allowOptimisedComponents = allowOptimisedComponents; + } + // Utilities // ------------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java index 4d92407..4879621 100644 --- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java @@ -91,7 +91,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T // is true bt default boolean isCopy = getCopy() == null || getCopy(); - WireTapProcessor answer = new WireTapProcessor(dynamicTo, internal, getPattern(), threadPool, shutdownThreadPool); + WireTapProcessor answer = new WireTapProcessor(dynamicTo, internal, getPattern(), threadPool, shutdownThreadPool, isDynamic()); answer.setCopy(isCopy); if (newExchangeProcessorRef != null) { newExchangeProcessor = routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class); diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicAwareResolver.java b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicAwareResolver.java new file mode 100644 index 0000000..7624c21 --- /dev/null +++ 
b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicAwareResolver.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.IOException; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.FactoryFinder; +import org.apache.camel.spi.SendDynamicAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SendDynamicAwareResolver { + + public static final String RESOURCE_PATH = "META-INF/services/org/apache/camel/send-dynamic/"; + + private static final Logger LOG = LoggerFactory.getLogger(SendDynamicAwareResolver.class); + + private FactoryFinder factoryFinder; + + public SendDynamicAware resolve(CamelContext context, String scheme) { + String name = scheme; + + // use factory finder to find a custom implementations + Class<?> type = null; + try { + type = findFactory(name, context); + } catch (Exception e) { + // ignore + } + + if (type != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found SendDynamicAware: {} via: {}{}", type.getName(), factoryFinder.getResourcePath(), name); + } + if (SendDynamicAware.class.isAssignableFrom(type)) { + SendDynamicAware answer = (SendDynamicAware) 
context.getInjector().newInstance(type); + answer.setScheme(scheme); + return answer; + } else { + throw new IllegalArgumentException("Type is not a SendDynamicAware implementation. Found: " + type.getName()); + } + } + + return null; + } + + private Class<?> findFactory(String name, CamelContext context) throws ClassNotFoundException, IOException { + if (factoryFinder == null) { + factoryFinder = context.getFactoryFinder(RESOURCE_PATH); + } + return factoryFinder.findClass(name); + } + +} diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index a43f2bf..2a6b241 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -26,16 +26,20 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.SendDynamicAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +50,7 @@ import org.slf4j.LoggerFactory; */ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { protected static final Logger LOG = 
LoggerFactory.getLogger(SendDynamicProcessor.class); + protected SendDynamicAware dynamicAware; protected CamelContext camelContext; protected final String uri; protected final Expression expression; @@ -54,6 +59,7 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess protected String id; protected boolean ignoreInvalidEndpoint; protected int cacheSize; + protected boolean allowOptimisedComponents = true; public SendDynamicProcessor(Expression expression) { this.uri = null; @@ -99,9 +105,34 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess // use dynamic endpoint so calculate the endpoint to use Object recipient = null; + Processor preAwareProcessor = null; + Processor postAwareProcessor = null; + String staticUri = null; try { recipient = expression.evaluate(exchange, Object.class); - endpoint = resolveEndpoint(exchange, recipient); + if (dynamicAware != null) { + // if its the same scheme as the pre-resolved dynamic aware then we can optimise to use it + String uri = resolveUri(exchange, recipient); + String scheme = resolveScheme(exchange, uri); + if (dynamicAware.getScheme().equals(scheme)) { + SendDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri); + if (entry != null) { + staticUri = dynamicAware.resolveStaticUri(exchange, entry); + preAwareProcessor = dynamicAware.createPreProcessor(exchange, entry); + postAwareProcessor = dynamicAware.createPostProcessor(exchange, entry); + if (staticUri != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Optimising toD via SendDynamicAware component: {} to use static uri: {}", scheme, URISupport.sanitizeUri(staticUri)); + } + } + } + } + } + if (staticUri != null) { + endpoint = resolveEndpoint(exchange, staticUri); + } else { + endpoint = resolveEndpoint(exchange, recipient); + } if (endpoint == null) { if (LOG.isDebugEnabled()) { LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); @@ -124,15 +155,37 @@ public 
class SendDynamicProcessor extends ServiceSupport implements AsyncProcess } // send the exchange to the destination using the producer cache + final Processor preProcessor = preAwareProcessor; + final Processor postProcessor = postAwareProcessor; return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() { public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, ExchangePattern pattern, final AsyncCallback callback) { final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint); + + try { + if (preProcessor != null) { + preProcessor.process(target); + } + } catch (Throwable e) { + exchange.setException(e); + // restore previous MEP + target.setPattern(existingPattern); + // we failed + callback.done(true); + } + LOG.debug(">>>> {} {}", endpoint, exchange); return asyncProducer.process(target, new AsyncCallback() { public void done(boolean doneSync) { // restore previous MEP target.setPattern(existingPattern); + try { + if (postProcessor != null) { + postProcessor.process(target); + } + } catch (Throwable e) { + target.setException(e); + } // signal we are done callback.done(doneSync); } @@ -141,6 +194,36 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess }); } + protected static String resolveUri(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient == null) { + return null; + } + + String uri; + // trim strings as end users might have added spaces between separators + if (recipient instanceof String) { + uri = ((String) recipient).trim(); + } else if (recipient instanceof Endpoint) { + uri = ((Endpoint) recipient).getEndpointKey(); + } else { + // convert to a string type we can work with + uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + + // in case path has property placeholders then try to let 
property component resolve those + try { + uri = exchange.getContext().resolvePropertyPlaceholders(uri); + } catch (Exception e) { + throw new ResolveEndpointFailedException(uri, e); + } + + return uri; + } + + protected static String resolveScheme(Exchange exchange, String uri) { + return ExchangeHelper.resolveScheme(uri); + } + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { // trim strings as end users might have added spaces between separators if (recipient instanceof String) { @@ -184,7 +267,33 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); } } - ServiceHelper.startService(producerCache); + + if (isAllowOptimisedComponents() && uri != null) { + try { + // in case path has property placeholders then try to let property component resolve those + String u = camelContext.resolvePropertyPlaceholders(uri); + // find out which component it is + String scheme = ExchangeHelper.resolveScheme(u); + if (scheme != null) { + // find out if the component can be optimised for send-dynamic + SendDynamicAwareResolver resolver = new SendDynamicAwareResolver(); + dynamicAware = resolver.resolve(camelContext, scheme); + if (dynamicAware != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Detected SendDynamicAware component: {} optimising toD: {}", scheme, URISupport.sanitizeUri(uri)); + } + } + } + } catch (Throwable e) { + // ignore + if (LOG.isDebugEnabled()) { + LOG.debug("Error creating optimised SendDynamicAwareResolver for uri: " + URISupport.sanitizeUri(uri) + + " due to " + e.getMessage() + ". 
This exception is ignored", e); + } + } + } + + ServiceHelper.startServices(producerCache); } protected void doStop() throws Exception { @@ -203,6 +312,10 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess this.camelContext = camelContext; } + public SendDynamicAware getDynamicAware() { + return dynamicAware; + } + public String getUri() { return uri; } @@ -234,4 +347,12 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess public void setCacheSize(int cacheSize) { this.cacheSize = cacheSize; } + + public boolean isAllowOptimisedComponents() { + return allowOptimisedComponents; + } + + public void setAllowOptimisedComponents(boolean allowOptimisedComponents) { + this.allowOptimisedComponents = allowOptimisedComponents; + } } diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 6feb4d2..63f7ae4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -58,6 +58,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, private CamelContext camelContext; private final SendDynamicProcessor dynamicProcessor; private final String uri; + private final boolean dynamicUri; private final Processor processor; private final ExchangePattern exchangePattern; private final ExecutorService executorService; @@ -72,7 +73,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, private Processor onPrepare; public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern, - ExecutorService executorService, boolean shutdownExecutorService) { + ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) { this.dynamicProcessor = dynamicProcessor; this.uri = 
dynamicProcessor.getUri(); this.processor = processor; @@ -80,6 +81,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, ObjectHelper.notNull(executorService, "executorService"); this.executorService = executorService; this.shutdownExecutorService = shutdownExecutorService; + this.dynamicUri = dynamicUri; } @Override @@ -288,6 +290,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, return dynamicProcessor.isIgnoreInvalidEndpoint(); } + public boolean isDynamicUri() { + return dynamicUri; + } + @Override protected void doStart() throws Exception { ServiceHelper.startService(processor); diff --git a/camel-core/src/main/java/org/apache/camel/spi/SendDynamicAware.java b/camel-core/src/main/java/org/apache/camel/spi/SendDynamicAware.java new file mode 100644 index 0000000..946d6b6 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/SendDynamicAware.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spi; + +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; + +/** + * Used for components that can optimise the usage of {@link org.apache.camel.processor.SendDynamicProcessor} (toD) + * to reuse a static {@link org.apache.camel.Endpoint} and {@link Producer} that supports + * using headers to provide the dynamic parts. For example many of the HTTP components supports this. + */ +public interface SendDynamicAware { + + /** + * Sets the component name. + * + * @param scheme name of the component + */ + void setScheme(String scheme); + + /** + * Gets the component name + */ + String getScheme(); + + /** + * An entry of detailed information from the recipient uri, which allows the {@link SendDynamicAware} + * implementation to prepare pre- and post- processor and the static uri to be used for the optimised dynamic to. + */ + class DynamicAwareEntry { + + private final String originalUri; + private final Map<String, String> properties; + private final Map<String, String> lenientProperties; + + public DynamicAwareEntry(String originalUri, Map<String, String> properties, Map<String, String> lenientProperties) { + this.originalUri = originalUri; + this.properties = properties; + this.lenientProperties = lenientProperties; + } + + public String getOriginalUri() { + return originalUri; + } + + public Map<String, String> getProperties() { + return properties; + } + + public Map<String, String> getLenientProperties() { + return lenientProperties; + } + } + + /** + * Prepares for using optimised dynamic to by parsing the uri and returning an entry of details that are + * used for creating the pre and post processors, and the static uri. 
+ * + * @param exchange the exchange + * @param uri the original uri + * @return prepared information about the dynamic endpoint to use + * @throws Exception is thrown if error parsing the uri + */ + DynamicAwareEntry prepare(Exchange exchange, String uri) throws Exception; + + /** + * Resolves the static part of the uri that are used for creating a single {@link org.apache.camel.Endpoint} + * and {@link Producer} that will be reused for processing the optimised toD. + * + * @param exchange the exchange + * @param entry prepared information about the dynamic endpoint to use + * @return the static uri, or <tt>null</tt> to not let toD use this optimisation. + * @throws Exception is thrown if error resolving the static uri. + */ + String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception; + + /** + * Creates the pre {@link Processor} that will prepare the {@link Exchange} + * with dynamic details from the given recipient. + * + * @param exchange the exchange + * @param entry prepared information about the dynamic endpoint to use + * @return the processor, or <tt>null</tt> to not let toD use this optimisation. + * @throws Exception is thrown if error creating the pre processor. + */ + Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception; + + /** + * Creates an optional post {@link Processor} that will be executed afterwards + * when the message has been sent dynamic. + * + * @param exchange the exchange + * @param entry prepared information about the dynamic endpoint to use + * @return the post processor, or <tt>null</tt> if no post processor is needed. + * @throws Exception is thrown if error creating the post processor. 
+ */ + Processor createPostProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception; + +} diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 8241793..da3308a 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -959,6 +959,24 @@ public final class ExchangeHelper { return answer; } + /** + * Resolve the component scheme (aka name) from the given endpoint uri + * + * @param uri the endpoint uri + * @return the component scheme (name), or <tt>null</tt> if not possible to resolve + */ + public static String resolveScheme(String uri) { + String scheme = null; + if (uri != null) { + // Use the URI prefix to find the component. + String splitURI[] = StringHelper.splitOnCharacter(uri, ":", 2); + if (splitURI[1] != null) { + scheme = splitURI[0]; + } + } + return scheme; + } + @SuppressWarnings("unchecked") private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) { if (properties == null) { diff --git a/camel-core/src/test/java/org/apache/camel/component/bar/BarComponent.java b/camel-core/src/test/java/org/apache/camel/component/bar/BarComponent.java new file mode 100644 index 0000000..6b67090 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bar/BarComponent.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bar; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; + +public class BarComponent extends DefaultComponent { + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + BarEndpoint answer = new BarEndpoint(uri, this); + answer.setName(remaining); + setProperties(answer, parameters); + return answer; + } +} diff --git a/camel-core/src/test/java/org/apache/camel/component/bar/BarConstants.java b/camel-core/src/test/java/org/apache/camel/component/bar/BarConstants.java new file mode 100644 index 0000000..e465547 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bar/BarConstants.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bar; + +public final class BarConstants { + + public static final String DRINK = "CamelBarDrink"; + + private BarConstants() { + } +} diff --git a/camel-core/src/test/java/org/apache/camel/component/bar/BarEndpoint.java b/camel-core/src/test/java/org/apache/camel/component/bar/BarEndpoint.java new file mode 100644 index 0000000..d1f950f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bar/BarEndpoint.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.bar; + +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; + +public class BarEndpoint extends DefaultEndpoint { + + private String name; + private String drink; + + public BarEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDrink() { + return drink; + } + + public void setDrink(String drink) { + this.drink = drink; + } + + @Override + public Producer createProducer() throws Exception { + return new BarProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("Consumer not supported"); + } + + @Override + public boolean isSingleton() { + return true; + } + +} diff --git a/camel-core/src/test/java/org/apache/camel/component/bar/BarProducer.java b/camel-core/src/test/java/org/apache/camel/component/bar/BarProducer.java new file mode 100644 index 0000000..cca73d9 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bar/BarProducer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bar; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +public class BarProducer extends DefaultProducer { + + public BarProducer(BarEndpoint endpoint) { + super(endpoint); + } + + @Override + public BarEndpoint getEndpoint() { + return (BarEndpoint) super.getEndpoint(); + } + + @Override + public void process(Exchange exchange) throws Exception { + // is there a header with the drink + String drink = exchange.getIn().getHeader(BarConstants.DRINK, String.class); + if (drink == null) { + drink = getEndpoint().getDrink(); + } + + String order = exchange.getIn().getBody(String.class) + " ordered " + drink; + exchange.getIn().setBody(order); + } +} diff --git a/camel-core/src/test/java/org/apache/camel/component/bar/BarSendDynamicAware.java b/camel-core/src/test/java/org/apache/camel/component/bar/BarSendDynamicAware.java new file mode 100644 index 0000000..45bad2d --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bar/BarSendDynamicAware.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bar; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.processor.RemoveHeaderProcessor; +import org.apache.camel.processor.SetHeaderProcessor; +import org.apache.camel.spi.SendDynamicAware; +import org.apache.camel.util.StringHelper; +import org.apache.camel.util.URISupport; + +public class BarSendDynamicAware implements SendDynamicAware { + + private String scheme; + + @Override + public void setScheme(String scheme) { + this.scheme = scheme; + } + + @Override + public String getScheme() { + return scheme; + } + + @Override + public DynamicAwareEntry prepare(Exchange exchange, String uri) throws Exception { + String query = StringHelper.after(uri, "?"); + if (query != null) { + Map<String, String> map = new LinkedHashMap(URISupport.parseQuery(query)); + return new DynamicAwareEntry(uri, map, null); + } else { + return new DynamicAwareEntry(uri, null, null); + } + } + + @Override + public Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception { + if (entry.getProperties().containsKey("drink")) { + Object value = entry.getProperties().get("drink"); + return new SetHeaderProcessor(ExpressionBuilder.constantExpression(BarConstants.DRINK), + ExpressionBuilder.constantExpression(value)); + } else { + return null; + } + } + + @Override + public Processor createPostProcessor(Exchange exchange, DynamicAwareEntry entry) throws 
Exception { + // remove header after use + return new RemoveHeaderProcessor(BarConstants.DRINK); + } + + @Override + public String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception { + // before the ? + String uri = entry.getOriginalUri(); + return StringHelper.before(uri, "?"); + } +} diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java index 1512b4b..812472f 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java @@ -75,6 +75,9 @@ public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { String uri = (String) mbeanServer.getAttribute(on, "Uri"); assertEquals("direct:${header.whereto}", uri); + Boolean optimised = (Boolean) mbeanServer.getAttribute(on, "Optimised"); + assertFalse(optimised); + String pattern = (String) mbeanServer.getAttribute(on, "MessageExchangePattern"); assertNull(pattern); @@ -88,7 +91,7 @@ public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); assertNotNull(data); - assertEquals(6, data.size()); + assertEquals(7, data.size()); String json = (String) mbeanServer.invoke(on, "informationJson", null, null); assertNotNull(json); diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java index 30f9370..0e0920d 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java @@ -75,6 +75,9 @@ public class ManagedWireTapTest extends ManagementTestSupport { String uri = (String) 
mbeanServer.getAttribute(on, "Uri"); assertEquals("direct:${header.whereto}", uri); + Boolean dynamicUri = (Boolean) mbeanServer.getAttribute(on, "DynamicUri"); + assertTrue(dynamicUri); + TabularData data = (TabularData) mbeanServer.invoke(on, "extendedInformation", null, null); assertNotNull(data); assertEquals(2, data.size()); @@ -85,7 +88,7 @@ public class ManagedWireTapTest extends ManagementTestSupport { data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); assertNotNull(data); - assertEquals(12, data.size()); + assertEquals(13, data.size()); String json = (String) mbeanServer.invoke(on, "informationJson", null, null); assertNotNull(json); diff --git a/camel-core/src/test/java/org/apache/camel/processor/ToDynamicSendDynamicAwareTest.java b/camel-core/src/test/java/org/apache/camel/processor/ToDynamicSendDynamicAwareTest.java new file mode 100644 index 0000000..6837c54 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ToDynamicSendDynamicAwareTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.bar.BarComponent; +import org.apache.camel.component.bar.BarConstants; + +public class ToDynamicSendDynamicAwareTest extends ContextTestSupport { + + public void testToDynamic() throws Exception { + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Camel ordered beer", "Hello World ordered wine"); + // the post-processor should remove the header + getMockEndpoint("mock:bar").allMessages().header(BarConstants.DRINK).isNull(); + + template.sendBodyAndHeader("direct:start", "Hello Camel", "drink", "beer"); + template.sendBodyAndHeader("direct:start", "Hello World", "drink", "wine"); + + assertMockEndpointsSatisfied(); + + // there should only be a bar:order endpoint + boolean found = context.getEndpointMap().containsKey("bar://order"); + assertTrue("There should only be one bar endpoint", found); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.addComponent("bar", new BarComponent()); + + from("direct:start") + .toD("bar:order?drink=${header.drink}") + .to("mock:bar"); + } + }; + } +} diff --git a/camel-core/src/test/resources/META-INF/services/org/apache/camel/send-dynamic/bar b/camel-core/src/test/resources/META-INF/services/org/apache/camel/send-dynamic/bar new file mode 100644 index 0000000..c981966 --- /dev/null +++ b/camel-core/src/test/resources/META-INF/services/org/apache/camel/send-dynamic/bar @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.bar.BarSendDynamicAware \ No newline at end of file diff --git a/components/camel-headersmap/src/main/docs/headersmap.adoc b/components/camel-headersmap/src/main/docs/headersmap.adoc index c5e3003..8b9283f 100644 --- a/components/camel-headersmap/src/main/docs/headersmap.adoc +++ b/components/camel-headersmap/src/main/docs/headersmap.adoc @@ -18,7 +18,7 @@ For spring-boot there is a `camel-headersmap-starter` dependency you should use. ### Manual enabling -If you use OSGi or the implementation is not added to the classpath, you need to enable this explict such .Title +If you use OSGi or the implementation is not added to the classpath, you need to enable this explict such: ``` CamelContext camel = ... @@ -26,10 +26,10 @@ CamelContext camel = ... camel.setHeadersMapFactory(new FastHeadersMapFactory()); ``` -Or in XML DSL (spring or blueprint XML file) you can declare the factory as a `<bean>` +Or in XML DSL (spring or blueprint XML file) you can declare the factory as a `<bean>`: ``` <bean id="fastMapFactory" class="org.apache.camel.component.headersmap.FastHeadersMapFactory"/> ``` -and then Camel should detect the bean and use the factory, which is logged: \ No newline at end of file +and then Camel should detect the bean and use the factory. 
\ No newline at end of file diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicAware.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicAware.java new file mode 100644 index 0000000..7968952 --- /dev/null +++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicAware.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.http.common; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.runtimecatalog.RuntimeCamelCatalog; +import org.apache.camel.spi.SendDynamicAware; +import org.apache.camel.util.StringHelper; +import org.apache.camel.util.URISupport; + +/** + * HTTP based {@link SendDynamicAware} which allows to optimise HTTP components + * with the toD (dynamic to) DSL in Camel. This implementation optimises by allowing + * to provide dynamic parameters via {@link Exchange#HTTP_PATH} and {@link Exchange#HTTP_QUERY} headers + * instead of the endpoint uri. 
That allows to use a static endpoint and its producer to service + * dynamic requests. + */ +public class HttpSendDynamicAware implements SendDynamicAware { + + private final Processor postProcessor = new HttpSendDynamicPostProcessor(); + + private String scheme; + + @Override + public void setScheme(String scheme) { + this.scheme = scheme; + } + + @Override + public String getScheme() { + return scheme; + } + + @Override + public DynamicAwareEntry prepare(Exchange exchange, String uri) throws Exception { + RuntimeCamelCatalog catalog = exchange.getContext().getRuntimeCamelCatalog(); + Map<String, String> properties = catalog.endpointProperties(uri); + Map<String, String> lenient = catalog.endpointLenientProperties(uri); + return new DynamicAwareEntry(uri, properties, lenient); + } + + @Override + public String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception { + String[] hostAndPath = parseUri(entry); + String host = hostAndPath[0]; + String path = hostAndPath[1]; + if (path != null || !entry.getLenientProperties().isEmpty()) { + // the context path can be dynamic or any lenient properties + // and therefore build a new static uri without path or lenient options + Map<String, String> params = new LinkedHashMap<>(entry.getProperties()); + for (String k : entry.getLenientProperties().keySet()) { + params.remove(k); + } + if (path != null) { + // httpUri/httpURI contains the host and path, so replace it with just the host as the context-path is dynamic + if (params.containsKey("httpUri")) { + params.put("httpUri", host); + } else if (params.containsKey("httpURI")) { + params.put("httpURI", host); + } else if ("netty4-http".equals(scheme)) { + // the netty4-http stores host,port etc in other fields than httpURI so we can just remove the path parameter + params.remove("path"); + } + } + RuntimeCamelCatalog catalog = exchange.getContext().getRuntimeCamelCatalog(); + return catalog.asEndpointUri(scheme, params, false); + } else { + // no need 
for optimisation + return null; + } + } + + @Override + public Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception { + String[] hostAndPath = parseUri(entry); + String path = hostAndPath[1]; + String query = null; + if (!entry.getLenientProperties().isEmpty()) { + // all lenient properties can be dynamic and provided in the HTTP_QUERY header + query = URISupport.createQueryString(new LinkedHashMap<>(entry.getLenientProperties())); + } + + if (path != null || query != null) { + return new HttpSendDynamicPreProcessor(path, query); + } else { + // no optimisation + return null; + } + } + + @Override + public Processor createPostProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception { + return postProcessor; + } + + protected String[] parseUri(DynamicAwareEntry entry) { + String u = entry.getOriginalUri(); + + // remove scheme prefix (unless its camel-http or camel-http4) + boolean httpComponent = "http".equals(scheme) || "https".equals(scheme) || "http4".equals(scheme) || "https4".equals(scheme); + if (!httpComponent) { + String prefix = scheme + "://"; + String prefix2 = scheme + ":"; + if (u.startsWith(prefix)) { + u = u.substring(prefix.length()); + } else if (u.startsWith(prefix2)) { + u = u.substring(prefix2.length()); + } + } + + // remove query parameters + if (u.indexOf('?') > 0) { + u = StringHelper.before(u, "?"); + } + + // favour using java.net.URI for parsing into host and context-path + try { + URI parse = new URI(u); + String host = parse.getHost(); + String path = parse.getPath(); + // if the path is just a trailing slash then skip it (eg it must be longer than just the slash itself) + if (path != null && path.length() > 1) { + int port = parse.getPort(); + if (port != 80 && port != 443) { + host += ":" + port; + } + if (!httpComponent) { + // include scheme for components that are not camel-http + String scheme = parse.getScheme(); + if (scheme != null) { + host = scheme + "://" + host; + } + } + 
return new String[]{host, path}; + } + } catch (URISyntaxException e) { + // ignore + return new String[]{u, null}; + } + + // no context path + return new String[]{u, null}; + } + +} diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicPostProcessor.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicPostProcessor.java new file mode 100644 index 0000000..300533b --- /dev/null +++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicPostProcessor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.http.common; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * Post {@link Processor} used by {@link HttpSendDynamicAware}. 
+ */ +public class HttpSendDynamicPostProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + // cleanup and remove the headers we used + exchange.getMessage().removeHeader(Exchange.HTTP_PATH); + exchange.getMessage().removeHeader(Exchange.HTTP_QUERY); + } + +} diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicPreProcessor.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicPreProcessor.java new file mode 100644 index 0000000..2596fb5 --- /dev/null +++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpSendDynamicPreProcessor.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.http.common; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * Pre {@link Processor} used by {@link HttpSendDynamicAware}. 
+ */ +public class HttpSendDynamicPreProcessor implements Processor { + + private final String path; + private final String query; + + public HttpSendDynamicPreProcessor(String path, String query) { + this.path = path; + this.query = query; + } + + @Override + public void process(Exchange exchange) throws Exception { + if (path != null) { + exchange.getIn().setHeader(Exchange.HTTP_PATH, path); + } else { + exchange.getIn().removeHeader(Exchange.HTTP_PATH); + } + if (query != null) { + exchange.getIn().setHeader(Exchange.HTTP_QUERY, query); + } else { + exchange.getIn().removeHeader(Exchange.HTTP_QUERY); + } + } + +} diff --git a/components/camel-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/http b/components/camel-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/http new file mode 100644 index 0000000..9b46865 --- /dev/null +++ b/components/camel-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/http @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.http.common.HttpSendDynamicAware diff --git a/components/camel-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/https b/components/camel-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/https new file mode 100644 index 0000000..9b46865 --- /dev/null +++ b/components/camel-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/https @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.http.common.HttpSendDynamicAware diff --git a/components/camel-http/src/test/java/org/apache/camel/component/http/HttpSendDynamicAwareTest.java b/components/camel-http/src/test/java/org/apache/camel/component/http/HttpSendDynamicAwareTest.java new file mode 100644 index 0000000..7885f41 --- /dev/null +++ b/components/camel-http/src/test/java/org/apache/camel/component/http/HttpSendDynamicAwareTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.http; + +import org.apache.camel.Exchange; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.http.handler.DrinkValidationHandler; +import org.apache.camel.test.AvailablePortFinder; +import org.eclipse.jetty.server.Server; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class HttpSendDynamicAwareTest extends BaseHttpTest { + + private static final int PORT = AvailablePortFinder.getNextAvailable(); + private Server localServer; + + @Before + @Override + public void setUp() throws Exception { + localServer = new Server(PORT); + localServer.setHandler(handlers( + contextHandler("/moes", new DrinkValidationHandler("GET", null, null, "drink")), + contextHandler("/joes", new DrinkValidationHandler("GET", null, null, "drink")) + )); + localServer.start(); + + super.setUp(); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + + if (localServer != null) { + localServer.stop(); + } + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:moes") + .toD("http://localhost:" + PORT + "/moes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("direct:joes") + 
.toD("http://localhost:" + PORT + "/joes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("direct:vodka") + // these 2 headers should not be in use when using toD + .setHeader(Exchange.HTTP_PATH, constant("shouldnotcauseproblems")) + .setHeader(Exchange.HTTP_QUERY, constant("drink=coke")) + .toD("http://localhost:" + PORT + "/joes?throwExceptionOnFailure=false&drink=vodka"); + } + }; + } + + @Test + public void testDynamicAware() throws Exception { + String out = fluentTemplate.to("direct:moes").withHeader("drink", "beer").request(String.class); + assertEquals("Drinking beer", out); + + out = fluentTemplate.to("direct:joes").withHeader("drink", "wine").request(String.class); + assertEquals("Drinking wine", out); + + out = fluentTemplate.to("direct:vodka").clearHeaders().request(String.class); + assertEquals("Drinking vodka", out); + + // and there should only be one http endpoint as they are both on same host + boolean found = context.getEndpointMap().containsKey("http://localhost:" + PORT + "?throwExceptionOnFailure=false"); + assertTrue("Should find static uri", found); + + // we only have 3xdirect and 1xhttp + assertEquals(4, context.getEndpointMap().size()); + } + +} diff --git a/components/camel-http/src/test/java/org/apache/camel/component/http/handler/BasicValidationHandler.java b/components/camel-http/src/test/java/org/apache/camel/component/http/handler/BasicValidationHandler.java index fa2d8ad..ddf38f1 100644 --- a/components/camel-http/src/test/java/org/apache/camel/component/http/handler/BasicValidationHandler.java +++ b/components/camel-http/src/test/java/org/apache/camel/component/http/handler/BasicValidationHandler.java @@ -74,13 +74,18 @@ public class BasicValidationHandler extends AbstractHandler { } response.setStatus(HttpServletResponse.SC_OK); - if (responseContent != null) { + String content = buildResponse(request); + if (content != null) { response.setContentType("text/plain; charset=utf-8"); PrintWriter out = 
response.getWriter(); - out.print(responseContent); + out.print(content); } } + protected String buildResponse(HttpServletRequest request) { + return responseContent; + } + protected boolean validateQuery(HttpServletRequest request) { String query = request.getQueryString(); if (expectedQuery != null && !expectedQuery.equals(query)) { diff --git a/components/camel-http/src/test/java/org/apache/camel/component/http/handler/DrinkValidationHandler.java b/components/camel-http/src/test/java/org/apache/camel/component/http/handler/DrinkValidationHandler.java new file mode 100644 index 0000000..3fa07ad --- /dev/null +++ b/components/camel-http/src/test/java/org/apache/camel/component/http/handler/DrinkValidationHandler.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.http.handler; + +import javax.servlet.http.HttpServletRequest; + +import org.apache.camel.util.StringHelper; + +public class DrinkValidationHandler extends BasicValidationHandler { + + private final String header; + + public DrinkValidationHandler(String expectedMethod, String expectedQuery, Object expectedContent, String header) { + super(expectedMethod, expectedQuery, expectedContent, null); + this.header = header; + } + + @Override + protected String buildResponse(HttpServletRequest request) { + String value = request.getHeader(header); + if (value == null) { + value = StringHelper.after(request.getQueryString(), "drink="); + } + return "Drinking " + value; + } +} diff --git a/components/camel-http4/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/http4 b/components/camel-http4/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/http4 new file mode 100644 index 0000000..9b46865 --- /dev/null +++ b/components/camel-http4/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/http4 @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.http.common.HttpSendDynamicAware diff --git a/components/camel-http4/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/https4 b/components/camel-http4/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/https4 new file mode 100644 index 0000000..9b46865 --- /dev/null +++ b/components/camel-http4/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/https4 @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.http.common.HttpSendDynamicAware diff --git a/components/camel-http4/src/test/java/org/apache/camel/component/http4/HttpSendDynamicAwareTest.java b/components/camel-http4/src/test/java/org/apache/camel/component/http4/HttpSendDynamicAwareTest.java new file mode 100644 index 0000000..0d4f276 --- /dev/null +++ b/components/camel-http4/src/test/java/org/apache/camel/component/http4/HttpSendDynamicAwareTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.http4; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.http4.handler.DrinkValidationHandler; +import org.apache.http.impl.bootstrap.HttpServer; +import org.apache.http.impl.bootstrap.ServerBootstrap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class HttpSendDynamicAwareTest extends BaseHttpTest { + + private HttpServer localServer; + + @Before + @Override + public void setUp() throws Exception { + localServer = ServerBootstrap.bootstrap(). + setHttpProcessor(getBasicHttpProcessor()). + setConnectionReuseStrategy(getConnectionReuseStrategy()). + setResponseFactory(getHttpResponseFactory()). + setExpectationVerifier(getHttpExpectationVerifier()). + setSslContext(getSSLContext()). + registerHandler("/moes", new DrinkValidationHandler("GET", null, null, "drink")). + registerHandler("/joes", new DrinkValidationHandler("GET", null, null, "drink")). 
+ create(); + localServer.start(); + + super.setUp(); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + + if (localServer != null) { + localServer.stop(); + } + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:moes") + .toD("http4://localhost:" + localServer.getLocalPort() + "/moes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("direct:joes") + .toD("http4://localhost:" + localServer.getLocalPort() + "/joes?throwExceptionOnFailure=false&drink=${header.drink}"); + } + }; + } + + @Test + public void testDynamicAware() throws Exception { + String out = fluentTemplate.to("direct:moes").withHeader("drink", "beer").request(String.class); + assertEquals("Drinking beer", out); + + out = fluentTemplate.to("direct:joes").withHeader("drink", "wine").request(String.class); + assertEquals("Drinking wine", out); + + // and there should only be one http endpoint as they are both on same host + boolean found = context.getEndpointMap().containsKey("http4://localhost:" + localServer.getLocalPort() + "?throwExceptionOnFailure=false"); + assertTrue("Should find static uri", found); + + // we only have 2xdirect and 1xhttp4 + assertEquals(3, context.getEndpointMap().size()); + } + +} diff --git a/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/BasicValidationHandler.java b/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/BasicValidationHandler.java index 70c1ce9..07ef89f 100644 --- a/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/BasicValidationHandler.java +++ b/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/BasicValidationHandler.java @@ -74,8 +74,9 @@ public class BasicValidationHandler implements HttpRequestHandler { } response.setStatusCode(HttpStatus.SC_OK); - if 
(responseContent != null) { - response.setEntity(new StringEntity(responseContent, "ASCII")); + String content = buildResponse(request); + if (content != null) { + response.setEntity(new StringEntity(content, "ASCII")); } } @@ -90,4 +91,9 @@ public class BasicValidationHandler implements HttpRequestHandler { } return true; } + + protected String buildResponse(HttpRequest request) { + return responseContent; + } + } \ No newline at end of file diff --git a/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/DrinkValidationHandler.java b/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/DrinkValidationHandler.java new file mode 100644 index 0000000..ac02410 --- /dev/null +++ b/components/camel-http4/src/test/java/org/apache/camel/component/http4/handler/DrinkValidationHandler.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.http4.handler; + +import org.apache.http.HttpRequest; + +public class DrinkValidationHandler extends BasicValidationHandler { + + private final String header; + + public DrinkValidationHandler(String expectedMethod, String expectedQuery, Object expectedContent, String header) { + super(expectedMethod, expectedQuery, expectedContent, null); + this.header = header; + } + + @Override + protected String buildResponse(HttpRequest request) { + return "Drinking " + request.getFirstHeader(header).getValue(); + } +} diff --git a/components/camel-jetty9/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/jetty b/components/camel-jetty9/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/jetty new file mode 100644 index 0000000..d65016e --- /dev/null +++ b/components/camel-jetty9/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/jetty @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +class=org.apache.camel.http.common.HttpSendDynamicAware diff --git a/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSendDynamicAwareTest.java b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSendDynamicAwareTest.java new file mode 100644 index 0000000..09428c0 --- /dev/null +++ b/components/camel-jetty9/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSendDynamicAwareTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.jetty.jettyproducer; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jetty.BaseJettyTest; +import org.junit.Test; + +public class JettyHttpProducerSendDynamicAwareTest extends BaseJettyTest { + + @Test + public void testDynamicAware() throws Exception { + String out = fluentTemplate.to("direct:moes").withHeader("drink", "beer").request(String.class); + assertEquals("Drinking beer", out); + + out = fluentTemplate.to("direct:joes").withHeader("drink", "wine").request(String.class); + assertEquals("Drinking wine", out); + + // and there should only be one http endpoint as they are both on same host + boolean found = context.getEndpointMap().containsKey("jetty://http://localhost:" + getPort() + "?throwExceptionOnFailure=false"); + assertTrue("Should find static uri", found); + + // we only have 2xdirect and 2xjetty + assertEquals(4, context.getEndpointMap().size()); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:moes") + .toD("jetty:http://localhost:{{port}}/moes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("direct:joes") + .toD("jetty:http://localhost:{{port}}/joes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("jetty:http://localhost:{{port}}/?matchOnUriPrefix=true") + .transform().simple("Drinking ${header.drink[0]}"); + } + }; + } + +} diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpSendDynamicAware.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpSendDynamicAware.java new file mode 100644 index 0000000..96d86a7 --- /dev/null +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpSendDynamicAware.java @@ -0,0 +1,40 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.http; + +import org.apache.camel.http.common.HttpSendDynamicAware; + +public class NettyHttpSendDynamicAware extends HttpSendDynamicAware { + + @Override + protected String[] parseUri(DynamicAwareEntry entry) { + // camel-netty4 parses the uri a bit differently than camel-http-common base class + + String scheme = entry.getProperties().get("protocol"); + String host = entry.getProperties().get("host"); + String port = entry.getProperties().get("port"); + String path = entry.getProperties().get("path"); + + String baseUrl = scheme + "://" + host; + if (port != null) { + baseUrl += ":" + port; + } + return new String[]{baseUrl, path}; + } + +} + diff --git a/components/camel-netty4-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/netty4-http b/components/camel-netty4-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/netty4-http new file mode 100644 index 0000000..e9d3f2e --- /dev/null +++ b/components/camel-netty4-http/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/netty4-http @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# 
contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.netty4.http.NettyHttpSendDynamicAware diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpSendDynamicAwareTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpSendDynamicAwareTest.java new file mode 100644 index 0000000..248c0e2 --- /dev/null +++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpSendDynamicAwareTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.http; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettyHttpSendDynamicAwareTest extends BaseNettyTest { + + @Test + public void testDynamicAware() throws Exception { + String out = fluentTemplate.to("direct:moes").withHeader("drink", "beer").request(String.class); + assertEquals("Drinking beer", out); + + out = fluentTemplate.to("direct:joes").withHeader("drink", "wine").request(String.class); + assertEquals("Drinking wine", out); + + // and there should only be one http endpoint as they are both on same host + boolean found = context.getEndpointMap().containsKey("netty4-http://http:localhost:" + getPort() + "?throwExceptionOnFailure=false"); + assertTrue("Should find static uri", found); + + // we only have 2xdirect and 2xnetty4-http + assertEquals(4, context.getEndpointMap().size()); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:moes") + .toD("netty4-http:http://localhost:{{port}}/moes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("direct:joes") + .toD("netty4-http:http://localhost:{{port}}/joes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("netty4-http:http://localhost:{{port}}/?matchOnUriPrefix=true") + .transform().simple("Drinking ${header.drink[0]}"); + } + }; + } + +} diff --git a/components/camel-undertow/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/undertow b/components/camel-undertow/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/undertow new file mode 100644 index 0000000..9b46865 --- /dev/null +++ b/components/camel-undertow/src/main/resources/META-INF/services/org/apache/camel/send-dynamic/undertow @@ 
-0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.http.common.HttpSendDynamicAware diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowSendDynamicAwareTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowSendDynamicAwareTest.java new file mode 100644 index 0000000..78fbf1f --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowSendDynamicAwareTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class UndertowSendDynamicAwareTest extends BaseUndertowTest { + + @Test + public void testDynamicAware() throws Exception { + String out = fluentTemplate.to("direct:moes").withHeader("drink", "beer").request(String.class); + assertEquals("Drinking beer", out); + + out = fluentTemplate.to("direct:joes").withHeader("drink", "wine").request(String.class); + assertEquals("Drinking wine", out); + + // and there should only be one http endpoint as they are both on same host + boolean found = context.getEndpointMap().containsKey("undertow://http://localhost:" + getPort() + "?throwExceptionOnFailure=false"); + assertTrue("Should find static uri", found); + + // we only have 2xdirect and 2xundertow + assertEquals(4, context.getEndpointMap().size()); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:moes") + .toD("undertow:http://localhost:{{port}}/moes?throwExceptionOnFailure=false&drink=${header.drink}"); + + from("direct:joes") + .toD("undertow:http://localhost:{{port}}/joes?throwExceptionOnFailure=false&drink=${header.drink}"); + + // TODO: Fix the double header + from("undertow:http://localhost:{{port}}/?matchOnUriPrefix=true") + .transform().simple("Drinking ${header.drink[0]}"); + } + }; + } + +} -- To stop receiving notification emails like this one, please contact davscl...@apache.org.