CAMEL-8252: Add inflight exchanges to jmx performance counters as that is 
similar to the other stats, instead of having to filter all inflights to find 
per processor.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e0581024
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e0581024
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e0581024

Branch: refs/heads/master
Commit: e0581024441efc99cfaf4e1aa59e6c94f38f078c
Parents: bc2a1d0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jan 16 17:44:44 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jan 16 17:44:44 2015 +0100

----------------------------------------------------------------------
 .../api/management/PerformanceCounter.java      |  7 +++++
 .../mbean/ManagedCamelContextMBean.java         |  3 ++
 .../mbean/ManagedPerformanceCounterMBean.java   |  3 ++
 .../api/management/mbean/ManagedRouteMBean.java |  4 +++
 .../management/CompositePerformanceCounter.java | 10 ++++++
 .../management/DelegatePerformanceCounter.java  | 10 +++++-
 .../management/InstrumentationProcessor.java    |  9 ++++++
 .../management/mbean/ManagedCamelContext.java   | 32 ++------------------
 .../mbean/ManagedPerformanceCounter.java        | 11 +++++++
 .../camel/management/mbean/ManagedRoute.java    |  2 +-
 .../camel/management/mbean/Statistic.java       |  4 +++
 .../camel/processor/CamelInternalProcessor.java | 10 +++++-
 12 files changed, 73 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
