CAMEL-8527: Processor in routes should be IdAware
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/eaf8a3c4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/eaf8a3c4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/eaf8a3c4 Branch: refs/heads/master Commit: eaf8a3c4260aacc59acc6cd3f9021d43a32c5233 Parents: 89cbfd8 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Mar 22 08:18:15 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Mar 22 16:12:41 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/camel/Navigate.java | 6 ++ .../src/main/java/org/apache/camel/Route.java | 8 +++ .../camel/impl/EventDrivenConsumerRoute.java | 30 +++++++++ .../apache/camel/model/ProcessorDefinition.java | 11 ++++ .../apache/camel/processor/BatchProcessor.java | 12 +++- .../camel/processor/CamelLogProcessor.java | 12 +++- .../apache/camel/processor/CatchProcessor.java | 12 +++- .../apache/camel/processor/ChoiceProcessor.java | 12 +++- .../camel/processor/ConvertBodyProcessor.java | 12 +++- .../org/apache/camel/processor/Delayer.java | 12 +++- .../org/apache/camel/processor/Enricher.java | 12 +++- .../processor/ExchangePatternProcessor.java | 12 +++- .../apache/camel/processor/FilterProcessor.java | 12 +++- .../camel/processor/FinallyProcessor.java | 12 +++- .../apache/camel/processor/LogProcessor.java | 12 +++- .../apache/camel/processor/LoopProcessor.java | 12 +++- .../camel/processor/MarshalProcessor.java | 12 +++- .../camel/processor/MulticastProcessor.java | 12 +++- .../camel/processor/OnCompletionProcessor.java | 12 +++- .../org/apache/camel/processor/Pipeline.java | 13 +++- .../apache/camel/processor/PollEnricher.java | 18 +++++- .../apache/camel/processor/RecipientList.java | 12 +++- .../camel/processor/RemoveHeaderProcessor.java | 16 ++++- .../camel/processor/RemoveHeadersProcessor.java | 20 +++++- .../processor/RemovePropertiesProcessor.java | 20 +++++- 
.../processor/RemovePropertyProcessor.java | 16 ++++- .../camel/processor/RollbackProcessor.java | 16 ++++- .../org/apache/camel/processor/RoutingSlip.java | 14 ++++- .../camel/processor/SamplingThrottler.java | 23 +++++-- .../apache/camel/processor/SendProcessor.java | 12 +++- .../camel/processor/SetBodyProcessor.java | 16 ++++- .../camel/processor/SetHeaderProcessor.java | 20 +++++- .../camel/processor/SetPropertyProcessor.java | 20 +++++- .../apache/camel/processor/SortProcessor.java | 25 +++++++- .../apache/camel/processor/StopProcessor.java | 13 +++- .../camel/processor/StreamResequencer.java | 12 +++- .../camel/processor/ThreadsProcessor.java | 12 +++- .../org/apache/camel/processor/Throttler.java | 12 +++- .../ThrottlerRejectedExecutionException.java | 5 +- .../camel/processor/ThroughputLogger.java | 12 +++- .../processor/ThrowExceptionProcessor.java | 16 ++++- .../camel/processor/TransformProcessor.java | 16 ++++- .../apache/camel/processor/TryProcessor.java | 11 +++- .../camel/processor/UnmarshalProcessor.java | 12 +++- .../camel/processor/WireTapProcessor.java | 12 +++- .../processor/aggregate/AggregateProcessor.java | 12 +++- .../idempotent/IdempotentConsumer.java | 12 +++- .../loadbalancer/LoadBalancerSupport.java | 12 +++- .../main/java/org/apache/camel/spi/IdAware.java | 33 ++++++++++ .../processor/SimpleProcessorIdAwareTest.java | 66 ++++++++++++++++++++ 50 files changed, 712 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/Navigate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Navigate.java b/camel-core/src/main/java/org/apache/camel/Navigate.java index 1f5219f..62568f1 100644 --- a/camel-core/src/main/java/org/apache/camel/Navigate.java +++ b/camel-core/src/main/java/org/apache/camel/Navigate.java @@ -28,6 +28,9 
@@ public interface Navigate<T> { /** * Next group of outputs + * <p/> + * Important: only invoke this once, as this method does not carry state, and is not intended to be used in a while loop, + * but used by an if statement instead. * * @return next group or <tt>null</tt> if no more outputs */ @@ -35,6 +38,9 @@ public interface Navigate<T> { /** * Are there more outputs? + * <p/> + * Important: only invoke this once, as this method does not carry state, and is not intended to be used in a while loop, + * but used by an if statement instead. * * @return <tt>true</tt> if more outputs */ http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/Route.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Route.java b/camel-core/src/main/java/org/apache/camel/Route.java index 162f07a..d5cc0db 100644 --- a/camel-core/src/main/java/org/apache/camel/Route.java +++ b/camel-core/src/main/java/org/apache/camel/Route.java @@ -112,6 +112,14 @@ public interface Route extends EndpointAware { Navigate<Processor> navigate(); /** + * Returns a list of all the {@link Processor}s from this route that have ids matching the pattern + * + * @param pattern the pattern to match by ids + * @return a list of {@link Processor}, is never <tt>null</tt>. + */ + List<Processor> filter(String pattern); + + /** * Callback preparing the route to be started, by warming up the route. 
*/ void warmUp(); http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java index d762a51..8a6da5d 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java @@ -16,6 +16,7 @@ */ package org.apache.camel.impl; +import java.util.ArrayList; import java.util.List; import org.apache.camel.Consumer; @@ -25,7 +26,9 @@ import org.apache.camel.Processor; import org.apache.camel.RouteAware; import org.apache.camel.Service; import org.apache.camel.SuspendableService; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.EndpointHelper; /** * A {@link DefaultRoute} which starts with an @@ -94,6 +97,33 @@ public class EventDrivenConsumerRoute extends DefaultRoute { return null; } + @SuppressWarnings("unchecked") + public List<Processor> filter(String pattern) { + List<Processor> match = new ArrayList<Processor>(); + doFilter(pattern, navigate(), match); + return match; + } + + @SuppressWarnings("unchecked") + private void doFilter(String pattern, Navigate<Processor> nav, List<Processor> match) { + List<Processor> list = nav.next(); + if (list != null) { + for (Processor proc : list) { + String id = null; + if (proc instanceof IdAware) { + id = ((IdAware) proc).getId(); + } + if (EndpointHelper.matchPattern(id, pattern)) { + match.add(proc); + } + if (proc instanceof Navigate) { + Navigate<Processor> child = (Navigate<Processor>) proc; + doFilter(pattern, child, match); + } + } + } + } + public Consumer getConsumer() { return consumer; } 
http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index d335008..f3122d7 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -62,6 +62,7 @@ import org.apache.camel.processor.interceptor.HandleFault; import org.apache.camel.processor.interceptor.StreamCaching; import org.apache.camel.processor.loadbalancer.LoadBalancer; import org.apache.camel.spi.DataFormat; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; @@ -444,6 +445,11 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> Processor processor = createProcessor(routeContext, output); + // inject id + if (processor instanceof IdAware) { + ((IdAware) processor).setId(output.getId()); + } + if (output instanceof Channel && processor == null) { continue; } @@ -527,6 +533,11 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> processor = createProcessor(routeContext); } + // inject id + if (processor instanceof IdAware) { + ((IdAware) processor).setId(this.getId()); + } + if (processor == null) { // no processor to make return null; http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java index 
c62b614..b3c8f05 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java @@ -38,6 +38,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.LoggingExceptionHandler; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -53,13 +54,14 @@ import org.slf4j.LoggerFactory; * @deprecated may be removed in the future when we overhaul the resequencer EIP */ @Deprecated -public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor> { +public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, IdAware { public static final long DEFAULT_BATCH_TIMEOUT = 1000L; public static final int DEFAULT_BATCH_SIZE = 100; private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class); + private String id; private long batchTimeout = DEFAULT_BATCH_TIMEOUT; private int batchSize = DEFAULT_BATCH_SIZE; private int outBatchSize; @@ -199,6 +201,14 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na return processor != null; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + /** * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in * the in queue should be drained to the "out" collection. 
http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java index 052d1e7..fd98ce3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java @@ -22,6 +22,7 @@ import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.Processor; import org.apache.camel.spi.ExchangeFormatter; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelLogger; @@ -34,7 +35,8 @@ import org.apache.camel.util.CamelLogger; * * @version */ -public class CamelLogProcessor implements AsyncProcessor { +public class CamelLogProcessor implements AsyncProcessor, IdAware { + private String id; private CamelLogger log; private ExchangeFormatter formatter; @@ -57,6 +59,14 @@ public class CamelLogProcessor implements AsyncProcessor { return "Logger[" + log + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java index 5cc5b9c..063a776 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ 
b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -33,9 +34,10 @@ import org.slf4j.LoggerFactory; * * @version */ -public class CatchProcessor extends DelegateAsyncProcessor implements Traceable { +public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(CatchProcessor.class); + private String id; private final List<Class<? extends Throwable>> exceptions; private final Predicate onWhen; private final Predicate handled; @@ -52,6 +54,14 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable return "Catch[" + exceptions + " -> " + getProcessor() + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public String getTraceLabel() { return "catch"; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java index 44f4b10..acd7cf5 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java @@ -26,6 +26,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import 
org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -42,8 +43,9 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * * @version */ -public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { +public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(ChoiceProcessor.class); + private String id; private final List<Processor> filters; private final Processor otherwise; @@ -167,6 +169,14 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N return otherwise != null || (filters != null && !filters.isEmpty()); } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + protected void doStart() throws Exception { ServiceHelper.startServices(filters, otherwise); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java index 40e863b..125e0e6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -34,7 +35,8 @@ import org.apache.camel.util.ObjectHelper; * * @version */ 
-public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor { +public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor, IdAware { + private String id; private final Class<?> type; private final String charset; @@ -55,6 +57,14 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess return "convertBodyTo[" + type.getCanonicalName() + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/Delayer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Delayer.java b/camel-core/src/main/java/org/apache/camel/processor/Delayer.java index 807b974..2f598e6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Delayer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Delayer.java @@ -23,6 +23,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; /** * A <a href="http://camel.apache.org/delayer.html">Delayer</a> which @@ -33,7 +34,8 @@ import org.apache.camel.Traceable; * * @version */ -public class Delayer extends DelayProcessorSupport implements Traceable { +public class Delayer extends DelayProcessorSupport implements Traceable, IdAware { + private String id; private Expression delay; private long delayValue; @@ -48,6 +50,14 @@ public class Delayer extends DelayProcessorSupport implements Traceable { return "Delayer[" + delay + " to: " + getProcessor() + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public String getTraceLabel() { 
return "delay[" + delay + "]"; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/Enricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index e3ebcff..75d2429 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -26,6 +26,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -49,9 +50,10 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; * * @see PollEnricher */ -public class Enricher extends ServiceSupport implements AsyncProcessor, EndpointAware { +public class Enricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware { private static final Logger LOG = LoggerFactory.getLogger(Enricher.class); + private String id; private AggregationStrategy aggregationStrategy; private Producer producer; private boolean aggregateOnException; @@ -79,6 +81,14 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint this.producer = producer; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + /** * Sets the aggregation strategy for this enricher. 
* http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java index 3d151b9..9ff313b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java @@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * Processor to set {@link org.apache.camel.ExchangePattern} on the {@link org.apache.camel.Exchange}. 
*/ -public class ExchangePatternProcessor extends ServiceSupport implements AsyncProcessor { +public class ExchangePatternProcessor extends ServiceSupport implements AsyncProcessor, IdAware { + private String id; private ExchangePattern exchangePattern = ExchangePattern.InOnly; public ExchangePatternProcessor() { @@ -40,6 +42,14 @@ public class ExchangePatternProcessor extends ServiceSupport implements AsyncPro exchangePattern = ep; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java index aebc12d..64e9324 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java @@ -21,6 +21,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +32,9 @@ import org.slf4j.LoggerFactory; * * @version */ -public class FilterProcessor extends DelegateAsyncProcessor implements Traceable { +public class FilterProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(FilterProcessor.class); + private String id; private final Predicate predicate; public FilterProcessor(Predicate predicate, Processor processor) { @@ -67,6 +69,14 @@ public class FilterProcessor extends 
DelegateAsyncProcessor implements Traceable return "Filter[if: " + predicate + " do: " + getProcessor() + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public String getTraceLabel() { return "filter[if: " + predicate + "]"; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java index c4d0bec..b04e172 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.ExchangeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,9 +30,10 @@ import org.slf4j.LoggerFactory; * * @version */ -public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable { +public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(FinallyProcessor.class); + private String id; public FinallyProcessor(Processor processor) { super(processor); @@ -80,4 +82,12 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl public String getTraceLabel() { return "finally"; } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java index fed1503..6038825 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelLogger; @@ -30,8 +31,9 @@ import org.apache.camel.util.CamelLogger; * * @version */ -public class LogProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class LogProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final Expression expression; private final CamelLogger logger; @@ -69,6 +71,14 @@ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Trac return "log[" + expression + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java index 89649b1..d5b484d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -24,6 +24,7 @@ import org.apache.camel.Expression; 
import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.ExchangeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,9 +32,10 @@ import org.slf4j.LoggerFactory; /** * The processor which sends messages in a loop. */ -public class LoopProcessor extends DelegateAsyncProcessor implements Traceable { +public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class); + private String id; private final Expression expression; private final boolean copy; @@ -173,6 +175,14 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable { return "loop[" + expression + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + @Override public String toString() { return "Loop[for: " + expression + " times do: " + getProcessor() + "]"; http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java index 7a45624..1b78d86 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java @@ -27,6 +27,7 @@ import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.converter.stream.CachedOutputStream; import org.apache.camel.spi.DataFormat; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; @@ -38,7 
+39,8 @@ import org.apache.camel.util.ServiceHelper; * * @version */ -public class MarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware { +public class MarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware, IdAware { + private String id; private CamelContext camelContext; private final DataFormat dataFormat; @@ -101,6 +103,14 @@ public class MarshalProcessor extends ServiceSupport implements AsyncProcessor, return "marshal[" + dataFormat + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public CamelContext getCamelContext() { return camelContext; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index b953d08..1cd86da 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -50,6 +50,7 @@ import org.apache.camel.Traceable; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.TracedRouteNodes; import org.apache.camel.spi.UnitOfWork; @@ -80,7 +81,7 @@ import static org.apache.camel.util.ObjectHelper.notNull; * @version * @see Pipeline */ -public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { +public class MulticastProcessor extends ServiceSupport implements 
AsyncProcessor, Navigate<Processor>, Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); @@ -144,6 +145,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor protected final Processor onPrepare; private final CamelContext camelContext; + private String id; private Collection<Processor> processors; private final AggregationStrategy aggregationStrategy; private final boolean parallelProcessing; @@ -198,6 +200,14 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor return "Multicast[" + getProcessors() + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public String getTraceLabel() { return "multicast"; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index e420772..29ad577 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -30,6 +30,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.AsyncProcessorHelper; @@ -45,10 +46,11 @@ import static org.apache.camel.util.ObjectHelper.notNull; * * @version */ -public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, 
Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(OnCompletionProcessor.class); private final CamelContext camelContext; + private String id; private final Processor processor; private final ExecutorService executorService; private final boolean shutdownExecutorService; @@ -95,6 +97,14 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces return camelContext; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java index 9c14476..ec1b070 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java @@ -26,6 +26,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -40,9 +41,11 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * * @version */ -public class Pipeline extends MulticastProcessor implements AsyncProcessor, Traceable { +public class Pipeline extends MulticastProcessor implements AsyncProcessor, Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); + private String id; + public Pipeline(CamelContext camelContext, Collection<Processor> processors) { super(camelContext, 
processors); } @@ -189,4 +192,12 @@ public class Pipeline extends MulticastProcessor implements AsyncProcessor, Trac public String getTraceLabel() { return "pipeline"; } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index 4899530..9cbca74 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -19,9 +19,12 @@ package org.apache.camel.processor; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelExchangeException; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; import org.apache.camel.PollingConsumer; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -43,9 +46,10 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; * * @see Enricher */ -public class PollEnricher extends ServiceSupport implements AsyncProcessor { +public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware { private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); + private String id; private AggregationStrategy aggregationStrategy; private PollingConsumer consumer; private long timeout; @@ -76,6 +80,18 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor { 
this.timeout = timeout; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Endpoint getEndpoint() { + return consumer.getEndpoint(); + } + public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index ee80672..a5a0c0d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -30,6 +30,7 @@ import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -48,11 +49,12 @@ import static org.apache.camel.util.ObjectHelper.notNull; * * @version */ -public class RecipientList extends ServiceSupport implements AsyncProcessor { +public class RecipientList extends ServiceSupport implements AsyncProcessor, IdAware { private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class); private static final String IGNORE_DELIMITER_MARKER = "false"; private final CamelContext camelContext; + private String id; private ProducerCache producerCache; private Expression expression; private final String delimiter; @@ -101,6 +103,14 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { return "RecipientList[" + (expression != null ? 
expression : "") + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java index b0b83f7..9bddb41 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java @@ -21,14 +21,16 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor which removes the header from the IN or OUT message */ -public class RemoveHeaderProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class RemoveHeaderProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { private final String headerName; + private String id; public RemoveHeaderProcessor(String headerName) { this.headerName = headerName; @@ -60,6 +62,18 @@ public class RemoveHeaderProcessor extends ServiceSupport implements AsyncProces return "removeHeader[" + headerName + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getHeaderName() { + return headerName; + } + @Override protected void doStart() throws Exception { // noop 
http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java index a104c26..12c0fc1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java @@ -21,13 +21,15 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor which removes one ore more headers from the IN or OUT message */ -public class RemoveHeadersProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class RemoveHeadersProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final String pattern; private final String[] excludePattern; @@ -62,6 +64,22 @@ public class RemoveHeadersProcessor extends ServiceSupport implements AsyncProce return "removeHeaders[" + pattern + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getPattern() { + return pattern; + } + + public String[] getExcludePattern() { + return excludePattern; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java index 0ad4096..69889cd 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java @@ -21,13 +21,15 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor which removes one ore more properties from the exchange */ -public class RemovePropertiesProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class RemovePropertiesProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final String pattern; private final String[] excludePattern; @@ -61,6 +63,22 @@ public class RemovePropertiesProcessor extends ServiceSupport implements AsyncPr return "removeProperties[" + pattern + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getPattern() { + return pattern; + } + + public String[] getExcludePattern() { + return excludePattern; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java index a3a6ad0..dffacc2 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java +++ 
b/camel-core/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java @@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor which removes the property from the exchange */ -public class RemovePropertyProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class RemovePropertyProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final String propertyName; public RemovePropertyProcessor(String propertyName) { @@ -58,6 +60,18 @@ public class RemovePropertyProcessor extends ServiceSupport implements AsyncProc return "removeProperty[" + propertyName + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getPropertyName() { + return propertyName; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java index 3fc466e..a2f9b3e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RollbackProcessor.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.RollbackExchangeException; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import 
org.apache.camel.util.AsyncProcessorHelper; @@ -29,8 +30,9 @@ import org.apache.camel.util.AsyncProcessorHelper; * * @version */ -public class RollbackProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class RollbackProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private boolean markRollbackOnly; private boolean markRollbackOnlyLast; private String message; @@ -86,6 +88,18 @@ public class RollbackProcessor extends ServiceSupport implements AsyncProcessor, return "rollback"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getMessage() { + return message; + } + public boolean isMarkRollbackOnly() { return markRollbackOnly; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index 00625d6..8de6624 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -34,6 +34,7 @@ import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -55,8 +56,9 @@ import static org.apache.camel.util.ObjectHelper.notNull; * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the * pipeline to ensure it works the same and the async routing engine is flawless. 
*/ -public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable { +public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { protected final Logger log = LoggerFactory.getLogger(getClass()); + protected String id; protected ProducerCache producerCache; protected int cacheSize; protected boolean ignoreInvalidEndpoints; @@ -102,7 +104,15 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace this.uriDelimiter = uriDelimiter; this.header = null; } - + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void setDelimiter(String delimiter) { this.uriDelimiter = delimiter; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java index ee989a0..661646c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +40,10 @@ import org.slf4j.LoggerFactory; * * @version */ -public class SamplingThrottler extends DelegateAsyncProcessor { +public class SamplingThrottler extends DelegateAsyncProcessor implements Traceable, IdAware { - protected final Logger log = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(SamplingThrottler.class); + private String id; private long 
messageFrequency; private long currentMessageCount; private long samplePeriod; @@ -83,6 +86,14 @@ public class SamplingThrottler extends DelegateAsyncProcessor { } } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public String getTraceLabel() { if (messageFrequency > 0) { return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]"; @@ -106,13 +117,13 @@ public class SamplingThrottler extends DelegateAsyncProcessor { long now = System.currentTimeMillis(); if (now >= timeOfLastExchange + periodInMillis) { doSend = true; - if (log.isTraceEnabled()) { - log.trace(sampled.sample()); + if (LOG.isTraceEnabled()) { + LOG.trace(sampled.sample()); } timeOfLastExchange = now; } else { - if (log.isTraceEnabled()) { - log.trace(sampled.drop()); + if (LOG.isTraceEnabled()) { + LOG.trace(sampled.drop()); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java index 9b9e0d3..884f674 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -32,6 +32,7 @@ import org.apache.camel.ServicePoolAware; import org.apache.camel.Traceable; import org.apache.camel.impl.InterceptSendToEndpoint; import org.apache.camel.impl.ProducerCache; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory; * * @version */ -public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable, EndpointAware { +public 
class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable, EndpointAware, IdAware { protected static final Logger LOG = LoggerFactory.getLogger(SendProcessor.class); protected final CamelContext camelContext; protected final ExchangePattern pattern; @@ -57,6 +58,7 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra protected AsyncProcessor producer; protected Endpoint destination; protected ExchangePattern destinationExchangePattern; + protected String id; public SendProcessor(Endpoint destination) { this(destination, null); @@ -81,6 +83,14 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + /** * @deprecated not longer supported. */ http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java index 8da646a..ef098be 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SetBodyProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -30,7 +31,8 @@ import org.apache.camel.util.ExchangeHelper; /** * A processor which sets the body on the IN or OUT message with an {@link Expression} */ -public class 
SetBodyProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class SetBodyProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final Expression expression; public SetBodyProcessor(Expression expression) { @@ -82,6 +84,18 @@ public class SetBodyProcessor extends ServiceSupport implements AsyncProcessor, return "setBody[" + expression + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Expression getExpression() { + return expression; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java index bfcd3dd..fd1e378 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java @@ -22,13 +22,15 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor which sets the header on the IN or OUT message with an {@link org.apache.camel.Expression} */ -public class SetHeaderProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class SetHeaderProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final String headerName; private final Expression expression; @@ -68,6 +70,22 @@ public class SetHeaderProcessor extends ServiceSupport 
implements AsyncProcessor return "setHeader[" + headerName + ", " + expression + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getHeaderName() { + return headerName; + } + + public Expression getExpression() { + return expression; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java index d50d07c..48eb206 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java @@ -21,13 +21,15 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor which sets the property on the exchange with an {@link org.apache.camel.Expression} */ -public class SetPropertyProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class SetPropertyProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final String propertyName; private final Expression expression; @@ -62,6 +64,22 @@ public class SetPropertyProcessor extends ServiceSupport implements AsyncProcess return "setProperty[" + propertyName + ", " + expression + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getPropertyName() { + return propertyName; + } + + public 
Expression getExpression() { + return expression; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java index e66f041..92db786 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SortProcessor.java @@ -25,14 +25,16 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Message; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; /** * A processor that sorts the expression using a comparator */ -public class SortProcessor<T> extends ServiceSupport implements AsyncProcessor { +public class SortProcessor<T> extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final Expression expression; private final Comparator<? super T> comparator; @@ -74,6 +76,27 @@ public class SortProcessor<T> extends ServiceSupport implements AsyncProcessor { } @Override + public String getTraceLabel() { + return "sort[" + expression + "]"; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Expression getExpression() { + return expression; + } + + public Comparator<? 
super T> getComparator() { + return comparator; + } + + @Override protected void doStart() throws Exception { // noop } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java index 6e07afb..af08c12 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/StopProcessor.java @@ -19,6 +19,7 @@ package org.apache.camel.processor; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -27,7 +28,9 @@ import org.apache.camel.util.AsyncProcessorHelper; * * @version */ -public class StopProcessor extends ServiceSupport implements AsyncProcessor { +public class StopProcessor extends ServiceSupport implements AsyncProcessor, IdAware { + + private String id; public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); @@ -46,6 +49,14 @@ public class StopProcessor extends ServiceSupport implements AsyncProcessor { return "Stop"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java index 
c51a446..71af68f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -35,6 +35,7 @@ import org.apache.camel.processor.resequencer.ResequencerEngine; import org.apache.camel.processor.resequencer.SequenceElementComparator; import org.apache.camel.processor.resequencer.SequenceSender; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.LoggingExceptionHandler; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -65,11 +66,12 @@ import org.slf4j.LoggerFactory; * * @see ResequencerEngine */ -public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, AsyncProcessor, Navigate<Processor>, Traceable { +public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, AsyncProcessor, Navigate<Processor>, Traceable, IdAware { private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; private static final Logger LOG = LoggerFactory.getLogger(StreamResequencer.class); + private String id; private final CamelContext camelContext; private final ExceptionHandler exceptionHandler; private final ResequencerEngine<Exchange> engine; @@ -167,6 +169,14 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< return "streamResequence"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + @Override protected void doStart() throws Exception { ServiceHelper.startServices(processor); http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java index 183cfa0..3b7073c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java @@ -26,6 +26,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Rejectable; import org.apache.camel.ThreadPoolRejectedPolicy; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; @@ -55,9 +56,10 @@ import org.slf4j.LoggerFactory; * will not be free to process a new exchange, as its processing the current exchange.</li> * </ul> */ -public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor { +public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, IdAware { private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class); + private String id; private final CamelContext camelContext; private final ExecutorService executorService; private volatile boolean shutdownExecutorService; @@ -174,6 +176,14 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor { return "Threads"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + protected void doStart() throws Exception { shutdown.set(false); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/Throttler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index c986bf7..a711f6d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ 
b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -26,6 +26,7 @@ import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.util.ObjectHelper; /** @@ -39,7 +40,8 @@ import org.apache.camel.util.ObjectHelper; * * @version */ -public class Throttler extends DelayProcessorSupport implements Traceable { +public class Throttler extends DelayProcessorSupport implements Traceable, IdAware { + private String id; private volatile long maximumRequestsPerPeriod; private Expression maxRequestsPerPeriodExpression; private AtomicLong timePeriodMillis = new AtomicLong(1000); @@ -70,6 +72,14 @@ public class Throttler extends DelayProcessorSupport implements Traceable { return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + // Properties // ----------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ThrottlerRejectedExecutionException.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThrottlerRejectedExecutionException.java b/camel-core/src/main/java/org/apache/camel/processor/ThrottlerRejectedExecutionException.java index 144fd7b..7246b7c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThrottlerRejectedExecutionException.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThrottlerRejectedExecutionException.java @@ -18,13 +18,12 @@ package org.apache.camel.processor; import java.util.concurrent.RejectedExecutionException; - -public class ThrottlerRejectedExecutionException - extends RejectedExecutionException { +public class 
ThrottlerRejectedExecutionException extends RejectedExecutionException { private static final long serialVersionUID = 1L; public ThrottlerRejectedExecutionException(String message) { super(message); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java index 8cc0616..4c0ae34 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java @@ -25,6 +25,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelLogger; @@ -37,9 +38,10 @@ import org.slf4j.LoggerFactory; * * @version */ -public class ThroughputLogger extends ServiceSupport implements AsyncProcessor { +public class ThroughputLogger extends ServiceSupport implements AsyncProcessor, IdAware { private static final Logger LOG = LoggerFactory.getLogger(ThroughputLogger.class); + private String id; private final AtomicInteger receivedCounter = new AtomicInteger(); private NumberFormat numberFormat = NumberFormat.getNumberInstance(); private long groupReceivedCount; @@ -76,6 +78,14 @@ public class ThroughputLogger extends ServiceSupport implements AsyncProcessor { } } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } 
http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java index d96ab7f..728f311 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java @@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; @@ -27,7 +28,8 @@ import org.apache.camel.util.ObjectHelper; /** * The processor which sets an {@link Exception} on the {@link Exchange} */ -public class ThrowExceptionProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class ThrowExceptionProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final Exception exception; public ThrowExceptionProcessor(Exception exception) { @@ -49,6 +51,18 @@ public class ThrowExceptionProcessor extends ServiceSupport implements AsyncProc return "throwException[" + exception.getClass().getSimpleName() + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Exception getException() { + return exception; + } + public String toString() { return "ThrowException"; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java ---------------------------------------------------------------------- diff 
--git a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java index 61db471..a0b0662 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/TransformProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.Expression; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -31,7 +32,8 @@ import org.apache.camel.util.ObjectHelper; /** * A processor which sets the body on the OUT message with an {@link Expression} */ -public class TransformProcessor extends ServiceSupport implements AsyncProcessor, Traceable { +public class TransformProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private String id; private final Expression expression; public TransformProcessor(Expression expression) { @@ -88,6 +90,18 @@ public class TransformProcessor extends ServiceSupport implements AsyncProcessor return "transform[" + expression + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Expression getExpression() { + return expression; + } + @Override protected void doStart() throws Exception { // noop http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java index 24cea69..807ebe3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java +++ 
b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -26,6 +26,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -41,9 +42,10 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; * * @version */ -public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { +public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class); + protected String id; protected final Processor tryProcessor; protected final List<Processor> catchClauses; protected final Processor finallyProcessor; @@ -188,4 +190,11 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi return tryProcessor != null || catchClauses != null && !catchClauses.isEmpty() || finallyProcessor != null; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java index e81a283..60883c0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java @@ -28,6 +28,7 @@ import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; import 
org.apache.camel.Traceable; import org.apache.camel.spi.DataFormat; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.IOHelper; @@ -40,7 +41,8 @@ import org.apache.camel.util.ServiceHelper; * * @version */ -public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware { +public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware, IdAware { + private String id; private CamelContext camelContext; private final DataFormat dataFormat; @@ -98,6 +100,14 @@ public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor return "unmarshal[" + dataFormat + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public CamelContext getCamelContext() { return camelContext; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 3f71226..532608d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -34,6 +34,7 @@ import org.apache.camel.Processor; import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -47,8 +48,9 @@ import org.slf4j.LoggerFactory; * * @version */ -public class WireTapProcessor extends ServiceSupport implements 
AsyncProcessor, Traceable, EndpointAware { +public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, EndpointAware, IdAware { private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class); + private String id; private final Endpoint destination; private final Processor processor; private final ExchangePattern exchangePattern; @@ -82,6 +84,14 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, return "wireTap(" + destination + ")"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public Endpoint getEndpoint() { return destination; } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 73bf0da..b71c0bf 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -48,6 +48,7 @@ import org.apache.camel.TimeoutMap; import org.apache.camel.Traceable; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.OptimisticLockingAggregationRepository; import org.apache.camel.spi.RecoverableAggregationRepository; import org.apache.camel.spi.ShutdownPrepared; @@ -80,7 +81,7 @@ import org.slf4j.LoggerFactory; * and older prices are discarded). Another idea is to combine line item messages * together into a single invoice message. 
*/ -public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared { +public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, IdAware { public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; @@ -89,6 +90,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor private final Lock lock = new ReentrantLock(); private final CamelContext camelContext; private final Processor processor; + private String id; private AggregationStrategy aggregationStrategy; private Expression correlationExpression; private AggregateController aggregateController; @@ -243,6 +245,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor return processor != null; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/eaf8a3c4/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index d3afe7a..67d20d5 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -27,6 +27,7 @@ import org.apache.camel.Expression; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.spi.ExchangeIdempotentRepository; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.IdempotentRepository; import 
org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; @@ -48,8 +49,9 @@ import org.slf4j.LoggerFactory; * @see org.apache.camel.spi.IdempotentRepository * @see org.apache.camel.spi.ExchangeIdempotentRepository */ -public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> { +public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, IdAware { private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class); + private String id; private final Expression messageIdExpression; private final AsyncProcessor processor; private final IdempotentRepository<String> idempotentRepository; @@ -73,6 +75,14 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]"; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); }