This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2b973ad06420e5637a3c183afbccd42b61e311c6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Dec 29 13:41:43 2019 +0100 CAMEL-14338: Add RouteIdAware so EIP processors can know which route they are serving --- .../java/org/apache/camel/spi/RouteIdAware.java | 38 ++++++++++++++++++++++ .../org/apache/camel/processor/CatchProcessor.java | 14 +++++++- .../apache/camel/processor/ChoiceProcessor.java | 14 +++++++- .../camel/processor/ClaimCheckProcessor.java | 14 +++++++- .../camel/processor/ConvertBodyProcessor.java | 14 +++++++- .../java/org/apache/camel/processor/Delayer.java | 14 +++++++- .../java/org/apache/camel/processor/Enricher.java | 14 +++++++- .../camel/processor/ExchangePatternProcessor.java | 14 +++++++- .../apache/camel/processor/FilterProcessor.java | 14 +++++++- .../apache/camel/processor/FinallyProcessor.java | 14 +++++++- .../org/apache/camel/processor/LogProcessor.java | 14 +++++++- .../org/apache/camel/processor/LoopProcessor.java | 16 +++++++-- .../camel/processor/MethodCallProcessor.java | 14 +++++++- .../apache/camel/processor/MulticastProcessor.java | 14 +++++++- .../camel/processor/OnCompletionProcessor.java | 14 +++++++- .../java/org/apache/camel/processor/Pipeline.java | 14 +++++++- .../org/apache/camel/processor/PollEnricher.java | 14 +++++++- .../org/apache/camel/processor/RecipientList.java | 14 +++++++- .../camel/processor/RemoveHeaderProcessor.java | 14 +++++++- .../camel/processor/RemoveHeadersProcessor.java | 14 +++++++- .../camel/processor/RemovePropertiesProcessor.java | 14 +++++++- .../camel/processor/RemovePropertyProcessor.java | 14 +++++++- .../org/apache/camel/processor/Resequencer.java | 16 +++++++-- .../apache/camel/processor/RollbackProcessor.java | 14 +++++++- .../org/apache/camel/processor/RoutingSlip.java | 14 +++++++- .../apache/camel/processor/SamplingThrottler.java | 14 +++++++- .../apache/camel/processor/ScriptProcessor.java | 14 +++++++- .../camel/processor/SendDynamicProcessor.java | 14 +++++++- .../org/apache/camel/processor/SendProcessor.java | 13 +++++++- .../apache/camel/processor/SetBodyProcessor.java | 14 +++++++- .../apache/camel/processor/SetHeaderProcessor.java | 14 +++++++- .../camel/processor/SetPropertyProcessor.java | 14 +++++++- .../org/apache/camel/processor/SortProcessor.java | 15 ++++++++- .../org/apache/camel/processor/StopProcessor.java | 14 +++++++- .../apache/camel/processor/StreamResequencer.java | 14 +++++++- .../apache/camel/processor/ThreadsProcessor.java | 14 +++++++- .../java/org/apache/camel/processor/Throttler.java | 14 +++++++- .../camel/processor/ThrowExceptionProcessor.java | 14 +++++++- .../apache/camel/processor/TransformProcessor.java | 14 +++++++- .../org/apache/camel/processor/TryProcessor.java | 14 +++++++- .../apache/camel/processor/WireTapProcessor.java | 14 +++++++- .../processor/aggregate/AggregateProcessor.java | 12 ++++++- .../processor/idempotent/IdempotentConsumer.java | 9 +++++ .../loadbalancer/LoadBalancerSupport.java | 14 +++++++- .../org/apache/camel/reifier/ProcessorReifier.java | 8 +++++ .../camel/management/mbean/ManagedProcessor.java | 3 ++ .../management/ManagedUnregisterProducerTest.java | 4 +-- .../camel/support/processor/CamelLogProcessor.java | 14 +++++++- .../camel/support/processor/MarshalProcessor.java | 14 +++++++- .../camel/support/processor/ThroughputLogger.java | 14 +++++++- .../support/processor/UnmarshalProcessor.java | 14 +++++++- 51 files changed, 657 insertions(+), 51 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteIdAware.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteIdAware.java new file mode 100644 index 0000000..6d8ce7c --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteIdAware.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +/** + * To allow objects to be injected with the route id + * <p/> + * This allows access to the route id of the processor at runtime, to know which route its associated with. + */ +public interface RouteIdAware { + + /** + * Gets the route id + */ + String getRouteId(); + + /** + * Sets the route id + * + * @param routeId the route id + */ + void setRouteId(String routeId); + +} diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java index 505b37d..ee6f814 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -24,6 +24,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -32,9 +33,10 @@ import org.apache.camel.util.ObjectHelper; /** * A processor which catches exceptions. */ -public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { +public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final List<Class<? extends Throwable>> exceptions; private final Predicate onWhen; @@ -60,6 +62,16 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public String getTraceLabel() { return "catch"; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java index eab02fa..96f29ba 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java @@ -27,6 +27,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.service.ServiceHelper; @@ -38,9 +39,10 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * they are true their processors are used, with a default otherwise clause used * if none match. */ -public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware { +public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final List<FilterProcessor> filters; private final Processor otherwise; private transient long notFiltered; @@ -196,6 +198,16 @@ public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<P } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override protected void doStart() throws Exception { ServiceHelper.startService(filters, otherwise); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java index 587fed6..016d463 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java @@ -25,6 +25,7 @@ import org.apache.camel.Expression; import org.apache.camel.impl.engine.DefaultClaimCheckRepository; import org.apache.camel.spi.ClaimCheckRepository; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.LanguageSupport; @@ -39,10 +40,11 @@ import org.apache.camel.util.ObjectHelper; * This guards against concurrent and thread-safe issues. For off-memory persistent storage of data, then use * any of the many Camel components that support persistent storage, and do not use this Claim Check EIP implementation. */ -public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAware, CamelContextAware { +public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware { private CamelContext camelContext; private String id; + private String routeId; private String operation; private AggregationStrategy aggregationStrategy; private String key; @@ -69,6 +71,16 @@ public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAwar this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getOperation() { return operation; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java index bcdabee..dcb87a8 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; @@ -35,8 +36,9 @@ import org.apache.camel.util.ObjectHelper; * <p/> * If the conversion fails an {@link org.apache.camel.InvalidPayloadException} is thrown. */ -public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor, IdAware { +public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor, IdAware, RouteIdAware { private String id; + private String routeId; private final Class<?> type; private final String charset; @@ -68,6 +70,16 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public void process(Exchange exchange) throws Exception { Message old = exchange.getMessage(); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java b/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java index 9408cbf..d5b3eb5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java @@ -24,6 +24,7 @@ import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; /** * A <a href="http://camel.apache.org/delayer.html">Delayer</a> which @@ -32,7 +33,8 @@ import org.apache.camel.spi.IdAware; * <p/> * This implementation will block while waiting. */ -public class Delayer extends DelayProcessorSupport implements Traceable, IdAware { +public class Delayer extends DelayProcessorSupport implements Traceable, IdAware, RouteIdAware { + private String routeId; private String id; private Expression delay; private long delayValue; @@ -59,6 +61,16 @@ public class Delayer extends DelayProcessorSupport implements Traceable, IdAware } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public String getTraceLabel() { return "delay[" + delay + "]"; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java index 3ede7e2..21dab2f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java @@ -31,6 +31,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultExchange; @@ -53,10 +54,11 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern * * @see PollEnricher */ -public class Enricher extends AsyncProcessorSupport implements IdAware, CamelContextAware { +public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware { private CamelContext camelContext; private String id; + private String routeId; private ProducerCache producerCache; private final Expression expression; private AggregationStrategy aggregationStrategy; @@ -89,6 +91,16 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, CamelCon this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java index 9da6562..516bcc7 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java @@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * Processor to set {@link org.apache.camel.ExchangePattern} on the {@link org.apache.camel.Exchange}. */ -public class ExchangePatternProcessor extends AsyncProcessorSupport implements IdAware { +public class ExchangePatternProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware { private String id; + private String routeId; private ExchangePattern exchangePattern = ExchangePattern.InOnly; public ExchangePatternProcessor() { @@ -50,6 +52,16 @@ public class ExchangePatternProcessor extends AsyncProcessorSupport implements I this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public ExchangePattern getExchangePattern() { return exchangePattern; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java index 880d2d6..a07576a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java @@ -22,6 +22,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.processor.DelegateAsyncProcessor; import org.apache.camel.support.service.ServiceHelper; @@ -29,9 +30,10 @@ import org.apache.camel.support.service.ServiceHelper; * The processor which implements the * <a href="http://camel.apache.org/message-filter.html">Message Filter</a> EIP pattern. */ -public class FilterProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { +public class FilterProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Predicate predicate; private transient long filtered; @@ -89,6 +91,16 @@ public class FilterProcessor extends DelegateAsyncProcessor implements Traceable } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public String getTraceLabel() { return "filter[if: " + predicate + "]"; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java index bb3f0bc..d43c373 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -21,15 +21,17 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; /** * Processor to handle do finally supporting asynchronous routing engine */ -public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { +public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; public FinallyProcessor(Processor processor) { super(processor); @@ -73,6 +75,16 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + private final class FinallyAsyncCallback implements AsyncCallback { private final Exchange exchange; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java index c2c5f49..5d2203e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java @@ -26,14 +26,16 @@ import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.LogListener; import org.apache.camel.spi.MaskingFormatter; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * A processor which evaluates an {@link Expression} and logs it. */ -public class LogProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class LogProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression expression; private final CamelLogger logger; private final MaskingFormatter formatter; @@ -107,6 +109,16 @@ public class LogProcessor extends AsyncProcessorSupport implements Traceable, Id this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java index 6ef3a5b..d368555 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -24,6 +24,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -32,9 +33,10 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; /** * The processor which sends messages in a loop. */ -public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { +public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression expression; private final Predicate predicate; private final boolean copy; @@ -49,7 +51,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { - LoopState state = new LoopState(exchange, callback); if (exchange.isTransacted()) { @@ -58,7 +59,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, exchange.getContext().getReactiveExecutor().scheduleMain(state); } return false; - } catch (Exception e) { exchange.setException(e); callback.done(true); @@ -183,6 +183,16 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public String toString() { if (predicate != null) { return "Loop[while: " + predicate + " do: " + getProcessor() + "]"; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java index 0263dc4..6a7e111 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java @@ -22,6 +22,7 @@ import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; @@ -30,8 +31,9 @@ import org.apache.camel.util.ObjectHelper; /** * A processor which are used when calling a method and setting the response as the message body */ -public class MethodCallProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class MethodCallProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression expression; public MethodCallProcessor(Expression expression) { @@ -104,6 +106,16 @@ public class MethodCallProcessor extends AsyncProcessorSupport implements Tracea this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 9172b76..b0d49f0 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -51,6 +51,7 @@ import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; @@ -71,7 +72,7 @@ import static org.apache.camel.util.ObjectHelper.notNull; * * @see Pipeline */ -public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware { +public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware { /** * Class that represent each step in the multicast route to do @@ -140,6 +141,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat protected final Processor onPrepare; private final CamelContext camelContext; private String id; + private String routeId; private Collection<Processor> processors; private final AggregationStrategy aggregationStrategy; private final boolean parallelProcessing; @@ -206,6 +208,16 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public String getTraceLabel() { return "multicast"; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index b5a5cc5..686e00c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -30,6 +30,7 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.SynchronizationAdapter; @@ -40,10 +41,11 @@ import static org.apache.camel.util.ObjectHelper.notNull; /** * Processor implementing <a href="http://camel.apache.org/oncompletion.html">onCompletion</a>. */ -public class OnCompletionProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class OnCompletionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private final CamelContext camelContext; private String id; + private String routeId; private final Processor processor; private final ExecutorService executorService; private final boolean shutdownExecutorService; @@ -101,6 +103,16 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (processor != null) { // register callback diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index f4aceac..fcc9baa 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -30,6 +30,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; @@ -41,11 +42,12 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * Creates a Pipeline pattern where the output of the previous step is sent as * input to the next step, reusing the same message exchanges */ -public class Pipeline extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware { +public class Pipeline extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware { private final CamelContext camelContext; private List<AsyncProcessor> processors; private String id; + private String routeId; public Pipeline(CamelContext camelContext, Collection<Processor> processors) { this.camelContext = camelContext; @@ -165,6 +167,16 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public List<Processor> next() { if (!hasNext()) { return null; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java index 18c9595..f947a0b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -31,6 +31,7 @@ import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.support.DefaultConsumer; @@ -52,11 +53,12 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern * * @see Enricher */ -public class PollEnricher extends AsyncProcessorSupport implements IdAware, CamelContextAware { +public class PollEnricher extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware { private CamelContext camelContext; private ConsumerCache consumerCache; private String id; + private String routeId; private AggregationStrategy aggregationStrategy; private final Expression expression; private long timeout; @@ -95,6 +97,16 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Came this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java index 6523c31..037b064 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java @@ -31,6 +31,7 @@ import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.ObjectHelper; @@ -45,11 +46,12 @@ import static org.apache.camel.util.ObjectHelper.notNull; * pattern where the list of actual endpoints to send a message exchange to are * dependent on some dynamic expression. */ -public class RecipientList extends AsyncProcessorSupport implements IdAware { +public class RecipientList extends AsyncProcessorSupport implements IdAware, RouteIdAware { private static final String IGNORE_DELIMITER_MARKER = "false"; private final CamelContext camelContext; private String id; + private String routeId; private ProducerCache producerCache; private Expression expression; private final String delimiter; @@ -110,6 +112,16 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware { } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (!isStarted()) { throw new IllegalStateException("RecipientList has not been started: " + this); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java index 1be2c87..07e93dc 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java @@ -20,14 +20,16 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * A processor which removes the header from the IN or OUT message */ -public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private final String headerName; private String id; + private String routeId; public RemoveHeaderProcessor(String headerName) { this.headerName = headerName; @@ -65,6 +67,16 @@ public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Trac this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getHeaderName() { return headerName; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java index 95b993d..27e1249 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java @@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * A processor which removes one ore more headers from the IN or OUT message */ -public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final String pattern; private final String[] excludePattern; @@ -67,6 +69,16 @@ public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Tra this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getPattern() { return pattern; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java index 86818fb..f7b9016 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java @@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * A processor which removes one ore more properties from the exchange */ -public class RemovePropertiesProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class RemovePropertiesProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final String pattern; private final String[] excludePattern; @@ -67,6 +69,16 @@ public class RemovePropertiesProcessor extends AsyncProcessorSupport implements this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getPattern() { return pattern; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java index 8cdf7a5..b7bf5f9 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java @@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * A processor which removes the property from the exchange */ -public class RemovePropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class RemovePropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final String propertyName; public RemovePropertyProcessor(String propertyName) { @@ -65,6 +67,16 @@ public class RemovePropertyProcessor extends AsyncProcessorSupport implements Tr this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getPropertyName() { return propertyName; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java index 0ce608e..f2598c5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java @@ -44,6 +44,7 @@ import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExpressionComparator; @@ -55,12 +56,13 @@ import org.apache.camel.util.ObjectHelper; * An implementation of the <a href="http://camel.apache.org/resequencer.html">Resequencer</a> * which can reorder messages within a batch. */ -public class Resequencer extends AsyncProcessorSupport implements Navigate<Processor>, IdAware, Traceable { +public class Resequencer extends AsyncProcessorSupport implements Navigate<Processor>, IdAware, RouteIdAware, Traceable { public static final long DEFAULT_BATCH_TIMEOUT = 1000L; public static final int DEFAULT_BATCH_SIZE = 100; private String id; + private String routeId; private long batchTimeout = DEFAULT_BATCH_TIMEOUT; private int batchSize = DEFAULT_BATCH_SIZE; private int outBatchSize; @@ -250,7 +252,17 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce this.id = id; } - // Implementation methods + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + +// Implementation methods //------------------------------------------------------------------------- protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates, boolean reverse) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java index e91c6a3..7ed8589 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java @@ -21,14 +21,16 @@ import org.apache.camel.Exchange; import org.apache.camel.RollbackExchangeException; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * Processor for marking an {@link org.apache.camel.Exchange} to rollback. */ -public class RollbackProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class RollbackProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private boolean markRollbackOnly; private boolean markRollbackOnlyLast; private String message; @@ -92,6 +94,16 @@ public class RollbackProcessor extends AsyncProcessorSupport implements Traceabl this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getMessage() { return message; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java index b79f264..975456d 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -32,6 +32,7 @@ import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.ExchangeHelper; @@ -52,9 +53,10 @@ import static org.apache.camel.util.ObjectHelper.notNull; * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the * pipeline to ensure it works the same and the async routing engine is flawless. */ -public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdAware { +public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { protected String id; + protected String routeId; protected ProducerCache producerCache; protected int cacheSize; protected boolean ignoreInvalidEndpoints; @@ -112,6 +114,16 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java index 10b2492..974cf4b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java @@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** @@ -36,9 +37,10 @@ import org.apache.camel.support.AsyncProcessorSupport; * an exchange stream, rough consolidation of noisy and bursty exchange traffic * or where queuing of throttled exchanges is undesirable. */ -public class SamplingThrottler extends AsyncProcessorSupport implements Traceable, IdAware { +public class SamplingThrottler extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private long messageFrequency; private long currentMessageCount; private long samplePeriod; @@ -88,6 +90,16 @@ public class SamplingThrottler extends AsyncProcessorSupport implements Traceabl } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public String getTraceLabel() { if (messageFrequency > 0) { return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]"; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java index 6183585..4d72dbc 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java @@ -21,14 +21,16 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; /** * A processor which executes the script as an expression and does not change the message body. */ -public class ScriptProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class ScriptProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression expression; public ScriptProcessor(Expression expression) { @@ -68,6 +70,16 @@ public class ScriptProcessor extends AsyncProcessorSupport implements Traceable, this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index 7cc21f8..12234bd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -30,6 +30,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.SendDynamicAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.EndpointHelper; @@ -42,7 +43,7 @@ import org.apache.camel.util.URISupport; * * @see org.apache.camel.processor.SendProcessor */ -public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAware, CamelContextAware { +public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware { protected SendDynamicAware dynamicAware; protected CamelContext camelContext; @@ -51,6 +52,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa protected ExchangePattern pattern; protected ProducerCache producerCache; protected String id; + protected String routeId; protected boolean ignoreInvalidEndpoint; protected int cacheSize; protected boolean allowOptimisedComponents = true; @@ -76,6 +78,16 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public boolean process(Exchange exchange, final AsyncCallback callback) { if (!isStarted()) { exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java index cb7abc6..44bc3bd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -30,6 +30,7 @@ import org.apache.camel.Traceable; import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.EndpointHelper; import org.apache.camel.support.EventHelper; @@ -43,7 +44,7 @@ import org.apache.camel.util.URISupport; * * @see SendDynamicProcessor */ -public class SendProcessor extends AsyncProcessorSupport implements Traceable, EndpointAware, IdAware { +public class SendProcessor extends AsyncProcessorSupport implements Traceable, EndpointAware, IdAware, RouteIdAware { protected transient String traceLabelToString; protected final CamelContext camelContext; @@ -53,6 +54,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E protected Endpoint destination; protected ExchangePattern destinationExchangePattern; protected String id; + protected String routeId; protected volatile long counter; public SendProcessor(Endpoint destination) { @@ -88,6 +90,15 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E this.id = id; } + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + @Override public String getTraceLabel() { if (traceLabelToString == null) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java index ba97651..d39f2ab 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java @@ -22,6 +22,7 @@ import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; @@ -29,8 +30,9 @@ import org.apache.camel.support.ExchangeHelper; /** * A processor which sets the body on the IN or OUT message with an {@link Expression} */ -public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression expression; public SetBodyProcessor(Expression expression) { @@ -93,6 +95,16 @@ public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java index 124d71d..1fe8188 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java @@ -22,14 +22,16 @@ import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; /** * A processor which sets the header on the IN or OUT message with an {@link org.apache.camel.Expression} */ -public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression headerName; private final Expression expression; @@ -84,6 +86,16 @@ public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceab this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getHeaderName() { return headerName.toString(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java index a1292d4..aaf29d4 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java @@ -21,14 +21,16 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; /** * A processor which sets the property on the exchange with an {@link org.apache.camel.Expression} */ -public class SetPropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class SetPropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression propertyName; private final Expression expression; @@ -80,6 +82,16 @@ public class SetPropertyProcessor extends AsyncProcessorSupport implements Trace this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public String getPropertyName() { return propertyName.toString(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java index 18d27ef..4bb4373 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java @@ -22,15 +22,18 @@ import java.util.List; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * A processor that sorts the expression using a comparator */ -public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware, org.apache.camel.Traceable { +public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware, RouteIdAware, Traceable { private String id; + private String routeId; private final Expression expression; private final Comparator<? super T> comparator; @@ -75,6 +78,16 @@ public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware, this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java index 5532fe5..aefd1ce 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java @@ -19,14 +19,16 @@ package org.apache.camel.processor; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** * Stops continue processing the route and marks it as complete. */ -public class StopProcessor extends AsyncProcessorSupport implements IdAware { +public class StopProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware { private String id; + private String routeId; @Override public boolean process(Exchange exchange, AsyncCallback callback) { @@ -53,6 +55,16 @@ public class StopProcessor extends AsyncProcessorSupport implements IdAware { } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override protected void doStart() throws Exception { // noop } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java index c81edf8..93f52ab 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -36,6 +36,7 @@ import org.apache.camel.processor.resequencer.SequenceElementComparator; import org.apache.camel.processor.resequencer.SequenceSender; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.LoggingExceptionHandler; import org.apache.camel.support.service.ServiceHelper; @@ -62,9 +63,10 @@ import org.apache.camel.util.ObjectHelper; * * @see ResequencerEngine */ -public class StreamResequencer extends AsyncProcessorSupport implements SequenceSender<Exchange>, Navigate<Processor>, Traceable, IdAware { +public class StreamResequencer extends AsyncProcessorSupport implements SequenceSender<Exchange>, Navigate<Processor>, Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final CamelContext camelContext; private final ExceptionHandler exceptionHandler; private final ResequencerEngine<Exchange> engine; @@ -189,6 +191,16 @@ public class StreamResequencer extends AsyncProcessorSupport implements Sequence } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override protected void doStart() throws Exception { ServiceHelper.startService(processor); delivery = new Delivery(); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java index 0e7868b..af737a6 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java @@ -25,6 +25,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.concurrent.Rejectable; @@ -53,9 +54,10 @@ import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; * will not be free to process a new exchange, as its processing the current exchange.</li> * </ul> */ -public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware { +public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware { private String id; + private String routeId; private final CamelContext camelContext; private final ExecutorService executorService; private final ThreadPoolRejectedPolicy rejectedPolicy; @@ -164,6 +166,16 @@ public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware { this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public ThreadPoolRejectedPolicy getRejectedPolicy() { return rejectedPolicy; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java index 0c68d8e..4ebb520 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java @@ -33,6 +33,7 @@ import org.apache.camel.Expression; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; @@ -56,7 +57,7 @@ import org.apache.camel.util.ObjectHelper; * callers point of view in the last timePeriodMillis no more than * maxRequestsPerPeriod have been allowed to be acquired. */ -public class Throttler extends AsyncProcessorSupport implements Traceable, IdAware { +public class Throttler extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey"; @@ -72,6 +73,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa private volatile long timePeriodMillis; private volatile long cleanPeriodMillis; private String id; + private String routeId; private Expression maxRequestsPerPeriodExpression; private boolean rejectExecution; private boolean asyncDelayed; @@ -389,6 +391,16 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + /** * Sets the maximum number of requests per time period expression */ diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java index 0f83a98..68ee7e5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java @@ -26,14 +26,16 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; /** * The processor which sets an {@link Exception} on the {@link Exchange} */ -public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, CamelContextAware { +public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware, CamelContextAware { private String id; + private String routeId; private CamelContext camelContext; private Expression simple; private final Exception exception; @@ -94,6 +96,16 @@ public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Tr this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Exception getException() { return exception; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java index d218d01..19924d0 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java @@ -22,6 +22,7 @@ import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.ExchangeHelper; @@ -30,8 +31,9 @@ import org.apache.camel.util.ObjectHelper; /** * A processor which sets the body on the OUT message with an {@link Expression}. */ -public class TransformProcessor extends AsyncProcessorSupport implements Traceable, IdAware { +public class TransformProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { private String id; + private String routeId; private final Expression expression; public TransformProcessor(Expression expression) { @@ -100,6 +102,16 @@ public class TransformProcessor extends AsyncProcessorSupport implements Traceab this.id = id; } + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + public Expression getExpression() { return expression; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java index d1af436..2d70216 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -27,6 +27,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; @@ -35,9 +36,10 @@ import org.apache.camel.support.service.ServiceHelper; /** * Implements try/catch/finally type processing */ -public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware { +public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware { protected String id; + protected String routeId; protected final Processor tryProcessor; protected final List<Processor> catchClauses; protected final Processor finallyProcessor; @@ -164,4 +166,14 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc public void setId(String id) { this.id = id; } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 3c8f190..8ec1f0c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -36,6 +36,7 @@ import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; @@ -47,9 +48,10 @@ import org.apache.camel.util.ObjectHelper; /** * Processor for wire tapping exchanges to an endpoint destination. */ -public class WireTapProcessor extends AsyncProcessorSupport implements Traceable, ShutdownAware, IdAware, CamelContextAware { +public class WireTapProcessor extends AsyncProcessorSupport implements Traceable, ShutdownAware, IdAware, RouteIdAware, CamelContextAware { private String id; + private String routeId; private CamelContext camelContext; private final SendDynamicProcessor dynamicProcessor; private final String uri; @@ -100,6 +102,16 @@ public class WireTapProcessor extends AsyncProcessorSupport implements Traceable } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public CamelContext getCamelContext() { return camelContext; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index ad0f96d..71028c1 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -55,6 +55,7 @@ import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.OptimisticLockingAggregationRepository; import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.ShutdownPrepared; import org.apache.camel.spi.Synchronization; @@ -84,7 +85,7 @@ import org.apache.camel.util.TimeUtils; * and older prices are discarded). Another idea is to combine line item messages * together into a single invoice message. */ -public class AggregateProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware { +public class AggregateProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware, RouteIdAware { public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; public static final String AGGREGATE_OPTIMISTIC_LOCKING_EXECUTOR = "AggregateOptimisticLockingExecutor"; @@ -102,6 +103,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat private final CamelContext camelContext; private final AsyncProcessor processor; private String id; + private String routeId; private AggregationStrategy aggregationStrategy; private boolean preCompletion; private Expression correlationExpression; @@ -295,6 +297,14 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat this.id = id; } + public String getRouteId() { + return routeId; + } + + public void setRouteId(String routeId) { + this.routeId = routeId; + } + @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index 9752419..8a24cb3 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -47,6 +47,7 @@ public class IdempotentConsumer extends AsyncProcessorSupport implements CamelCo private CamelContext camelContext; private String id; + private String routeId; private final Expression messageIdExpression; private final AsyncProcessor processor; private final IdempotentRepository idempotentRepository; @@ -92,6 +93,14 @@ public class IdempotentConsumer extends AsyncProcessorSupport implements CamelCo this.id = id; } + public String getRouteId() { + return routeId; + } + + public void setRouteId(String routeId) { + this.routeId = routeId; + } + @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { final AsyncCallback target; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java index aae3fbf..d098969 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java @@ -25,16 +25,18 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.service.ServiceHelper; /** * A default base class for a {@link LoadBalancer} implementation. */ -public abstract class LoadBalancerSupport extends AsyncProcessorSupport implements LoadBalancer, Navigate<Processor>, IdAware { +public abstract class LoadBalancerSupport extends AsyncProcessorSupport implements LoadBalancer, Navigate<Processor>, IdAware, RouteIdAware { private final AtomicReference<AsyncProcessor[]> processors = new AtomicReference<>(new AsyncProcessor[0]); private String id; + private String routeId; @Override public void addProcessor(AsyncProcessor processor) { @@ -100,6 +102,16 @@ public abstract class LoadBalancerSupport extends AsyncProcessorSupport implemen } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override protected void doStart() throws Exception { ServiceHelper.startService((Object[]) processors.get()); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index 59e2587..d831188 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -27,6 +27,7 @@ import org.apache.camel.Channel; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; +import org.apache.camel.RouteAware; import org.apache.camel.model.AggregateDefinition; import org.apache.camel.model.BeanDefinition; import org.apache.camel.model.CatchDefinition; @@ -106,6 +107,7 @@ import org.apache.camel.spi.IdAware; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RouteIdAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -488,6 +490,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends String id = getId(output, routeContext); ((IdAware)processor).setId(id); } + if (processor instanceof RouteIdAware) { + ((RouteIdAware)processor).setRouteId(routeContext.getRouteId()); + } if (output instanceof Channel && processor == null) { continue; @@ -576,6 +581,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends String id = getId(definition, routeContext); ((IdAware)processor).setId(id); } + if (processor instanceof RouteIdAware) { + ((RouteIdAware)processor).setRouteId(routeContext.getRouteId()); + } if (processor == null) { // no processor to make diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java index 60ad5dd..8c1beb3 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java @@ -30,6 +30,7 @@ import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.StepDefinition; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.service.ServiceHelper; @ManagedResource(description = "Managed Processor") @@ -133,6 +134,8 @@ public class ManagedProcessor extends ManagedPerformanceCounter implements Manag public String getRouteId() { if (route != null) { return route.getId(); + } else if (processor instanceof RouteIdAware) { + return ((RouteIdAware) processor).getRouteId(); } return null; } diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java index 4f15e20..afa4c6d 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java @@ -49,8 +49,8 @@ public class ManagedUnregisterProducerTest extends ManagementTestSupport { assertEquals("mock://result", uri); // TODO: populating route id on producers is not implemented yet - // String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); - // assertEquals("route1", routeId); + //String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + //assertEquals("route1", routeId); Boolean singleton = (Boolean) mbeanServer.getAttribute(on, "Singleton"); assertEquals(Boolean.TRUE, singleton); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java index b28e728..f614fdb 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java @@ -27,6 +27,7 @@ import org.apache.camel.spi.ExchangeFormatter; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.LogListener; import org.apache.camel.spi.MaskingFormatter; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; /** @@ -36,9 +37,10 @@ import org.apache.camel.support.AsyncProcessorSupport; * The name <tt>CamelLogger</tt> has been chosen to avoid any name clash with log kits * which has a <tt>Logger</tt> class. */ -public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware { +public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware { private String id; + private String routeId; private CamelLogger logger; private ExchangeFormatter formatter; private MaskingFormatter maskingFormatter; @@ -76,6 +78,16 @@ public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (logger.shouldLog()) { String output = formatter.format(exchange); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java index 4f4bfbd..ecbac22 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java @@ -24,6 +24,7 @@ import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.builder.OutputStreamBuilder; import org.apache.camel.support.service.ServiceHelper; @@ -33,8 +34,9 @@ import org.apache.camel.util.ObjectHelper; * Marshals the body of the incoming message using the given * <a href="http://camel.apache.org/data-format.html">data format</a> */ -public class MarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware { +public class MarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware, RouteIdAware { private String id; + private String routeId; private CamelContext camelContext; private final DataFormat dataFormat; @@ -92,6 +94,16 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public CamelContext getCamelContext() { return camelContext; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java index 4e5f4be..fd5ae95 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java @@ -27,15 +27,17 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; /** * A logger for logging message throughput. */ -public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProcessor, IdAware { +public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProcessor, IdAware, RouteIdAware { private String id; + private String routeId; private final AtomicInteger receivedCounter = new AtomicInteger(); private NumberFormat numberFormat = NumberFormat.getNumberInstance(); private long groupReceivedCount; @@ -83,6 +85,16 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public void process(Exchange exchange) throws Exception { if (startTime == 0) { startTime = System.currentTimeMillis(); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java index 21346da..2691450 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java @@ -28,6 +28,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.Traceable; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.IOHelper; @@ -37,8 +38,9 @@ import org.apache.camel.util.ObjectHelper; * Unmarshals the body of the incoming message using the given * <a href="http://camel.apache.org/data-format.html">data format</a> */ -public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware { +public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware, RouteIdAware { private String id; + private String routeId; private CamelContext camelContext; private final DataFormat dataFormat; @@ -106,6 +108,16 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab } @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override public CamelContext getCamelContext() { return camelContext; }