index d1779d8..91d25e1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
@@ -26,6 +26,13 @@ import org.apache.camel.Exchange;
 public interface PerformanceCounter {
 
     /**
+     * Executed when an {@link org.apache.camel.Exchange} is about to be 
processed.
+     *
+     * @param exchange the exchange
+     */
+    void processExchange(Exchange exchange);
+
+    /**
      * Executed when an {@link org.apache.camel.Exchange} is complete.
      *
      * @param exchange the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
index aa4efeb..c803cdf 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
@@ -81,6 +81,9 @@ public interface ManagedCamelContextMBean extends 
ManagedPerformanceCounterMBean
     @ManagedAttribute(description = "Tracing")
     void setTracing(Boolean tracing);
 
+    /**
+     * @deprecated use {@link #getExchangesInflight()}
+     */
     @ManagedAttribute(description = "Current number of inflight Exchanges")
     Integer getInflightExchanges();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
index 5b83614..613f3b5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
@@ -29,6 +29,9 @@ public interface ManagedPerformanceCounterMBean extends 
ManagedCounterMBean {
     @ManagedAttribute(description = "Number of failed exchanges")
     long getExchangesFailed() throws Exception;
 
+    @ManagedAttribute(description = "Number of inflight exchanges")
+    long getExchangesInflight() throws Exception;
+
     @ManagedAttribute(description = "Number of failures handled")
     long getFailuresHandled() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index f577320..148c688 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -33,7 +33,11 @@ public interface ManagedRouteMBean extends 
ManagedPerformanceCounterMBean {
     @ManagedAttribute(description = "Route State")
     String getState();
 
+    /**
+     * @deprecated use {@link #getExchangesInflight()}
+     */
     @ManagedAttribute(description = "Current number of inflight Exchanges")
+    @Deprecated
     Integer getInflightExchanges();
 
     @ManagedAttribute(description = "Camel ID")

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
 
b/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
index 49067a2..2e93b1c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
@@ -37,6 +37,16 @@ public class CompositePerformanceCounter implements 
PerformanceCounter {
     }
 
     @Override
+    public void processExchange(Exchange exchange) {
+        if (counter1.isStatisticsEnabled()) {
+            counter1.processExchange(exchange);
+        }
+        if (counter2.isStatisticsEnabled()) {
+            counter2.processExchange(exchange);
+        }
+    }
+
+    @Override
     public void completedExchange(Exchange exchange, long time) {
         if (counter1.isStatisticsEnabled()) {
             counter1.completedExchange(exchange, time);

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
 
b/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
index ef84417..0a0dbff 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
@@ -47,8 +47,16 @@ public class DelegatePerformanceCounter implements 
PerformanceCounter {
         this.counter.setStatisticsEnabled(statisticsEnabled);
     }
 
+    public void processExchange(Exchange exchange) {
+        if (counter != null) {
+            counter.processExchange(exchange);
+        }
+    }
+
     public void completedExchange(Exchange exchange, long time) {
-        counter.completedExchange(exchange, time);
+        if (counter != null) {
+            counter.completedExchange(exchange, time);
+        }
     }
 
     public void failedExchange(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
index ab6859f..4c4d7dc 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
@@ -69,6 +69,11 @@ public class InstrumentationProcessor extends 
DelegateAsyncProcessor {
         // only record time if stats is enabled
         final StopWatch watch = (counter != null && 
counter.isStatisticsEnabled()) ? new StopWatch() : null;
 
+        // mark beginning to process the exchange
+        if (watch != null) {
+            beginTime(exchange);
+        }
+
         return processor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 try {
@@ -89,6 +94,10 @@ public class InstrumentationProcessor extends 
DelegateAsyncProcessor {
         });
     }
 
+    protected void beginTime(Exchange exchange) {
+        counter.processExchange(exchange);
+    }
+
     protected void recordTime(Exchange exchange, long duration) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{}Recording duration: {} millis for exchange: {}", new 
Object[]{type != null ? type + ": " : "", duration, exchange});

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index be1a69f..c74d391 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -57,7 +57,6 @@ import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RoutesDefinition;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.model.rest.RestsDefinition;
-import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.JsonSchemaHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -140,7 +139,7 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
     }
 
     public Integer getInflightExchanges() {
-        return context.getInflightRepository().size();
+        return (int) super.getExchangesInflight();
     }
 
     public Integer getTotalRoutes() {
@@ -398,11 +397,6 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
             }
             Collections.sort(processors, new OrderProcessorMBeans());
 
-            Collection<InflightRepository.InflightExchange> inflights = null;
-            if (fullStats) {
-                inflights = context.getInflightRepository().browse();
-            }
-
             // loop the routes, and append the processor stats if needed
             sb.append("  <routeStats>\n");
             for (ObjectName on : routes) {
@@ -410,7 +404,7 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
                 sb.append("    <routeStat").append(String.format(" id=\"%s\" 
state=\"%s\"", route.getRouteId(), route.getState()));
                 // use substring as we only want the attributes
                 stat = route.dumpStatsAsXml(fullStats);
-                sb.append(" 
exchangesInflight=\"").append(context.getInflightRepository().size(route.getRouteId())).append("\"");
+                sb.append(" 
exchangesInflight=\"").append(route.getExchangesInflight()).append("\"");
                 sb.append(" ").append(stat.substring(7, stat.length() - 
2)).append(">\n");
 
                 // add processor details if needed
@@ -422,10 +416,7 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
                             sb.append("        
<processorStat").append(String.format(" id=\"%s\" index=\"%s\" state=\"%s\"", 
processor.getProcessorId(), processor.getIndex(), processor.getState()));
                             // use substring as we only want the attributes
                             stat = processor.dumpStatsAsXml(fullStats);
-                            if (fullStats) {
-                                // only include this in full stats as it may 
be more expensive to compute
-                                sb.append(" 
exchangesInflight=\"").append(inflightSizeAtProcessor(inflights, 
processor.getProcessorId())).append("\"");
-                            }
+                            sb.append(" 
exchangesInflight=\"").append(processor.getExchangesInflight()).append("\"");
                             sb.append(" 
").append(stat.substring(7)).append("\n");
                         }
                     }
@@ -440,23 +431,6 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
         return sb.toString();
     }
 
-    /**
-     * Number of inflight exchanges at the given processor
-     *
-     * @param inflights   all inflight exchanges
-     * @param processorId the processor id
-     * @return the number
-     */
-    private String 
inflightSizeAtProcessor(Collection<InflightRepository.InflightExchange> 
inflights, String processorId) {
-        int count = 0;
-        for (InflightRepository.InflightExchange inflight : inflights) {
-            if (processorId.equals(inflight.getNodeId())) {
-                count++;
-            }
-        }
-        return String.valueOf(count);
-    }
-
     public boolean createEndpoint(String uri) throws Exception {
         if (context.hasEndpoint(uri) != null) {
             // endpoint already exists

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
index 6aedc57..07c7f95 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
@@ -33,6 +33,7 @@ public abstract class ManagedPerformanceCounter extends 
ManagedCounter implement
 
     private Statistic exchangesCompleted;
     private Statistic exchangesFailed;
+    private Statistic exchangesInflight;
     private Statistic failuresHandled;
     private Statistic redeliveries;
     private Statistic externalRedeliveries;
@@ -56,6 +57,7 @@ public abstract class ManagedPerformanceCounter extends 
ManagedCounter implement
         super.init(strategy);
         this.exchangesCompleted = new 
Statistic("org.apache.camel.exchangesCompleted", this, 
Statistic.UpdateMode.COUNTER);
         this.exchangesFailed = new 
Statistic("org.apache.camel.exchangesFailed", this, 
Statistic.UpdateMode.COUNTER);
+        this.exchangesInflight = new 
Statistic("org.apache.camel.exchangesInflight", this, 
Statistic.UpdateMode.COUNTER);
 
         this.failuresHandled = new 
Statistic("org.apache.camel.failuresHandled", this, 
Statistic.UpdateMode.COUNTER);
         this.redeliveries = new Statistic("org.apache.camel.redeliveries", 
this, Statistic.UpdateMode.COUNTER);
@@ -79,6 +81,7 @@ public abstract class ManagedPerformanceCounter extends 
ManagedCounter implement
         super.reset();
         exchangesCompleted.reset();
         exchangesFailed.reset();
+        exchangesInflight.reset();
         failuresHandled.reset();
         redeliveries.reset();
         externalRedeliveries.reset();
@@ -106,6 +109,8 @@ public abstract class ManagedPerformanceCounter extends 
ManagedCounter implement
         return exchangesFailed.getValue();
     }
 
+    public long getExchangesInflight() { return exchangesInflight.getValue(); }
+
     public long getFailuresHandled() throws Exception {
         return failuresHandled.getValue();
     }
@@ -186,9 +191,14 @@ public abstract class ManagedPerformanceCounter extends 
ManagedCounter implement
         this.statisticsEnabled = statisticsEnabled;
     }
 
+    public synchronized void processExchange(Exchange exchange) {
+        exchangesInflight.increment();
+    }
+
     public synchronized void completedExchange(Exchange exchange, long time) {
         increment();
         exchangesCompleted.increment();
+        exchangesInflight.decrement();
 
         if (ExchangeHelper.isFailureHandled(exchange)) {
             failuresHandled.increment();
@@ -224,6 +234,7 @@ public abstract class ManagedPerformanceCounter extends 
ManagedCounter implement
     public synchronized void failedExchange(Exchange exchange) {
         increment();
         exchangesFailed.increment();
+        exchangesInflight.decrement();
 
         if (ExchangeHelper.isRedelivered(exchange)) {
             redeliveries.increment();

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index a0bdfb1..3b57332 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -101,7 +101,7 @@ public class ManagedRoute extends ManagedPerformanceCounter 
implements TimerList
     }
 
     public Integer getInflightExchanges() {
-        return context.getInflightRepository().size(route.getId());
+        return (int) super.getExchangesInflight();
     }
 
     public String getCamelId() {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java 
b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
index 1b4675f..2f00bf3 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
@@ -101,6 +101,10 @@ public class Statistic {
         updateValue(1);
     }
 
+    public synchronized void decrement() {
+        updateValue(-1);
+    }
+
     public synchronized long getValue() {
         if (updateMode == UpdateMode.DELTA) {
             if (updateCount == 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 099af1b..97f557b 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -344,6 +344,10 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
             }
         }
 
+        protected void beginTime(Exchange exchange) {
+            counter.processExchange(exchange);
+        }
+
         protected void recordTime(Exchange exchange, long duration) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("{}Recording duration: {} millis for exchange: {}", 
new Object[]{type != null ? type + ": " : "", duration, exchange});
@@ -367,7 +371,11 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
         @Override
         public StopWatch before(Exchange exchange) throws Exception {
             // only record time if stats is enabled
-            return (counter != null && counter.isStatisticsEnabled()) ? new 
StopWatch() : null;
+            StopWatch answer = counter != null && 
counter.isStatisticsEnabled() ? new StopWatch() : null;
+            if (answer != null) {
+                beginTime(exchange);
+            }
+            return answer;
         }
 
         @Override

Reply via email to