CAMEL-11354: Optimize oldest exchange inflight per route

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ffd97f65
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ffd97f65
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ffd97f65

Branch: refs/heads/master
Commit: ffd97f652c00e2e83bb64fa8aaccd62b2f412a5f
Parents: ce4e8f7
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun May 28 20:24:19 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun May 28 20:24:19 2017 +0200

----------------------------------------------------------------------
 .../camel/impl/DefaultInflightRepository.java   |  3 +-
 .../camel/management/mbean/ManagedRoute.java    | 53 +++++-------
 .../ManagedInflightStatisticsTest.java          | 88 +++++---------------
 3 files changed, 43 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ffd97f65/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index 2c6c78b..f176861 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -128,7 +128,8 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
                 public int compare(Exchange e1, Exchange e2) {
                     long d1 = getExchangeDuration(e1);
                     long d2 = getExchangeDuration(e2);
-                    return Long.compare(d1, d2);
+                    // need the biggest number first
+                    return -1 * Long.compare(d1, d2);
                 }
             });
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/ffd97f65/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index 1b031b1..81f4794 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -19,6 +19,7 @@ package org.apache.camel.management.mbean;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
@@ -36,6 +37,7 @@ import javax.management.Query;
 import javax.management.QueryExp;
 import javax.management.StringValueExp;
 
+import org.apache.camel.spi.InflightRepository;
 import org.w3c.dom.Document;
 
 import org.apache.camel.CamelContext;
@@ -408,14 +410,13 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         String stat = dumpStatsAsXml(fullStats);
         answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
         answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
-        Exchange oldest = getOldestInflightEntry();
+        InflightRepository.InflightExchange oldest = getOldestInflightEntry();
         if (oldest == null) {
             answer.append(" oldestInflightExchangeId=\"\"");
             answer.append(" oldestInflightDuration=\"\"");
         } else {
-            long duration = System.currentTimeMillis() - oldest.getCreated().getTime();
-            answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchangeId()).append("\"");
-            answer.append(" oldestInflightDuration=\"").append(duration).append("\"");
+            answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchange().getExchangeId()).append("\"");
+            answer.append(" oldestInflightDuration=\"").append(oldest.getDuration()).append("\"");
         }
         answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");
 
@@ -465,47 +466,31 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         return route.hashCode();
     }
 
