Repository: camel
Updated Branches:
  refs/heads/master fe80773ee -> 9ab0bd5c6


CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for 
callbacks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ab0bd5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ab0bd5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ab0bd5c

Branch: refs/heads/master
Commit: 9ab0bd5c677983d7bdec945e76ee6739640c7ff0
Parents: fe80773
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Dec 21 10:39:08 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Dec 21 10:39:08 2014 +0100

----------------------------------------------------------------------
 .../ManagedAsyncProcessorAwaitManagerMBean.java |  27 +++++
 .../impl/DefaultAsyncProcessorAwaitManager.java | 102 ++++++++++++++++++-
 .../ManagedAsyncProcessorAwaitManager.java      |  45 ++++++++
 .../camel/spi/AsyncProcessorAwaitManager.java   |  61 +++++++++++
 ...AsyncProcessorAwaitManagerInterruptTest.java |   4 +
 .../async/AsyncProcessorAwaitManagerTest.java   |   4 +
 6 files changed, 242 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
index bb5b669..8683b38 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
@@ -38,4 +38,31 @@ public interface ManagedAsyncProcessorAwaitManagerMBean 
extends ManagedServiceMB
     @ManagedOperation(description = "To interrupt an exchange which may seem 
as stuck, to force the exchange to continue, allowing any blocking thread to be 
released.")
     void interrupt(String exchangeId);
 
+    @ManagedAttribute(description = "Number of threads that has been blocked")
+    long getThreadsBlocked();
+
+    @ManagedAttribute(description = "Number of threads that has been 
interrupted")
+    long getThreadsInterrupted();
+
+    @ManagedAttribute(description = "Total wait time in msec.")
+    long getTotalDuration();
+
+    @ManagedAttribute(description = "The minimum wait time in msec.")
+    long getMinDuration();
+
+    @ManagedAttribute(description = "The maximum wait time in msec.")
+    long getMaxDuration();
+
+    @ManagedAttribute(description = "The average wait time in msec.")
+    long getMeanDuration();
+
+    @ManagedOperation(description = "Resets the statistics")
+    void resetStatistics();
+
+    @ManagedAttribute(description = "Utilization statistics enabled")
+    boolean isStatisticsEnabled();
+
+    @ManagedAttribute(description = "Utilization statistics enabled")
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 20c2927..2712178 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
@@ -39,6 +40,14 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
 
+    private final AsyncProcessorAwaitManager.Statistics statistics = new 
UtilizationStatistics();
+    private final AtomicLong blockedCounter = new AtomicLong();
+    private final AtomicLong interruptedCounter = new AtomicLong();
+    private final AtomicLong totalDuration = new AtomicLong();
+    private final AtomicLong minDuration = new AtomicLong();
+    private final AtomicLong maxDuration = new AtomicLong();
+    private final AtomicLong meanDuration = new AtomicLong();
+
     private final Map<Exchange, AwaitThread> inflight = new 
ConcurrentHashMap<Exchange, AwaitThread>();
     private final ExchangeFormatter exchangeFormatter;
     private boolean interruptThreadsWhileStopping = true;
@@ -58,6 +67,9 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
         LOG.trace("Waiting for asynchronous callback before continuing for 
exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
+            if (statistics.isStatisticsEnabled()) {
+                blockedCounter.incrementAndGet();
+            }
             inflight.put(exchange, new 
AwaitThreadEntry(Thread.currentThread(), exchange, latch));
             latch.await();
             LOG.trace("Asynchronous callback received, will continue routing 
exchangeId: {} -> {}",
@@ -68,7 +80,24 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
                     exchange.getExchangeId(), exchange);
             exchange.setException(e);
         } finally {
-            inflight.remove(exchange);
+            AwaitThread thread = inflight.remove(exchange);
+
+            if (statistics.isStatisticsEnabled() && thread != null) {
+                long time = thread.getWaitDuration();
+                long total = totalDuration.get() + time;
+                totalDuration.set(total);
+
+                if (time < minDuration.get()) {
+                    minDuration.set(time);
+                } else if (time > maxDuration.get()) {
+                    maxDuration.set(time);
+                }
+
+                // update mean
+                long count = blockedCounter.get();
+                long mean = count > 0 ? total / count : 0;
+                meanDuration.set(mean);
+            }
         }
     }
 
@@ -127,6 +156,9 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             } finally {
+                if (statistics.isStatisticsEnabled()) {
+                    interruptedCounter.incrementAndGet();
+                }
                 exchange.setException(new 
RejectedExecutionException("Interrupted while waiting for asynchronous callback 
for exchangeId: " + exchange.getExchangeId()));
                 entry.getLatch().countDown();
             }
@@ -141,6 +173,10 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
         this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
     }
 
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
     @Override
     protected void doStart() throws Exception {
         // noop
@@ -258,4 +294,68 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
         }
     }
 
