Updated Branches: refs/heads/master 06952f7ab -> 0bdf8431d
CAMEL-6688: Added option SuppressLoggingOnTimeout to allow shutdown to not log after timeout occurred and doing aggressive shutdown which otherwise may log WARNs about messages not being complete and whatnot. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0bdf8431 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0bdf8431 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0bdf8431 Branch: refs/heads/master Commit: 0bdf8431df29986fa89ff50bc5c594c99d4e5711 Parents: 06952f7 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Sep 2 10:42:52 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Sep 2 10:49:04 2013 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileOnCompletion.java | 14 +++--- .../camel/component/seda/SedaConsumer.java | 4 +- .../BridgeExceptionHandlerToErrorHandler.java | 2 +- .../org/apache/camel/impl/DefaultConsumer.java | 4 +- .../camel/impl/DefaultShutdownStrategy.java | 41 ++++++++++++--- .../camel/impl/EventDrivenPollingConsumer.java | 3 +- .../camel/impl/LoggingExceptionHandler.java | 52 ++++++++++++++++---- .../camel/impl/PollingConsumerSupport.java | 4 +- .../apache/camel/impl/RoutePolicySupport.java | 11 +++-- .../apache/camel/processor/BatchProcessor.java | 4 +- .../camel/processor/StreamResequencer.java | 2 +- .../processor/aggregate/AggregateProcessor.java | 3 +- .../org/apache/camel/spi/ShutdownStrategy.java | 31 ++++++++++++ ...ownStrategySuppressLoggingOnTimeoutTest.java | 46 +++++++++++++++++ 14 files changed, 175 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java index 9511159..5bedc00 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java @@ -47,6 +47,7 @@ public class GenericFileOnCompletion<T> implements Synchronization { this.operations = operations; this.file = file; this.absoluteFileName = absoluteFileName; + this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); } public void onComplete(Exchange exchange) { @@ -58,9 +59,6 @@ public class GenericFileOnCompletion<T> implements Synchronization { } public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } @@ -136,7 +134,7 @@ public class GenericFileOnCompletion<T> implements Synchronization { log.warn("Done file: " + doneFileName + " could not be deleted"); } } catch (Exception e) { - handleException(e); + handleException("Error deleting done file: " + doneFileName, exchange, e); } } } @@ -145,7 +143,7 @@ public class GenericFileOnCompletion<T> implements Synchronization { log.trace("Commit file strategy: {} for file: {}", processStrategy, file); processStrategy.commit(operations, endpoint, exchange, file); } catch (Exception e) { - handleException(e); + handleException("Error during commit", exchange, e); } } @@ -165,13 +163,13 @@ public class GenericFileOnCompletion<T> implements Synchronization { try { processStrategy.rollback(operations, endpoint, exchange, file); } catch (Exception e) { - handleException(e); + handleException("Error during rollback", exchange, e); } } - protected void handleException(Throwable t) { + protected void handleException(String message, Exchange exchange, Throwable t) { Throwable 
newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t; - getExceptionHandler().handleException(newt); + getExceptionHandler().handleException(message, exchange, newt); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index ce6d69d..d0f47bd 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -69,6 +69,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, this.endpoint = endpoint; this.processor = AsyncProcessorConverterHelper.convert(processor); this.pollTimeout = endpoint.getPollTimeout(); + this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); } @Override @@ -81,9 +82,6 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, } public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java b/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java index 12ba590..6292189 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java +++ 
b/camel-core/src/main/java/org/apache/camel/impl/BridgeExceptionHandlerToErrorHandler.java @@ -44,7 +44,7 @@ public class BridgeExceptionHandlerToErrorHandler implements ExceptionHandler { public BridgeExceptionHandlerToErrorHandler(DefaultConsumer consumer) { this.consumer = consumer; - this.fallback = new LoggingExceptionHandler(consumer.getClass()); + this.fallback = new LoggingExceptionHandler(consumer.getEndpoint().getCamelContext(), consumer.getClass()); this.bridge = consumer.getProcessor(); } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java index fd400ec..74e9555 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java @@ -46,6 +46,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer { public DefaultConsumer(Endpoint endpoint, Processor processor) { this.endpoint = endpoint; this.processor = processor; + this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); } @Override @@ -105,9 +106,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer { } public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java 
index d94b34e..fddd80a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java @@ -93,6 +93,11 @@ import org.slf4j.LoggerFactory; * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt> * on the services. This allows the services to know they should force shutdown now. + * <p/> + * When timeout occurred and a forced shutdown is happening, then there may be threads/tasks which are + * still inflight which may be rejected continued being routed. By default this can cause WARN and ERRORs + * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these + * logs, so they are logged at TRACE level instead. * * @version */ @@ -105,7 +110,9 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS private TimeUnit timeUnit = TimeUnit.SECONDS; private boolean shutdownNowOnTimeout = true; private boolean shutdownRoutesInReverseOrder = true; + private boolean suppressLoggingOnTimeout; private volatile boolean forceShutdown; + private final AtomicBoolean timeoutOccurred = new AtomicBoolean(); public DefaultShutdownStrategy() { } @@ -165,7 +172,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")"); // use another thread to perform the shutdowns so we can support timeout - final AtomicBoolean timeoutOccurred = new AtomicBoolean(); + timeoutOccurred.set(false); Future<?> future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred)); try { future.get(timeout, timeUnit); @@ -192,7 +199,7 @@ public 
class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS // now the route consumers has been shutdown, then prepare route services for shutdown now (forced) for (RouteStartupOrder order : routes) { for (Service service : order.getServices()) { - prepareShutdown(service, true, true); + prepareShutdown(service, true, true, isSuppressLoggingOnTimeout()); } } } else { @@ -216,6 +223,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS return forceShutdown; } + @Override + public boolean hasTimeoutOccurred() { + return timeoutOccurred.get(); + } + public void setTimeout(long timeout) { if (timeout <= 0) { throw new IllegalArgumentException("Timeout must be a positive value"); @@ -251,6 +263,14 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder; } + public boolean isSuppressLoggingOnTimeout() { + return suppressLoggingOnTimeout; + } + + public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) { + this.suppressLoggingOnTimeout = suppressLoggingOnTimeout; + } + public CamelContext getCamelContext() { return camelContext; } @@ -345,6 +365,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS ObjectHelper.notNull(camelContext, "CamelContext"); // reset option forceShutdown = false; + timeoutOccurred.set(false); } @Override @@ -370,7 +391,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS * @param forced whether to force shutdown * @param includeChildren whether to prepare the child of the service as well */ - private static void prepareShutdown(Service service, boolean forced, boolean includeChildren) { + private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) { Set<Service> list; if (includeChildren) { // include error handlers as we want to prepare them for shutdown as well @@ 
-386,7 +407,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child); ((ShutdownPrepared) child).prepareShutdown(forced); } catch (Exception e) { - LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e); + if (suppressLogging) { + LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e); + } else { + LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e); + } } } } @@ -509,7 +534,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS if (service instanceof Consumer) { continue; } - prepareShutdown(service, false, true); + prepareShutdown(service, false, true, false); } } @@ -558,7 +583,8 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS if (consumer instanceof ShutdownAware) { LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId()); boolean forced = context.getShutdownStrategy().forceShutdown(consumer); - prepareShutdown(consumer, forced, false); + boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout(); + prepareShutdown(consumer, forced, false, suppress); LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId()); } } @@ -579,7 +605,8 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS for (RouteStartupOrder order : routes) { for (Service service : order.getServices()) { boolean forced = context.getShutdownStrategy().forceShutdown(service); - prepareShutdown(service, forced, true); + boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout(); + prepareShutdown(service, forced, true, suppress); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java index 27d9cc4..07bcf07 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor { private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class); private final BlockingQueue<Exchange> queue; - private ExceptionHandler interruptedExceptionHandler = new LoggingExceptionHandler(EventDrivenPollingConsumer.class); + private ExceptionHandler interruptedExceptionHandler; private Consumer consumer; public EventDrivenPollingConsumer(Endpoint endpoint) { @@ -51,6 +51,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) { super(endpoint); this.queue = queue; + this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); } public Exchange receiveNoWait() { http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java b/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java index 6518ad4..ba3fc28 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.impl; +import 
org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; @@ -29,21 +30,41 @@ import org.slf4j.LoggerFactory; * log the exception. * <p/> * This implementation will by default log the exception with stack trace at WARN level. + * <p/> + * This implementation honors the {@link org.apache.camel.impl.DefaultShutdownStrategy#isSuppressLoggingOnTimeout()} + * option to avoid logging if the logging should be suppressed. * * @version */ public class LoggingExceptionHandler implements ExceptionHandler { private final CamelLogger logger; + private final CamelContext camelContext; + @Deprecated public LoggingExceptionHandler(Class<?> ownerType) { - this(new CamelLogger(LoggerFactory.getLogger(ownerType), LoggingLevel.WARN)); + this(null, new CamelLogger(LoggerFactory.getLogger(ownerType), LoggingLevel.WARN)); + } + + public LoggingExceptionHandler(CamelContext camelContext, Class<?> ownerType) { + this(camelContext, new CamelLogger(LoggerFactory.getLogger(ownerType), LoggingLevel.WARN)); } + @Deprecated public LoggingExceptionHandler(Class<?> ownerType, LoggingLevel level) { - this(new CamelLogger(LoggerFactory.getLogger(ownerType), level)); + this(null, new CamelLogger(LoggerFactory.getLogger(ownerType), level)); } + public LoggingExceptionHandler(CamelContext camelContext, Class<?> ownerType, LoggingLevel level) { + this(camelContext, new CamelLogger(LoggerFactory.getLogger(ownerType), level)); + } + + @Deprecated public LoggingExceptionHandler(CamelLogger logger) { + this(null, logger); + } + + public LoggingExceptionHandler(CamelContext camelContext, CamelLogger logger) { + this.camelContext = camelContext; this.logger = logger; } @@ -57,15 +78,17 @@ public class LoggingExceptionHandler implements ExceptionHandler { public void handleException(String message, Exchange exchange, Throwable exception) { try { - String msg = CamelExchangeException.createExceptionMessage(message, 
exchange, exception); - if (isCausedByRollbackExchangeException(exception)) { - // do not log stack trace for intended rollbacks - logger.log(msg); - } else { - if (exception != null) { - logger.log(msg, exception); - } else { + if (!isSuppressLogging()) { + String msg = CamelExchangeException.createExceptionMessage(message, exchange, exception); + if (isCausedByRollbackExchangeException(exception)) { + // do not log stack trace for intended rollbacks logger.log(msg); + } else { + if (exception != null) { + logger.log(msg, exception); + } else { + logger.log(msg); + } } } } catch (Throwable e) { @@ -86,4 +109,13 @@ public class LoggingExceptionHandler implements ExceptionHandler { return false; } + + protected boolean isSuppressLogging() { + if (camelContext != null) { + return (camelContext.getStatus().isStopping() || camelContext.getStatus().isStopped()) + && camelContext.getShutdownStrategy().hasTimeoutOccurred() && camelContext.getShutdownStrategy().isSuppressLoggingOnTimeout(); + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java b/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java index 626ed0a..95e0a75 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java +++ b/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java @@ -35,6 +35,7 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P public PollingConsumerSupport(Endpoint endpoint) { this.endpoint = endpoint; + this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); } @Override @@ -47,9 +48,6 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P } public 
ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java b/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java index f39cb76..c76b4e2 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java +++ b/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java @@ -41,7 +41,9 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route private ExceptionHandler exceptionHandler; public void onInit(Route route) { - // noop + if (exceptionHandler == null) { + exceptionHandler = new LoggingExceptionHandler(route.getRouteContext().getCamelContext(), getClass()); + } } public void onRemove(Route route) { @@ -122,7 +124,9 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route * @param t the exception to handle */ protected void handleException(Throwable t) { - getExceptionHandler().handleException(t); + if (exceptionHandler != null) { + exceptionHandler.handleException(t); + } } @Override @@ -136,9 +140,6 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route } public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java index 3caea14..2bb93f7 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java @@ -88,6 +88,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na this.collection = collection; this.expression = expression; this.sender = new BatchSender(); + this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); } @Override @@ -98,9 +99,6 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na // Properties // ------------------------------------------------------------------------- public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } return exceptionHandler; } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java index dea7fcf..c8426d7 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -87,10 +87,10 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator) { ObjectHelper.notNull(camelContext, "CamelContext"); this.camelContext = camelContext; - this.exceptionHandler = new LoggingExceptionHandler(getClass()); this.engine = new ResequencerEngine<Exchange>(comparator); this.engine.setSequenceSender(this); this.processor = processor; + this.exceptionHandler = 
new LoggingExceptionHandler(camelContext, getClass()); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 0f716fb..8e2a26f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -98,7 +98,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor private ScheduledExecutorService recoverService; // store correlation key -> exchange id in timeout map private TimeoutMap<String, String> timeoutMap; - private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass()); + private ExceptionHandler exceptionHandler; private AggregationRepository aggregationRepository; private Map<String, String> closedCorrelationKeys; private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>(); @@ -145,6 +145,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor this.aggregationStrategy = aggregationStrategy; this.executorService = executorService; this.shutdownExecutorService = shutdownExecutorService; + this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java index b063304..b9a9187 100644 --- 
a/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java @@ -144,6 +144,32 @@ public interface ShutdownStrategy extends StaticService { TimeUnit getTimeUnit(); /** + * Whether Camel should try to suppress logging during shutdown and timeout was triggered, + * meaning forced shutdown is happening. And during forced shutdown we want to avoid logging + * errors/warnings et all in the logs as a side-effect of the forced timeout. + * <p/> + * By default this is <tt>false</tt> + * <p/> + * Notice the suppress is a <i>best effort</i> as there may still be some logs coming + * from 3rd party libraries and whatnot, which Camel cannot control. + * + * @param suppressLoggingOnTimeout <tt>true</tt> to suppress logging, false to log as usual. + */ + void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout); + + /** + * Whether Camel should try to suppress logging during shutdown and timeout was triggered, + * meaning forced shutdown is happening. And during forced shutdown we want to avoid logging + * errors/warnings et all in the logs as a side-effect of the forced timeout. + * <p/> + * By default this is <tt>false</tt> + * <p/> + * Notice the suppress is a <i>best effort</i> as there may still be some logs coming + * from 3rd party libraries and whatnot, which Camel cannot control. + */ + boolean isSuppressLoggingOnTimeout(); + + /** * Sets whether to force shutdown of all consumers when a timeout occurred and thus * not all consumers was shutdown within that period. * <p/> @@ -191,4 +217,9 @@ public interface ShutdownStrategy extends StaticService { */ boolean forceShutdown(Service service); + /** + * Whether a timeout has occurred during a shutdown. 
+ */ + boolean hasTimeoutOccurred(); + } http://git-wip-us.apache.org/repos/asf/camel/blob/0bdf8431/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java new file mode 100644 index 0000000..9c77862 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ShutdownStrategySuppressLoggingOnTimeoutTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class ShutdownStrategySuppressLoggingOnTimeoutTest extends ContextTestSupport { + + public void testSuppressLogging() throws Exception { + context.getShutdownStrategy().setTimeout(1); + context.getShutdownStrategy().setSuppressLoggingOnTimeout(true); + + template.sendBody("seda:foo", "Hello World"); + + Thread.sleep(2000); + + context.stop(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo") + .delay(8000) + .to("log:out"); + } + }; + } +}