CAMEL-7641: Added SynchronizationRouteAware, which allows getting callbacks from the UoW before/after routing.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/54fb4139 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/54fb4139 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/54fb4139 Branch: refs/heads/master Commit: 54fb413975ebdc16f12cbd927492fd19222c4f46 Parents: b1e2397 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 27 11:27:51 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jul 28 10:50:49 2014 +0200 ---------------------------------------------------------------------- .../apache/camel/impl/DefaultRouteContext.java | 7 ++ .../apache/camel/impl/DefaultUnitOfWork.java | 17 ++++ .../camel/processor/CamelInternalProcessor.java | 29 ++++++ .../processor/binding/RestBindingProcessor.java | 24 ++++- .../camel/spi/SynchronizationRouteAware.java | 58 ++++++++++++ .../java/org/apache/camel/spi/UnitOfWork.java | 17 ++++ .../camel/support/SynchronizationAdapter.java | 12 ++- .../org/apache/camel/util/UnitOfWorkHelper.java | 52 +++++++++++ .../RouteAwareSynchronizationTest.java | 97 ++++++++++++++++++++ .../jetty/rest/RestJettyPojoInOutTest.java | 2 - 10 files changed, 307 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java index aeb8b91..8029189 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java @@ -182,6 +182,9 @@ public class DefaultRouteContext implements RouteContext { // wrap in JMX instrumentation processor that is used for performance stats 
internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route")); + // wrap in route lifecycle + internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice()); + // and create the route that wraps the UoW Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal); edcr.getProperties().put(Route.ID_PROPERTY, routeId); @@ -195,6 +198,10 @@ public class DefaultRouteContext implements RouteContext { if (task != null) { task.setRoute(edcr); } + CamelInternalProcessor.RouteLifecycleAdvice task2 = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class); + if (task2 != null) { + task2.setRoute(edcr); + } // invoke init on route policy if (routePolicyList != null && !routePolicyList.isEmpty()) { http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java index e8b98e0..246100b 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java @@ -30,6 +30,7 @@ import org.apache.camel.CamelUnitOfWorkException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.Route; import org.apache.camel.Service; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.SubUnitOfWork; @@ -256,6 +257,22 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } } + @Override + public void beforeRoute(Exchange exchange, Route route) { + if (log.isTraceEnabled()) { + log.trace("UnitOfWork beforeRoute: {} for ExchangeId: {} with {}", new Object[]{route.getId(), exchange.getExchangeId(), exchange}); + } + UnitOfWorkHelper.beforeRouteSynchronizations(route, exchange, 
synchronizations, log); + } + + @Override + public void afterRoute(Exchange exchange, Route route) { + if (log.isTraceEnabled()) { + log.trace("UnitOfWork afterRouteL: {} for ExchangeId: {} with {}", new Object[]{route.getId(), exchange.getExchangeId(), exchange}); + } + UnitOfWorkHelper.afterRouteSynchronizations(route, exchange, synchronizations, log); + } + public String getId() { if (id == null) { id = context.getUuidGenerator().generateUuid(); http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 7ebd726..47c2311 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -284,6 +284,35 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } /** + * Advice to invoke callbacks for before and after routing. + */ + public static class RouteLifecycleAdvice implements CamelInternalProcessorAdvice<Object> { + + private Route route; + + public void setRoute(Route route) { + this.route = route; + } + + @Override + public Object before(Exchange exchange) throws Exception { + UnitOfWork uow = exchange.getUnitOfWork(); + if (uow != null) { + uow.beforeRoute(exchange, route); + } + return null; + } + + @Override + public void after(Exchange exchange, Object object) throws Exception { + UnitOfWork uow = exchange.getUnitOfWork(); + if (uow != null) { + uow.afterRoute(exchange, route); + } + } + } + + /** * Advice for JMX instrumentation of the process being invoked. * <p/> * This advice keeps track of JMX metrics for performance statistics. 
http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java index dc5d76f..b6ecbb4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java @@ -21,6 +21,7 @@ import java.util.Locale; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.Route; import org.apache.camel.processor.MarshalProcessor; import org.apache.camel.processor.UnmarshalProcessor; import org.apache.camel.spi.DataFormat; @@ -148,7 +149,7 @@ public class RestBindingProcessor extends ServiceSupport implements AsyncProcess if (isXml && xmlUnmarshal != null) { // add reverse operation - exchange.addOnCompletion(new RestBindingMarshalOnCompletion(jsonMmarshal, xmlMmarshal)); + exchange.addOnCompletion(new RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMmarshal, xmlMmarshal)); if (ObjectHelper.isNotEmpty(body)) { return xmlUnmarshal.process(exchange, callback); } else { @@ -157,7 +158,7 @@ public class RestBindingProcessor extends ServiceSupport implements AsyncProcess } } else if (isJson && jsonUnmarshal != null) { // add reverse operation - exchange.addOnCompletion(new RestBindingMarshalOnCompletion(jsonMmarshal, xmlMmarshal)); + exchange.addOnCompletion(new RestBindingMarshalOnCompletion(exchange.getFromRouteId(), jsonMmarshal, xmlMmarshal)); if (ObjectHelper.isNotEmpty(body)) { return jsonUnmarshal.process(exchange, callback); } else { @@ -205,15 +206,28 @@ public class RestBindingProcessor extends ServiceSupport implements AsyncProcess 
private final AsyncProcessor jsonMmarshal; private final AsyncProcessor xmlMmarshal; + private final String routeId; - private RestBindingMarshalOnCompletion(AsyncProcessor jsonMmarshal, AsyncProcessor xmlMmarshal) { + private RestBindingMarshalOnCompletion(String routeId, AsyncProcessor jsonMmarshal, AsyncProcessor xmlMmarshal) { + this.routeId = routeId; this.jsonMmarshal = jsonMmarshal; this.xmlMmarshal = xmlMmarshal; } @Override - public void onComplete(Exchange exchange) { - // only marshal if we succeeded (= onComplete) + public void onAfterRoute(Route route, Exchange exchange) { + // we use the onAfterRoute callback, to ensure the data has been marshalled before + // the consumer writes the response back + + // only trigger when it was the 1st route that was done + if (!routeId.equals(route.getId())) { + return; + } + + // only marshal if there was no exception + if (exchange.getException() != null) { + return; + } if (bindingMode == null || "off".equals(bindingMode)) { // binding is off http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/spi/SynchronizationRouteAware.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/SynchronizationRouteAware.java b/camel-core/src/main/java/org/apache/camel/spi/SynchronizationRouteAware.java new file mode 100644 index 0000000..62538e4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/SynchronizationRouteAware.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Exchange; +import org.apache.camel.Route; + +/** + * An extended {@link org.apache.camel.spi.Synchronization} which is route aware. + */ +public interface SynchronizationRouteAware extends Synchronization { + + /** + * Invoked before the {@link org.apache.camel.Exchange} is being routed by the given route. + * <p/> + * Notice if the exchange is being routed through multiple routes, there will be callbacks for each route. + * <p/> + * <b>Important:</b> this callback may not be invoked if the {@link org.apache.camel.spi.SynchronizationRouteAware} implementation + * is being added to the {@link org.apache.camel.spi.UnitOfWork} after the routing has started. + * + * @param route the route + * @param exchange the exchange + */ + void onBeforeRoute(Route route, Exchange exchange); + + /** + * Invoked after the {@link org.apache.camel.Exchange} has been routed by the given route. + * <p/> + * Notice if the exchange is being routed through multiple routes, there will be callbacks for each route. + * <p/> + * This invocation happens before these callbacks: + * <ul> + * <li>The consumer of the route writes any response back to the caller (if in InOut mode)</li> + * <li>The UoW is done calling either {@link #onComplete(org.apache.camel.Exchange)} or {@link #onFailure(org.apache.camel.Exchange)}</li> + * </ul> + * This allows custom logic to be executed after all routing is done, but before the {@link org.apache.camel.Consumer} prepares and writes + * any data back to the caller (if in InOut mode). 
+ * + * @param route the route + * @param exchange the exchange + */ + void onAfterRoute(Route route, Exchange exchange); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java index 5100666..64da7e1 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.Route; import org.apache.camel.Service; /** @@ -71,6 +72,22 @@ public interface UnitOfWork extends Service { void done(Exchange exchange); /** + * Invoked when this unit of work is about to be routed by the given route. + * + * @param exchange the current exchange + * @param route the route + */ + void beforeRoute(Exchange exchange, Route route); + + /** + * Invoked when this unit of work is done being routed by the given route. 
+ * + * @param exchange the current exchange + * @param route the route + */ + void afterRoute(Exchange exchange, Route route); + + /** * Returns the unique ID of this unit of work, lazily creating one if it does not yet have one * * @return the unique ID http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/support/SynchronizationAdapter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/support/SynchronizationAdapter.java b/camel-core/src/main/java/org/apache/camel/support/SynchronizationAdapter.java index b0f460e..3dd852a 100644 --- a/camel-core/src/main/java/org/apache/camel/support/SynchronizationAdapter.java +++ b/camel-core/src/main/java/org/apache/camel/support/SynchronizationAdapter.java @@ -18,7 +18,9 @@ package org.apache.camel.support; import org.apache.camel.Exchange; import org.apache.camel.Ordered; +import org.apache.camel.Route; import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.SynchronizationRouteAware; import org.apache.camel.spi.SynchronizationVetoable; /** @@ -27,7 +29,7 @@ import org.apache.camel.spi.SynchronizationVetoable; * * @version */ -public class SynchronizationAdapter implements SynchronizationVetoable, Ordered { +public class SynchronizationAdapter implements SynchronizationVetoable, Ordered, SynchronizationRouteAware { public void onComplete(Exchange exchange) { onDone(exchange); @@ -50,4 +52,12 @@ public class SynchronizationAdapter implements SynchronizationVetoable, Ordered // no particular order by default return 0; } + + public void onBeforeRoute(Route route, Exchange exchange) { + // noop + } + + public void onAfterRoute(Route route, Exchange exchange) { + // noop + } } http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java index 50c4e8f..0cd6d9f 100644 --- a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java @@ -21,7 +21,9 @@ import java.util.Collections; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.Route; import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.SynchronizationRouteAware; import org.apache.camel.spi.UnitOfWork; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,4 +109,54 @@ public final class UnitOfWorkHelper { } } + public static void beforeRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) { + if (synchronizations != null && !synchronizations.isEmpty()) { + // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException + List<Synchronization> copy = new ArrayList<Synchronization>(synchronizations); + + // reverse so we invoke it FILO style instead of FIFO + Collections.reverse(copy); + // and honor if any was ordered by sorting it accordingly + Collections.sort(copy, new OrderedComparator()); + + // invoke synchronization callbacks + for (Synchronization synchronization : copy) { + if (synchronization instanceof SynchronizationRouteAware) { + try { + log.trace("Invoking synchronization.onBeforeRoute: {} with {}", synchronization, exchange); + ((SynchronizationRouteAware) synchronization).onBeforeRoute(route, exchange); + } catch (Throwable e) { + // must catch exceptions to ensure all synchronizations have a chance to run + log.warn("Exception occurred during onBeforeRoute. 
This exception will be ignored.", e); + } + } + } + } + } + + public static void afterRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) { + if (synchronizations != null && !synchronizations.isEmpty()) { + // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException + List<Synchronization> copy = new ArrayList<Synchronization>(synchronizations); + + // reverse so we invoke it FILO style instead of FIFO + Collections.reverse(copy); + // and honor if any was ordered by sorting it accordingly + Collections.sort(copy, new OrderedComparator()); + + // invoke synchronization callbacks + for (Synchronization synchronization : copy) { + if (synchronization instanceof SynchronizationRouteAware) { + try { + log.trace("Invoking synchronization.onAfterRoute: {} with {}", synchronization, exchange); + ((SynchronizationRouteAware) synchronization).onAfterRoute(route, exchange); + } catch (Throwable e) { + // must catch exceptions to ensure all synchronizations have a chance to run + log.warn("Exception occurred during onAfterRoute. This exception will be ignored.", e); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java b/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java new file mode 100644 index 0000000..0df495f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.SynchronizationRouteAware; + +public class RouteAwareSynchronizationTest extends ContextTestSupport { + + private static final List<String> EVENTS = new ArrayList<String>(); + + public void testRouteAwareSynchronization() throws Exception { + EVENTS.clear(); + assertEquals(0, EVENTS.size()); + + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("Hello World"); + + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.addOnCompletion(new MyRouteAware()); + exchange.getIn().setBody("Hello World"); + } + }); + + assertMockEndpointsSatisfied(); + + assertEquals(5, EVENTS.size()); + assertEquals("onBeforeRoute-start", EVENTS.get(0)); + assertEquals("onBeforeRoute-foo", EVENTS.get(1)); + assertEquals("onAfterRoute-foo", EVENTS.get(2)); + assertEquals("onAfterRoute-start", EVENTS.get(3)); + assertEquals("onComplete", EVENTS.get(4)); + } + + @Override + protected 
RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("start") + .to("mock:a") + .to("direct:foo") + .to("mock:b"); + + from("direct:foo").routeId("foo") + .to("mock:foo"); + } + }; + } + + private static final class MyRouteAware implements SynchronizationRouteAware { + + @Override + public void onBeforeRoute(Route route, Exchange exchange) { + EVENTS.add("onBeforeRoute-" + route.getId()); + } + + @Override + public void onAfterRoute(Route route, Exchange exchange) { + EVENTS.add("onAfterRoute-" + route.getId()); + } + + @Override + public void onComplete(Exchange exchange) { + EVENTS.add("onComplete"); + } + + @Override + public void onFailure(Exchange exchange) { + EVENTS.add("onFailure"); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/54fb4139/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/rest/RestJettyPojoInOutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/rest/RestJettyPojoInOutTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/rest/RestJettyPojoInOutTest.java index 7d3c6fa..b2df470 100644 --- a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/rest/RestJettyPojoInOutTest.java +++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/rest/RestJettyPojoInOutTest.java @@ -19,10 +19,8 @@ package org.apache.camel.component.jetty.rest; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jetty.BaseJettyTest; import org.apache.camel.model.rest.RestBindingMode; -import org.junit.Ignore; import org.junit.Test; -@Ignore public class RestJettyPojoInOutTest extends BaseJettyTest { @Test