Repository: camel Updated Branches: refs/heads/master 00bef4ef0 -> f032da66d
CAMEL-8873: ErrorHandler mbean has information how many exchanges are currently pending redelivery Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f032da66 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f032da66 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f032da66 Branch: refs/heads/master Commit: f032da66da934a41360a8b6e1b7755e6577fd5b5 Parents: 00bef4e Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jul 29 14:58:29 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jul 29 14:58:29 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedErrorHandlerMBean.java | 3 +++ .../management/mbean/ManagedErrorHandler.java | 10 ++++++++++ .../camel/processor/RedeliveryErrorHandler.java | 21 +++++++++++++++++++- 3 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f032da66/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java index 0cb5a3b..d1a276c 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java @@ -44,6 +44,9 @@ public interface ManagedErrorHandlerMBean { @ManagedAttribute(description = "Endpoint Uri for the dead letter channel where dead message is move to", mask = true) String getDeadLetterChannelEndpointUri(); + @ManagedAttribute(description = "Number of Exchanges scheduled for redelivery (waiting to be redelivered in the future)") + Integer getPendingRedeliveryCount(); + @ManagedAttribute(description = "RedeliveryPolicy for maximum redeliveries") Integer getMaximumRedeliveries(); http://git-wip-us.apache.org/repos/asf/camel/blob/f032da66/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java index e298ae8..e7e0aea 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java @@ -456,4 +456,14 @@ public class ManagedErrorHandler implements ManagedErrorHandlerMBean { RedeliveryErrorHandler redelivery = (RedeliveryErrorHandler) errorHandler; redelivery.getRedeliveryPolicy().setAllowRedeliveryWhileStopping(allow); } + + public Integer getPendingRedeliveryCount() { + if (!isSupportRedelivery()) { + return null; + } + + RedeliveryErrorHandler redelivery = (RedeliveryErrorHandler) errorHandler; + return redelivery.getPendingRedeliveryCount(); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/f032da66/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index dbb87da..d15d2a1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -21,7 +21,9 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -41,7 +43,6 @@ import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.CamelLogger; -import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.EventHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.MessageHelper; @@ -60,6 +61,7 @@ import org.apache.camel.util.URISupport; */ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> { + protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); protected ScheduledExecutorService executorService; protected final CamelContext camelContext; protected final Processor deadLetter; @@ -427,8 +429,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // async delayed redelivery was disabled or we are transacted so we must be synchronous // as the transaction manager requires to execute in the same thread context try { + // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping + redeliverySleepCounter.incrementAndGet(); data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); + redeliverySleepCounter.decrementAndGet(); } catch (InterruptedException e) { + redeliverySleepCounter.decrementAndGet(); // we was interrupted so break out exchange.setException(e); // mark the exchange to stop continue routing when interrupted @@ -1313,6 +1319,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme return false; } + /** + * Gets the number of exchanges that are pending for redelivery + */ + public int getPendingRedeliveryCount() { + int answer = redeliverySleepCounter.get(); + if (executorService != null && executorService instanceof ThreadPoolExecutor) { + answer += ((ThreadPoolExecutor) executorService).getQueue().size(); + } + + return answer; + } + @Override protected void doStart() throws Exception { ServiceHelper.startServices(output, outputAsync, deadLetter); @@ -1336,6 +1354,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // reset flag when starting preparingShutdown = false; + redeliverySleepCounter.set(0); } @Override