Repository: camel Updated Branches: refs/heads/master fe80773ee -> 9ab0bd5c6
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ab0bd5c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ab0bd5c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ab0bd5c Branch: refs/heads/master Commit: 9ab0bd5c677983d7bdec945e76ee6739640c7ff0 Parents: fe80773 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Dec 21 10:39:08 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Dec 21 10:39:08 2014 +0100 ---------------------------------------------------------------------- .../ManagedAsyncProcessorAwaitManagerMBean.java | 27 +++++ .../impl/DefaultAsyncProcessorAwaitManager.java | 102 ++++++++++++++++++- .../ManagedAsyncProcessorAwaitManager.java | 45 ++++++++ .../camel/spi/AsyncProcessorAwaitManager.java | 61 +++++++++++ ...AsyncProcessorAwaitManagerInterruptTest.java | 4 + .../async/AsyncProcessorAwaitManagerTest.java | 4 + 6 files changed, 242 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java index bb5b669..8683b38 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java @@ -38,4 +38,31 @@ public interface ManagedAsyncProcessorAwaitManagerMBean extends ManagedServiceMB 
@ManagedOperation(description = "To interrupt an exchange which may seem as stuck, to force the exchange to continue, allowing any blocking thread to be released.") void interrupt(String exchangeId); + @ManagedAttribute(description = "Number of threads that has been blocked") + long getThreadsBlocked(); + + @ManagedAttribute(description = "Number of threads that has been interrupted") + long getThreadsInterrupted(); + + @ManagedAttribute(description = "Total wait time in msec.") + long getTotalDuration(); + + @ManagedAttribute(description = "The minimum wait time in msec.") + long getMinDuration(); + + @ManagedAttribute(description = "The maximum wait time in msec.") + long getMaxDuration(); + + @ManagedAttribute(description = "The average wait time in msec.") + long getMeanDuration(); + + @ManagedOperation(description = "Resets the statistics") + void resetStatistics(); + + @ManagedAttribute(description = "Utilization statistics enabled") + boolean isStatisticsEnabled(); + + @ManagedAttribute(description = "Utilization statistics enabled") + void setStatisticsEnabled(boolean statisticsEnabled); + } http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java index 20c2927..2712178 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; import 
org.apache.camel.Exchange; import org.apache.camel.MessageHistory; @@ -39,6 +40,14 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class); + private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics(); + private final AtomicLong blockedCounter = new AtomicLong(); + private final AtomicLong interruptedCounter = new AtomicLong(); + private final AtomicLong totalDuration = new AtomicLong(); + private final AtomicLong minDuration = new AtomicLong(); + private final AtomicLong maxDuration = new AtomicLong(); + private final AtomicLong meanDuration = new AtomicLong(); + private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>(); private final ExchangeFormatter exchangeFormatter; private boolean interruptThreadsWhileStopping = true; @@ -58,6 +67,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); try { + if (statistics.isStatisticsEnabled()) { + blockedCounter.incrementAndGet(); + } inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch)); latch.await(); LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", @@ -68,7 +80,24 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements exchange.getExchangeId(), exchange); exchange.setException(e); } finally { - inflight.remove(exchange); + AwaitThread thread = inflight.remove(exchange); + + if (statistics.isStatisticsEnabled() && thread != null) { + long time = thread.getWaitDuration(); + long total = totalDuration.get() + time; + totalDuration.set(total); + + if (time < minDuration.get()) { + minDuration.set(time); + } else if (time > maxDuration.get()) { + maxDuration.set(time); + } 
+ + // update mean + long count = blockedCounter.get(); + long mean = count > 0 ? total / count : 0; + meanDuration.set(mean); + } } } @@ -127,6 +156,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } finally { + if (statistics.isStatisticsEnabled()) { + interruptedCounter.incrementAndGet(); + } exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId())); entry.getLatch().countDown(); } @@ -141,6 +173,10 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements this.interruptThreadsWhileStopping = interruptThreadsWhileStopping; } + public Statistics getStatistics() { + return statistics; + } + @Override protected void doStart() throws Exception { // noop @@ -258,4 +294,68 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements } } + /** + * Represents utilization statistics + */ + private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics { + + private boolean statisticsEnabled; + + @Override + public long getThreadsBlocked() { + return blockedCounter.get(); + } + + @Override + public long getThreadsInterrupted() { + return interruptedCounter.get(); + } + + @Override + public long getTotalDuration() { + return totalDuration.get(); + } + + @Override + public long getMinDuration() { + return minDuration.get(); + } + + @Override + public long getMaxDuration() { + return maxDuration.get(); + } + + @Override + public long getMeanDuration() { + return meanDuration.get(); + } + + @Override + public void reset() { + blockedCounter.set(0); + interruptedCounter.set(0); + totalDuration.set(0); + minDuration.set(0); + maxDuration.set(0); + meanDuration.set(0); + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void 
setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + + @Override + public String toString() { + return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]", + getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration()); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java index a4759ef..5659127 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java @@ -92,4 +92,49 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements manager.interrupt(exchangeId); } + @Override + public long getThreadsBlocked() { + return manager.getStatistics().getThreadsBlocked(); + } + + @Override + public long getThreadsInterrupted() { + return manager.getStatistics().getThreadsInterrupted(); + } + + @Override + public long getTotalDuration() { + return manager.getStatistics().getTotalDuration(); + } + + @Override + public long getMinDuration() { + return manager.getStatistics().getMinDuration(); + } + + @Override + public long getMaxDuration() { + return manager.getStatistics().getMaxDuration(); + } + + @Override + public long getMeanDuration() { + return manager.getStatistics().getMeanDuration(); + } + + @Override + public void resetStatistics() { + manager.getStatistics().reset(); + } + + @Override + public boolean isStatisticsEnabled() { + 
return manager.getStatistics().isStatisticsEnabled(); + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + manager.getStatistics().setStatisticsEnabled(statisticsEnabled); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java index 3081449..d4f8bdd 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java +++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java @@ -33,6 +33,59 @@ import org.apache.camel.StaticService; public interface AsyncProcessorAwaitManager extends StaticService { /** + * Utilization statistics of this manager. + */ + interface Statistics { + + /** + * Total number of threads that has been blocked + */ + long getThreadsBlocked(); + + /** + * Total number of threads that has been forced interrupted + */ + long getThreadsInterrupted(); + + /** + * The total duration time in millis. + */ + long getTotalDuration(); + + /** + * The lowest duration time in millis. + */ + long getMinDuration(); + + /** + * The highest duration time in millis. + */ + long getMaxDuration(); + + /** + * The average duration time in millis. + */ + long getMeanDuration(); + + /** + * Reset the counters + */ + void reset(); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Sets whether statistics is enabled. + * + * @param statisticsEnabled <tt>true</tt> to enable + */ + void setStatisticsEnabled(boolean statisticsEnabled); + } + + /** * Information about the thread and exchange that are inflight.
*/ interface AwaitThread { @@ -136,4 +189,12 @@ public interface AsyncProcessorAwaitManager extends StaticService { * This is enabled by default which allows Camel to release any blocked thread during shutting down Camel itself. */ void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping); + + /** + * Gets the utilization statistics of this manager + * + * @return the utilization statistics + */ + Statistics getStatistics(); + } http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java index 2d97c80..c53aa9e 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java @@ -32,6 +32,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager; public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport { public void testAsyncAwaitInterrupt() throws Exception { + context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true); + assertEquals(0, context.getAsyncProcessorAwaitManager().size()); getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); @@ -49,6 +51,8 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport assertMockEndpointsSatisfied(); assertEquals(0, context.getAsyncProcessorAwaitManager().size()); + assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked()); + assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted()); } @Override 
http://git-wip-us.apache.org/repos/asf/camel/blob/9ab0bd5c/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java index 8389069..1fc4635 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java @@ -30,6 +30,8 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager; public class AsyncProcessorAwaitManagerTest extends ContextTestSupport { public void testAsyncAwait() throws Exception { + context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true); + assertEquals(0, context.getAsyncProcessorAwaitManager().size()); getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); @@ -42,6 +44,8 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport { assertMockEndpointsSatisfied(); assertEquals(0, context.getAsyncProcessorAwaitManager().size()); + assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked()); + assertEquals(0, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted()); } @Override