This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4d532a846ac616572b145e899b28dad2f503da00 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Dec 29 18:25:54 2019 +0100 CAMEL-14338: Add RouteIdAware so EIP processors can know which route they are serving --- .../apache/camel/impl/engine/BaseRouteService.java | 4 + .../impl/engine/EventDrivenConsumerRoute.java | 4 + .../camel/processor/ConsumerRouteIdAwareTest.java | 106 +++++++++++++++++++++ .../camel/processor/RouteAwareProcessorTest.java | 94 ++++++++++++++++++ .../org/apache/camel/support/DefaultConsumer.java | 14 ++- 5 files changed, 221 insertions(+), 1 deletion(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java index 33b72d4..51b99b4 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java @@ -40,6 +40,7 @@ import org.apache.camel.Service; import org.apache.camel.processor.ErrorHandler; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.support.ChildServiceSupport; import org.apache.camel.support.EventHelper; @@ -171,6 +172,9 @@ public abstract class BaseRouteService extends ChildServiceSupport { if (service instanceof RouteAware) { ((RouteAware) service).setRoute(route); } + if (service instanceof RouteIdAware) { + ((RouteIdAware) service).setRouteId(route.getId()); + } if (service instanceof Consumer) { inputs.put(route, (Consumer) service); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java index 2b87cfd..aacf546 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java @@ -29,6 +29,7 @@ import org.apache.camel.Suspendable; import org.apache.camel.SuspendableService; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.PatternHelper; /** @@ -71,6 +72,9 @@ public class EventDrivenConsumerRoute extends DefaultRoute { if (consumer instanceof RouteAware) { ((RouteAware) consumer).setRoute(this); } + if (consumer instanceof RouteIdAware) { + ((RouteIdAware) consumer).setRouteId(this.getId()); + } } Processor processor = getProcessor(); if (processor instanceof Service) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java new file mode 100644 index 0000000..31840b7 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.DefaultComponent; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.DefaultEndpoint; +import org.junit.Test; + +public class ConsumerRouteIdAwareTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.addComponent("my", new MyComponent(context)); + + from("my:foo").routeId("foo").to("mock:result"); + } + }; + } + + @Test + public void testRouteIdAware() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello from consumer route foo"); + + assertMockEndpointsSatisfied(); + } + + private class MyComponent extends DefaultComponent { + + public MyComponent(CamelContext context) { + super(context); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new MyEndpoint(uri, this); + } + } + + private class MyEndpoint extends DefaultEndpoint { + + public MyEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new MyConsumer(this, processor); + } + } + + private class MyConsumer extends DefaultConsumer { + + public MyConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + + Runnable run = () -> { + Exchange exchange = endpoint.createExchange(); + exchange.getMessage().setBody("Hello from consumer route " + getRouteId()); + try { + Thread.sleep(100); + processor.process(exchange); + } catch (Exception e) { + // ignore + } + }; + Thread t = new Thread(run); + t.start(); + } + + } + +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java new file mode 100644 index 0000000..8c553b7 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.support.service.ServiceSupport; +import org.junit.Test; + +public class RouteAwareProcessorTest extends ContextTestSupport { + + private MyProcessor processor = new MyProcessor(); + + @Test + public void testRouteIdAware() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello route foo from processor myProcessor"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("foo") + .process(processor).id("myProcessor") + .to("mock:result"); + } + }; + } + + private class MyProcessor extends ServiceSupport implements Processor, RouteIdAware, IdAware { + + private String id; + private String routeId; + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getMessage().setBody("Hello route " + routeId + " from processor " + id); + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index beb5870..e263c9a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -25,6 +25,7 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RouteAware; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; @@ -33,7 +34,7 @@ import org.apache.camel.util.URISupport; /** * A default consumer useful for implementation inheritance. */ -public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware { +public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware, RouteIdAware { private transient String consumerToString; private final Endpoint endpoint; @@ -41,6 +42,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw private volatile AsyncProcessor asyncProcessor; private ExceptionHandler exceptionHandler; private Route route; + private String routeId; public DefaultConsumer(Endpoint endpoint, Processor processor) { this.endpoint = endpoint; @@ -66,6 +68,16 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw this.route = route; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + /** * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on * the processed {@link Exchange} then this method should be use to create and start