CAMEL-6765: RouteAware API to allow injecting the Route into Consumer or other services which may need it
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f9984b0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f9984b0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f9984b0 Branch: refs/heads/camel-2.12.x Commit: 3f9984b0c60c5b475dd700b1613b7805f301b3dd Parents: 74e9ff9 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Sep 18 12:48:50 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Sep 18 12:49:30 2013 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/RouteAware.java | 39 ++++++++++ .../org/apache/camel/impl/DefaultConsumer.java | 19 ++++- .../camel/impl/EventDrivenConsumerRoute.java | 4 + .../org/apache/camel/impl/RouteService.java | 7 ++ .../camel/processor/RouteAwareRouteTest.java | 78 ++++++++++++++++++++ 5 files changed, 146 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/RouteAware.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/RouteAware.java b/camel-core/src/main/java/org/apache/camel/RouteAware.java new file mode 100644 index 0000000..d871a95 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/RouteAware.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * An interface to represent an object which wishes to be injected with + * a {@link Route} such as {@link Consumer} which is the consumer for a route. + */ +public interface RouteAware { + + /** + * Injects the {@link Route} + * + * @param route the route + */ + void setRoute(Route route); + + /** + * Gets the {@link Route} + * + * @return the route, or <tt>null</tt> if no route has been set. + */ + Route getRoute(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java index 74e9555..2d82a47 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java @@ -21,6 +21,8 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.RouteAware; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ServiceSupport; @@ -36,12 +38,13 @@ import org.slf4j.LoggerFactory; * * @version */ -public class DefaultConsumer extends ServiceSupport implements Consumer { +public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware { protected final Logger log = LoggerFactory.getLogger(getClass()); private final Endpoint endpoint; private final Processor processor; private volatile AsyncProcessor asyncProcessor; private ExceptionHandler exceptionHandler; + private Route route; public DefaultConsumer(Endpoint endpoint, Processor processor) { this.endpoint = endpoint; @@ -54,6 +57,14 @@ public class DefaultConsumer extends ServiceSupport implements Consumer { return "Consumer[" + URISupport.sanitizeUri(endpoint.getEndpointUri()) + "]"; } + public Route getRoute() { + return route; + } + + public void setRoute(Route route) { + this.route = route; + } + /** * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on * the processed {@link Exchange} then this method should be use to create and start @@ -66,6 +77,12 @@ public class DefaultConsumer extends ServiceSupport implements Consumer { * @see #doneUoW(org.apache.camel.Exchange) */ public UnitOfWork createUoW(Exchange exchange) throws Exception { + // if the exchange doesn't have from route id set, then set it if it originated + // from this unit of work + if (route != null && exchange.getFromRouteId() == null) { + exchange.setFromRouteId(route.getId()); + } + UnitOfWork uow = UnitOfWorkHelper.createUoW(exchange); exchange.setUnitOfWork(uow); uow.start(); http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java index 4276ccc..d762a51 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java @@ -22,6 +22,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Navigate; import org.apache.camel.Processor; +import org.apache.camel.RouteAware; import org.apache.camel.Service; import org.apache.camel.SuspendableService; import org.apache.camel.spi.RouteContext; @@ -64,6 +65,9 @@ public class EventDrivenConsumerRoute extends DefaultRoute { consumer = endpoint.createConsumer(processor); if (consumer != null) { services.add(consumer); + if (consumer instanceof RouteAware) { + ((RouteAware) consumer).setRoute(this); + } } Processor processor = getProcessor(); if (processor instanceof Service) { http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/impl/RouteService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java index 4f8f4e7..3df2829 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java +++ b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java @@ -30,6 +30,7 @@ import org.apache.camel.Channel; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Route; +import org.apache.camel.RouteAware; import org.apache.camel.Service; import org.apache.camel.model.OnCompletionDefinition; import org.apache.camel.model.OnExceptionDefinition; @@ -142,6 +143,12 @@ public class RouteService extends ChildServiceSupport { // afterwards to avoid them being active while the others start List<Service> childServices = new ArrayList<Service>(); for (Service service : list) { + + // inject the route + if (service instanceof RouteAware) { + ((RouteAware) service).setRoute(route); + } + if (service instanceof Consumer) { inputs.put(route, (Consumer) service); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java new file mode 100644 index 0000000..62f5995 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.RouteAware; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.ServiceSupport; + +public class RouteAwareRouteTest extends ContextTestSupport { + + public void testRouteAware() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("foo"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("foo") + .process(new MyProcessor()) + .to("mock:result"); + } + }; + } + + private static final class MyProcessor extends ServiceSupport implements Processor, RouteAware { + + private Route route; + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(route.getId()); + } + + @Override + public void setRoute(Route route) { + this.route = route; + } + + @Override + public Route getRoute() { + return route; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + } +}