CAMEL-8223: Inflight repository to allow browsing of current inflight exchanges


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3b943187
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3b943187
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3b943187

Branch: refs/heads/master
Commit: 3b94318773a9ed9705e1a9819273bb65c068649c
Parents: 4ec817a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jan 9 09:53:39 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jan 9 13:41:46 2015 +0100

----------------------------------------------------------------------
 .../camel/impl/DefaultInflightRepository.java   | 110 +++++++++++++++++--
 .../camel/impl/TimeoutInflightRepository.java   |  11 +-
 .../apache/camel/spi/InflightRepository.java    |  43 ++++++++
 .../impl/InflightRepositoryBrowseTest.java      |  66 +++++++++++
 4 files changed, 222 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index 4408fde..f992042 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -16,34 +16,40 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Default implement which just uses a counter
+ * Default {@link org.apache.camel.spi.InflightRepository}.
  *
  * @version 
  */
-public class DefaultInflightRepository extends ServiceSupport implements 
InflightRepository  {
+public class DefaultInflightRepository extends ServiceSupport implements 
InflightRepository {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultInflightRepository.class);
-    private final AtomicInteger totalCount = new AtomicInteger();
+    private final ConcurrentMap<String, Exchange> inflight = new 
ConcurrentHashMap<String, Exchange>();
     private final ConcurrentMap<String, AtomicInteger> routeCount = new 
ConcurrentHashMap<String, AtomicInteger>();
 
     public void add(Exchange exchange) {
-        totalCount.incrementAndGet();
+        inflight.put(exchange.getExchangeId(), exchange);
     }
 
     public void remove(Exchange exchange) {
-        totalCount.decrementAndGet();
+        inflight.remove(exchange.getExchangeId());
     }
 
     public void add(Exchange exchange, String routeId) {
@@ -61,7 +67,7 @@ public class DefaultInflightRepository extends ServiceSupport 
implements Infligh
     }
 
     public int size() {
-        return totalCount.get();
+        return inflight.size();
     }
 
     @Deprecated
@@ -77,7 +83,16 @@ public class DefaultInflightRepository extends 
ServiceSupport implements Infligh
     @Override
     public int size(String routeId) {
         AtomicInteger existing = routeCount.get(routeId);
-        return existing != null ? existing.get() : 0; 
+        return existing != null ? existing.get() : 0;
+    }
+
+    @Override
+    public Collection<InflightExchange> browse() {
+        List<InflightExchange> answer = new ArrayList<InflightExchange>();
+        for (Exchange exchange : inflight.values()) {
+            answer.add(new InflightExchangeEntry(exchange));
+        }
+        return Collections.unmodifiableCollection(answer);
     }
 
     @Override
@@ -94,4 +109,85 @@ public class DefaultInflightRepository extends 
ServiceSupport implements Infligh
         }
         routeCount.clear();
     }
+
+    private static final class InflightExchangeEntry implements 
InflightExchange {
+
+        private final Exchange exchange;
+
+        private InflightExchangeEntry(Exchange exchange) {
+            this.exchange = exchange;
+        }
+
+        @Override
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        @Override
+        public long getDuration() {
+            long duration = 0;
+            Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, 
Date.class);
+            if (created != null) {
+                duration = System.currentTimeMillis() - created.getTime();
+            }
+            return duration;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public long getElapsed() {
+            List<MessageHistory> list = 
exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            if (list == null || list.isEmpty()) {
+                return 0;
+            }
+
+            // get latest entry
+            MessageHistory history = list.get(list.size() - 1);
+            if (history != null) {
+                return history.getElapsed();
+            } else {
+                return 0;
+            }
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public String getNodeId() {
+            List<MessageHistory> list = 
exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            if (list == null || list.isEmpty()) {
+                return null;
+            }
+
+            // get latest entry
+            MessageHistory history = list.get(list.size() - 1);
+            if (history != null) {
+                return history.getNode().getId();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public String getRouteId() {
+            List<MessageHistory> list = 
exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            if (list == null || list.isEmpty()) {
+                return null;
+            }
+
+            // get latest entry
+            MessageHistory history = list.get(list.size() - 1);
+            if (history != null) {
+                return history.getRouteId();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "InflightExchangeEntry[exchangeId=" + 
exchange.getExchangeId() + "]";
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java 
b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
index 64a58c4..85290f6 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -35,6 +36,9 @@ import org.slf4j.LoggerFactory;
  * Please use CamelContext.startService(repository) to start the service 
before set it to the CamelContext; 
  */
 public class TimeoutInflightRepository extends ServiceSupport implements 
InflightRepository {
+
+    // TODO: rework this a bit and likely add support for this to the default 
inflight repository
+
     private static final Logger LOG = 
LoggerFactory.getLogger(TimeoutInflightRepository.class);
     private static final String INFLIGHT_TIME_STAMP = "CamelInflightTimeStamp";
     private static final String TIMEOUT_EXCHANGE_PROCESSED = 
"CamelTimeoutExchangeProcessed";
@@ -124,7 +128,12 @@ public class TimeoutInflightRepository extends 
ServiceSupport implements Infligh
         // do nothing here
         return 0;
     }
-    
+
+    @Override
+    public Collection<InflightExchange> browse() {
+        return null;
+    }
+
     public long getWaitTime() {
         return waitTime;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java 
b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
index cf22cfa..cd5391d 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import java.util.Collection;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.StaticService;
@@ -28,6 +30,42 @@ import org.apache.camel.StaticService;
 public interface InflightRepository extends StaticService {
 
     /**
+     * Information about the inflight exchange.
+     */
+    interface InflightExchange {
+
+        /**
+         * The exchange being inflight
+         */
+        Exchange getExchange();
+
+        /**
+         * The duration in millis the exchange has been inflight
+         */
+        long getDuration();
+
+        /**
+         * The elapsed time in millis processing the exchange at the current 
node
+         */
+        long getElapsed();
+
+        /**
+         * The id of the node from the route where the exchange currently is 
being processed
+         * <p/>
+         * Is <tt>null</tt> if message history is disabled.
+         */
+        String getNodeId();
+
+        /**
+         * The id of the route where the exchange currently is being processed
+         * <p/>
+         * Is <tt>null</tt> if message history is disabled.
+         */
+        String getRouteId();
+
+    }
+
+    /**
      * Adds the exchange to the inflight registry to the total counter
      *
      * @param exchange  the exchange
@@ -92,4 +130,9 @@ public interface InflightRepository extends StaticService {
      */
     int size(String routeId);
 
+    /**
+     * A <i>read-only</i> browser of the {@link InflightExchange}s that are 
currently inflight.
+     */
+    Collection<InflightExchange> browse();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
new file mode 100644
index 0000000..76b65cf
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Collection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InflightRepository;
+
+/**
+ * @version
+ */
+public class InflightRepositoryBrowseTest extends ContextTestSupport {
+
+    public void testInflight() throws Exception {
+        assertEquals(0, context.getInflightRepository().browse().size());
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertEquals(0, context.getInflightRepository().browse().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("foo")
+                        .to("mock:a")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                
Collection<InflightRepository.InflightExchange> list = 
context.getInflightRepository().browse();
+                                assertEquals(1, list.size());
+
+                                InflightRepository.InflightExchange inflight = 
list.iterator().next();
+                                assertNotNull(inflight);
+
+                                assertEquals(exchange, inflight.getExchange());
+                                assertEquals("foo", inflight.getRouteId());
+                                assertEquals("myProcessor", 
inflight.getNodeId());
+                            }
+                        }).id("myProcessor")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}

Reply via email to