-    private Exchange getOldestInflightEntry() {
-        return exchangesInFlight.values().stream().max(Comparator.comparing(Exchange::getCreated)).orElse(null);
+    private InflightRepository.InflightExchange getOldestInflightEntry() {
+        Collection<InflightRepository.InflightExchange> list = getContext().getInflightRepository().browse(getRouteId(), 1, true);
+        if (list.size() == 1) {
+            return list.iterator().next();
+        } else {
+            return null;
+        }
     }
 
     public Long getOldestInflightDuration() {
-        Exchange exchange = getOldestInflightEntry();
-        if (exchange == null) {
+        InflightRepository.InflightExchange oldest = getOldestInflightEntry();
+        if (oldest == null) {
             return null;
-        }
-        Date created = exchange.getCreated();
-        if (created != null) {
-            return System.currentTimeMillis() - created.getTime();
         } else {
-            return null;
+            return oldest.getDuration();
         }
     }
 
     public String getOldestInflightExchangeId() {
-        Exchange exchange = getOldestInflightEntry();
-        if (exchange == null) {
+        InflightRepository.InflightExchange oldest = getOldestInflightEntry();
+        if (oldest == null) {
             return null;
+        } else {
+            return oldest.getExchange().getExchangeId();
         }
-        return exchange.getExchangeId();
-    }
-
-    @Override
-    public void processExchange(Exchange exchange) {
-        exchangesInFlight.put(exchange.getExchangeId(), exchange);
-        super.processExchange(exchange);
-    }
-
-    @Override
-    public void completedExchange(Exchange exchange, long time) {
-        exchangesInFlight.remove(exchange.getExchangeId());
-        super.completedExchange(exchange, time);
-    }
-
-    @Override
-    public void failedExchange(Exchange exchange) {
-        exchangesInFlight.remove(exchange.getExchangeId());
-        super.failedExchange(exchange);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/ffd97f65/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
index aa6fbe7..c8fd5ca 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.management;
 
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -30,7 +33,7 @@ import org.apache.camel.component.mock.MockEndpoint;
  */
 public class ManagedInflightStatisticsTest extends ManagementTestSupport {
 
-    public void testManageStatistics() throws Exception {
+    public void testOldestInflight() throws Exception {
         // JMX tests dont work well on AIX CI servers (hangs them)
         if (isPlatform("aix")) {
             return;
@@ -53,10 +56,13 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport {
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(2);
 
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
         // start some exchanges.
-        template.asyncSendBody("direct:start", 1000L);
+        template.asyncSendBody("direct:start", latch1);
         Thread.sleep(500);
-        template.asyncSendBody("direct:start", 1000L);
+        template.asyncSendBody("direct:start", latch2);
         Thread.sleep(100);
 
         inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
@@ -67,77 +73,32 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport {
         id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
         assertNotNull(id);
 
+        log.info("Oldest Exchange id: {}, duration: {}", id, ts);
+
+        // complete first exchange
+        latch1.countDown();
+
         // Lets wait for the first exchange to complete.
-        Thread.sleep(500);
+        Thread.sleep(200);
         Long ts2 = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
         assertNotNull(ts2);
         String id2 = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
         assertNotNull(id2);
         assertNotNull(id2);
 
+        log.info("Oldest Exchange id: {}, duration: {}", id2, ts2);
+
         // Lets verify the oldest changed.
         assertTrue(!id2.equals(id));
         // The duration values could be different
-        //assertTrue(ts2 > ts);
-
-        // Lets wait for all the exchanges to complete.
-        Thread.sleep(500);
-
-        assertMockEndpointsSatisfied();
-
-        inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
-        assertEquals(0, inflight.longValue());
-        ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
-        assertNull(ts);
-        id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
-        assertNull(id);
-    }
-
-    public void testManageStatisticsFailed() throws Exception {
-        // JMX tests dont work well on AIX CI servers (hangs them)
-        if (isPlatform("aix")) {
-            return;
-        }
-
-        // get the stats for the route
-        MBeanServer mbeanServer = getMBeanServer();
+        assertTrue(!Objects.equals(ts2, ts));
 
-        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
-        assertEquals(1, set.size());
-        ObjectName on = set.iterator().next();
+        latch2.countDown();
 
-        Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
-        assertEquals(0, inflight.longValue());
-        Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
-        assertNull(ts);
-        String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
-        assertNull(id);
-        assertNull(id);
-
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-
-        // start some exchanges.
-        template.asyncSendBody("direct:start", 1000L);
+        // Lets wait for all the exchanges to complete.
         Thread.sleep(500);
-        try {
-            template.sendBody("direct:start", "Kaboom");
-            fail("Should have thrown exception");
-        } catch (Exception e) {
-            // expected
-        }
-
-        inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
-        assertEquals(1, inflight.longValue());
-
-        ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
-        assertNotNull(ts);
-        id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId");
-        assertNotNull(id);
 
         assertMockEndpointsSatisfied();
 
-        // Lets wait for all the exchanges to complete.
-        Thread.sleep(500);
-
         inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight");
         assertEquals(0, inflight.longValue());
         ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration");
@@ -155,13 +116,8 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport {
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws Exception {
-                                String body = exchange.getIn().getBody(String.class);
-                                if ("Kaboom".equals(body)) {
-                                    throw new IllegalArgumentException("Forced");
-                                }
-
-                                Long delay = (Long) exchange.getIn().getBody();
-                                Thread.sleep(delay.longValue());
+                                CountDownLatch latch = (CountDownLatch) exchange.getIn().getBody();
+                                latch.await(10, TimeUnit.SECONDS);
                             }
                         })
                         .to("mock:result").id("mock");

Reply via email to