+    /**
+     * Represents utilization statistics
+     */
+    private final class UtilizationStatistics implements 
AsyncProcessorAwaitManager.Statistics {
+
+        private boolean statisticsEnabled;
+
+        @Override
+        public long getThreadsBlocked() {
+            return blockedCounter.get();
+        }
+
+        @Override
+        public long getThreadsInterrupted() {
+            return interruptedCounter.get();
+        }
+
+        @Override
+        public long getTotalDuration() {
+            return totalDuration.get();
+        }
+
+        @Override
+        public long getMinDuration() {
+            return minDuration.get();
+        }
+
+        @Override
+        public long getMaxDuration() {
+            return maxDuration.get();
+        }
+
+        @Override
+        public long getMeanDuration() {
+            return meanDuration.get();
+        }
+
+        @Override
+        public void reset() {
+            blockedCounter.set(0);
+            interruptedCounter.set(0);
+            totalDuration.set(0);
+            minDuration.set(0);
+            maxDuration.set(0);
+            meanDuration.set(0);
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("AsyncProcessAwaitManager 
utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]",
+                    getThreadsBlocked(), getThreadsInterrupted(), 
getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
index a4759ef..5659127 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -92,4 +92,49 @@ public class ManagedAsyncProcessorAwaitManager extends 
ManagedService implements
         manager.interrupt(exchangeId);
     }
 
+    @Override
+    public long getThreadsBlocked() {
+        return manager.getStatistics().getThreadsBlocked();
+    }
+
+    @Override
+    public long getThreadsInterrupted() {
+        return manager.getStatistics().getThreadsInterrupted();
+    }
+
+    @Override
+    public long getTotalDuration() {
+        return manager.getStatistics().getTotalDuration();
+    }
+
+    @Override
+    public long getMinDuration() {
+        return manager.getStatistics().getMinDuration();
+    }
+
+    @Override
+    public long getMaxDuration() {
+        return manager.getStatistics().getMaxDuration();
+    }
+
+    @Override
+    public long getMeanDuration() {
+        return manager.getStatistics().getMeanDuration();
+    }
+
+    @Override
+    public void resetStatistics() {
+        manager.getStatistics().reset();
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return manager.getStatistics().isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        manager.getStatistics().setStatisticsEnabled(statisticsEnabled);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java 
b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 3081449..d4f8bdd 100644
--- 
a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ 
b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -33,6 +33,59 @@ import org.apache.camel.StaticService;
 public interface AsyncProcessorAwaitManager extends StaticService {
 
     /**
+     * Utilization statistics of the this manager.
+     */
+    interface Statistics {
+
+        /**
+         * Total number of threads that has been blocked
+         */
+        long getThreadsBlocked();
+
+        /**
+         * Total number of threads that has been forced interrupted
+         */
+        long getThreadsInterrupted();
+
+        /**
+         * The total duration time in millis.
+         */
+        long getTotalDuration();
+
+        /**
+         * The lowest duration time in millis.
+         */
+        long getMinDuration();
+
+        /**
+         * The highest duration time in millis.
+         */
+        long getMaxDuration();
+
+        /**
+         * The average duration time in millis.
+         */
+        long getMeanDuration();
+
+        /**
+         * Reset the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics is enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics is enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
+
+    /**
      * Information about the thread and exchange that are inflight.
      */
     interface AwaitThread {
@@ -136,4 +189,12 @@ public interface AsyncProcessorAwaitManager extends 
StaticService {
      * This is enabled by default which allows Camel to release any blocked 
thread during shutting down Camel itself.
      */
     void setInterruptThreadsWhileStopping(boolean 
interruptThreadsWhileStopping);
+
+    /**
+     * Gets the utilization statistics of this manager
+     *
+     * @return the utilization statistics
+     */
+    Statistics getStatistics();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index 2d97c80..c53aa9e 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -32,6 +32,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 public class AsyncProcessorAwaitManagerInterruptTest extends 
ContextTestSupport {
 
     public void testAsyncAwaitInterrupt() throws Exception {
+        
context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -49,6 +51,8 @@ public class AsyncProcessorAwaitManagerInterruptTest extends 
ContextTestSupport
         assertMockEndpointsSatisfied();
 
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+        assertEquals(1, 
context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+        assertEquals(1, 
context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index 8389069..1fc4635 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -30,6 +30,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
 
     public void testAsyncAwait() throws Exception {
+        
context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -42,6 +44,8 @@ public class AsyncProcessorAwaitManagerTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
 
         assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+        assertEquals(1, 
context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+        assertEquals(0, 
context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
     }
 
     @Override

Reply via email to