Repository: camel
Updated Branches:
  refs/heads/master 00bef4ef0 -> f032da66d


CAMEL-8873: ErrorHandler mbean has information how many exchanges are currently 
pending redelivery


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f032da66
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f032da66
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f032da66

Branch: refs/heads/master
Commit: f032da66da934a41360a8b6e1b7755e6577fd5b5
Parents: 00bef4e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Jul 29 14:58:29 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Jul 29 14:58:29 2015 +0200

----------------------------------------------------------------------
 .../mbean/ManagedErrorHandlerMBean.java         |  3 +++
 .../management/mbean/ManagedErrorHandler.java   | 10 ++++++++++
 .../camel/processor/RedeliveryErrorHandler.java | 21 +++++++++++++++++++-
 3 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f032da66/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java
index 0cb5a3b..d1a276c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedErrorHandlerMBean.java
@@ -44,6 +44,9 @@ public interface ManagedErrorHandlerMBean {
     @ManagedAttribute(description = "Endpoint Uri for the dead letter channel 
where dead message is move to", mask = true)
     String getDeadLetterChannelEndpointUri();
 
+    @ManagedAttribute(description = "Number of Exchanges scheduled for 
redelivery (waiting to be redelivered in the future)")
+    Integer getPendingRedeliveryCount();
+
     @ManagedAttribute(description = "RedeliveryPolicy for maximum 
redeliveries")
     Integer getMaximumRedeliveries();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f032da66/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java
index e298ae8..e7e0aea 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedErrorHandler.java
@@ -456,4 +456,14 @@ public class ManagedErrorHandler implements 
ManagedErrorHandlerMBean {
         RedeliveryErrorHandler redelivery = (RedeliveryErrorHandler) 
errorHandler;
         
redelivery.getRedeliveryPolicy().setAllowRedeliveryWhileStopping(allow);
     }
+
+    public Integer getPendingRedeliveryCount() {
+        if (!isSupportRedelivery()) {
+            return null;
+        }
+
+        RedeliveryErrorHandler redelivery = (RedeliveryErrorHandler) 
errorHandler;
+        return redelivery.getPendingRedeliveryCount();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f032da66/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index dbb87da..d15d2a1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -21,7 +21,9 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -41,7 +43,6 @@ import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.CamelLogger;
-import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
@@ -60,6 +61,7 @@ import org.apache.camel.util.URISupport;
  */
 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport 
implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
 
+    protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
     protected ScheduledExecutorService executorService;
     protected final CamelContext camelContext;
     protected final Processor deadLetter;
@@ -427,8 +429,12 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
                         // async delayed redelivery was disabled or we are 
transacted so we must be synchronous
                         // as the transaction manager requires to execute in 
the same thread context
                         try {
+                            // we are doing synchronous redelivery and use 
thread sleep, so we keep track using a counter how many are sleeping
+                            redeliverySleepCounter.incrementAndGet();
                             
data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+                            redeliverySleepCounter.decrementAndGet();
                         } catch (InterruptedException e) {
+                            redeliverySleepCounter.decrementAndGet();
                             // we was interrupted so break out
                             exchange.setException(e);
                             // mark the exchange to stop continue routing when 
interrupted
@@ -1313,6 +1319,18 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
         return false;
     }
 
+    /**
+     * Gets the number of exchanges that are pending for redelivery
+     */
+    public int getPendingRedeliveryCount() {
+        int answer = redeliverySleepCounter.get();
+        if (executorService != null && executorService instanceof 
ThreadPoolExecutor) {
+            answer += ((ThreadPoolExecutor) executorService).getQueue().size();
+        }
+
+        return answer;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, outputAsync, deadLetter);
@@ -1336,6 +1354,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
 
         // reset flag when starting
         preparingShutdown = false;
+        redeliverySleepCounter.set(0);
     }
 
     @Override

Reply via email to