CAMEL-8252: Add inflight exchanges to jmx performance counters as that is similar to the other stats, instead of having to filter all inflights to find per processor.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e0581024 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e0581024 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e0581024 Branch: refs/heads/master Commit: e0581024441efc99cfaf4e1aa59e6c94f38f078c Parents: bc2a1d0 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jan 16 17:44:44 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jan 16 17:44:44 2015 +0100 ---------------------------------------------------------------------- .../api/management/PerformanceCounter.java | 7 +++++ .../mbean/ManagedCamelContextMBean.java | 3 ++ .../mbean/ManagedPerformanceCounterMBean.java | 3 ++ .../api/management/mbean/ManagedRouteMBean.java | 4 +++ .../management/CompositePerformanceCounter.java | 10 ++++++ .../management/DelegatePerformanceCounter.java | 10 +++++- .../management/InstrumentationProcessor.java | 9 ++++++ .../management/mbean/ManagedCamelContext.java | 32 ++------------------ .../mbean/ManagedPerformanceCounter.java | 11 +++++++ .../camel/management/mbean/ManagedRoute.java | 2 +- .../camel/management/mbean/Statistic.java | 4 +++ .../camel/processor/CamelInternalProcessor.java | 10 +++++- 12 files changed, 73 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java index d1779d8..91d25e1 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java @@ -26,6 +26,13 @@ import org.apache.camel.Exchange; public interface PerformanceCounter { /** + * Executed when an {@link org.apache.camel.Exchange} is about to be processed. + * + * @param exchange the exchange + */ + void processExchange(Exchange exchange); + + /** * Executed when an {@link org.apache.camel.Exchange} is complete. * * @param exchange the exchange http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java index aa4efeb..c803cdf 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java @@ -81,6 +81,9 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean @ManagedAttribute(description = "Tracing") void setTracing(Boolean tracing); + /** + * @deprecated use {@link #getExchangesInflight()} + */ @ManagedAttribute(description = "Current number of inflight Exchanges") Integer getInflightExchanges(); http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java index 5b83614..613f3b5 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java @@ -29,6 +29,9 @@ public interface ManagedPerformanceCounterMBean extends ManagedCounterMBean { @ManagedAttribute(description = "Number of failed exchanges") long getExchangesFailed() throws Exception; + @ManagedAttribute(description = "Number of inflight exchanges") + long getExchangesInflight() throws Exception; + @ManagedAttribute(description = "Number of failures handled") long getFailuresHandled() throws Exception; http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java index f577320..148c688 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java @@ -33,7 +33,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean { @ManagedAttribute(description = "Route State") String getState(); + /** + * @deprecated use {@link #getExchangesInflight()} + */ @ManagedAttribute(description = "Current number of inflight Exchanges") + @Deprecated Integer getInflightExchanges(); @ManagedAttribute(description = "Camel ID") http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java index 49067a2..2e93b1c 100644 --- a/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java +++ b/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java @@ -37,6 +37,16 @@ public class CompositePerformanceCounter implements PerformanceCounter { } @Override + public void processExchange(Exchange exchange) { + if (counter1.isStatisticsEnabled()) { + counter1.processExchange(exchange); + } + if (counter2.isStatisticsEnabled()) { + counter2.processExchange(exchange); + } + } + + @Override public void completedExchange(Exchange exchange, long time) { if (counter1.isStatisticsEnabled()) { counter1.completedExchange(exchange, time); http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java index ef84417..0a0dbff 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java +++ b/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java @@ -47,8 +47,16 @@ public class DelegatePerformanceCounter implements PerformanceCounter { this.counter.setStatisticsEnabled(statisticsEnabled); } + public void processExchange(Exchange exchange) { + if (counter != null) { + counter.processExchange(exchange); + } + } + public void completedExchange(Exchange exchange, long time) { - counter.completedExchange(exchange, time); + if (counter != null) { + counter.completedExchange(exchange, time); + } } public void failedExchange(Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java index ab6859f..4c4d7dc 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java @@ -69,6 +69,11 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor { // only record time if stats is enabled final StopWatch watch = (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null; + // mark beginning to process the exchange + if (watch != null) { + beginTime(exchange); + } + return processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { try { @@ -89,6 +94,10 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor { }); } + protected void beginTime(Exchange exchange) { + counter.processExchange(exchange); + } + protected void recordTime(Exchange exchange, long duration) { if (LOG.isTraceEnabled()) { LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange}); http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java index be1a69f..c74d391 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java @@ -57,7 +57,6 @@ import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.RoutesDefinition; import org.apache.camel.model.rest.RestDefinition; import org.apache.camel.model.rest.RestsDefinition; -import org.apache.camel.spi.InflightRepository; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.JsonSchemaHelper; import org.apache.camel.util.ObjectHelper; @@ -140,7 +139,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti } public Integer getInflightExchanges() { - return context.getInflightRepository().size(); + return (int) super.getExchangesInflight(); } public Integer getTotalRoutes() { @@ -398,11 +397,6 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti } Collections.sort(processors, new OrderProcessorMBeans()); - Collection<InflightRepository.InflightExchange> inflights = null; - if (fullStats) { - inflights = context.getInflightRepository().browse(); - } - // loop the routes, and append the processor stats if needed sb.append(" <routeStats>\n"); for (ObjectName on : routes) { @@ -410,7 +404,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti sb.append(" <routeStat").append(String.format(" id=\"%s\" state=\"%s\"", route.getRouteId(), route.getState())); // use substring as we only want the attributes stat = route.dumpStatsAsXml(fullStats); - sb.append(" exchangesInflight=\"").append(context.getInflightRepository().size(route.getRouteId())).append("\""); + sb.append(" exchangesInflight=\"").append(route.getExchangesInflight()).append("\""); sb.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n"); // add processor details if needed @@ -422,10 +416,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti sb.append(" <processorStat").append(String.format(" id=\"%s\" index=\"%s\" state=\"%s\"", processor.getProcessorId(), processor.getIndex(), processor.getState())); // use substring as we only want the attributes stat = processor.dumpStatsAsXml(fullStats); - if (fullStats) { - // only include this in full stats as it may be more expensive to compute - sb.append(" exchangesInflight=\"").append(inflightSizeAtProcessor(inflights, processor.getProcessorId())).append("\""); - } + sb.append(" exchangesInflight=\"").append(processor.getExchangesInflight()).append("\""); sb.append(" ").append(stat.substring(7)).append("\n"); } } @@ -440,23 +431,6 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti return sb.toString(); } - /** - * Number of inflight exchanges at the given processor - * - * @param inflights all inflight exchanges - * @param processorId the processor id - * @return the number - */ - private String inflightSizeAtProcessor(Collection<InflightRepository.InflightExchange> inflights, String processorId) { - int count = 0; - for (InflightRepository.InflightExchange inflight : inflights) { - if (processorId.equals(inflight.getNodeId())) { - count++; - } - } - return String.valueOf(count); - } - public boolean createEndpoint(String uri) throws Exception { if (context.hasEndpoint(uri) != null) { // endpoint already exists http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java index 6aedc57..07c7f95 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java @@ -33,6 +33,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement private Statistic exchangesCompleted; private Statistic exchangesFailed; + private Statistic exchangesInflight; private Statistic failuresHandled; private Statistic redeliveries; private Statistic externalRedeliveries; @@ -56,6 +57,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement super.init(strategy); this.exchangesCompleted = new Statistic("org.apache.camel.exchangesCompleted", this, Statistic.UpdateMode.COUNTER); this.exchangesFailed = new Statistic("org.apache.camel.exchangesFailed", this, Statistic.UpdateMode.COUNTER); + this.exchangesInflight = new Statistic("org.apache.camel.exchangesInflight", this, Statistic.UpdateMode.COUNTER); this.failuresHandled = new Statistic("org.apache.camel.failuresHandled", this, Statistic.UpdateMode.COUNTER); this.redeliveries = new Statistic("org.apache.camel.redeliveries", this, Statistic.UpdateMode.COUNTER); @@ -79,6 +81,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement super.reset(); exchangesCompleted.reset(); exchangesFailed.reset(); + exchangesInflight.reset(); failuresHandled.reset(); redeliveries.reset(); externalRedeliveries.reset(); @@ -106,6 +109,8 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement return exchangesFailed.getValue(); } + public long getExchangesInflight() { return exchangesInflight.getValue(); } + public long getFailuresHandled() throws Exception { return failuresHandled.getValue(); } @@ -186,9 +191,14 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement this.statisticsEnabled = statisticsEnabled; } + public synchronized void processExchange(Exchange exchange) { + exchangesInflight.increment(); + } + public synchronized void completedExchange(Exchange exchange, long time) { increment(); exchangesCompleted.increment(); + exchangesInflight.decrement(); if (ExchangeHelper.isFailureHandled(exchange)) { failuresHandled.increment(); @@ -224,6 +234,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement public synchronized void failedExchange(Exchange exchange) { increment(); exchangesFailed.increment(); + exchangesInflight.decrement(); if (ExchangeHelper.isRedelivered(exchange)) { redeliveries.increment(); http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index a0bdfb1..3b57332 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -101,7 +101,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList } public Integer getInflightExchanges() { - return context.getInflightRepository().size(route.getId()); + return (int) super.getExchangesInflight(); } public String getCamelId() { http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java index 1b4675f..2f00bf3 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java @@ -101,6 +101,10 @@ public class Statistic { updateValue(1); } + public synchronized void decrement() { + updateValue(-1); + } + public synchronized long getValue() { if (updateMode == UpdateMode.DELTA) { if (updateCount == 0) { http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 099af1b..97f557b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -344,6 +344,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } } + protected void beginTime(Exchange exchange) { + counter.processExchange(exchange); + } + protected void recordTime(Exchange exchange, long duration) { if (LOG.isTraceEnabled()) { LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange}); @@ -367,7 +371,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { @Override public StopWatch before(Exchange exchange) throws Exception { // only record time if stats is enabled - return (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null; + StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null; + if (answer != null) { + beginTime(exchange); + } + return answer; } @Override