CAMEL-8223: Inflight repository to allow browsing of current inflight exchanges
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3b943187 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3b943187 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3b943187 Branch: refs/heads/master Commit: 3b94318773a9ed9705e1a9819273bb65c068649c Parents: 4ec817a Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jan 9 09:53:39 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jan 9 13:41:46 2015 +0100 ---------------------------------------------------------------------- .../camel/impl/DefaultInflightRepository.java | 110 +++++++++++++++++-- .../camel/impl/TimeoutInflightRepository.java | 11 +- .../apache/camel/spi/InflightRepository.java | 43 ++++++++ .../impl/InflightRepositoryBrowseTest.java | 66 +++++++++++ 4 files changed, 222 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java index 4408fde..f992042 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java @@ -16,34 +16,40 @@ */ package org.apache.camel.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.MessageHistory; import org.apache.camel.spi.InflightRepository; import org.apache.camel.support.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Default implement which just uses a counter + * Default {@link org.apache.camel.spi.InflightRepository}. * * @version */ -public class DefaultInflightRepository extends ServiceSupport implements InflightRepository { +public class DefaultInflightRepository extends ServiceSupport implements InflightRepository { private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); - private final AtomicInteger totalCount = new AtomicInteger(); + private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<String, Exchange>(); private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>(); public void add(Exchange exchange) { - totalCount.incrementAndGet(); + inflight.put(exchange.getExchangeId(), exchange); } public void remove(Exchange exchange) { - totalCount.decrementAndGet(); + inflight.remove(exchange.getExchangeId()); } public void add(Exchange exchange, String routeId) { @@ -61,7 +67,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh } public int size() { - return totalCount.get(); + return inflight.size(); } @Deprecated @@ -77,7 +83,16 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override public int size(String routeId) { AtomicInteger existing = routeCount.get(routeId); - return existing != null ? existing.get() : 0; + return existing != null ? existing.get() : 0; + } + + @Override + public Collection<InflightExchange> browse() { + List<InflightExchange> answer = new ArrayList<InflightExchange>(); + for (Exchange exchange : inflight.values()) { + answer.add(new InflightExchangeEntry(exchange)); + } + return Collections.unmodifiableCollection(answer); } @Override @@ -94,4 +109,85 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh } routeCount.clear(); } + + private static final class InflightExchangeEntry implements InflightExchange { + + private final Exchange exchange; + + private InflightExchangeEntry(Exchange exchange) { + this.exchange = exchange; + } + + @Override + public Exchange getExchange() { + return exchange; + } + + @Override + public long getDuration() { + long duration = 0; + Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); + if (created != null) { + duration = System.currentTimeMillis() - created.getTime(); + } + return duration; + } + + @Override + @SuppressWarnings("unchecked") + public long getElapsed() { + List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + if (list == null || list.isEmpty()) { + return 0; + } + + // get latest entry + MessageHistory history = list.get(list.size() - 1); + if (history != null) { + return history.getElapsed(); + } else { + return 0; + } + } + + @Override + @SuppressWarnings("unchecked") + public String getNodeId() { + List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + if (list == null || list.isEmpty()) { + return null; + } + + // get latest entry + MessageHistory history = list.get(list.size() - 1); + if (history != null) { + return history.getNode().getId(); + } else { + return null; + } + } + + @Override + @SuppressWarnings("unchecked") + public String getRouteId() { + List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + if (list == null || list.isEmpty()) { + return null; + } + + // get latest entry + MessageHistory history = list.get(list.size() - 1); + if (history != null) { + return history.getRouteId(); + } else { + return null; + } + } + + @Override + public String toString() { + return "InflightExchangeEntry[exchangeId=" + exchange.getExchangeId() + "]"; + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java index 64a58c4..85290f6 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java @@ -16,6 +16,7 @@ */ package org.apache.camel.impl; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,6 +36,9 @@ import org.slf4j.LoggerFactory; * Please use CamelContext.startService(repository) to start the service before set it to the CamelContext; */ public class TimeoutInflightRepository extends ServiceSupport implements InflightRepository { + + // TODO: rework this a bit and likely add support for this to the default inflight repository + private static final Logger LOG = LoggerFactory.getLogger(TimeoutInflightRepository.class); private static final String INFLIGHT_TIME_STAMP = "CamelInflightTimeStamp"; private static final String TIMEOUT_EXCHANGE_PROCESSED = "CamelTimeoutExchangeProcessed"; @@ -124,7 +128,12 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh // do nothing here return 0; } - + + @Override + public Collection<InflightExchange> browse() { + return null; + } + public long getWaitTime() { return waitTime; } http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java index cf22cfa..cd5391d 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java @@ -16,6 +16,8 @@ */ package org.apache.camel.spi; +import java.util.Collection; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.StaticService; @@ -28,6 +30,42 @@ import org.apache.camel.StaticService; public interface InflightRepository extends StaticService { /** + * Information about the inflight exchange. + */ + interface InflightExchange { + + /** + * The exchange being inflight + */ + Exchange getExchange(); + + /** + * The duration in millis the exchange has been inflight + */ + long getDuration(); + + /** + * The elapsed time in millis processing the exchange at the current node + */ + long getElapsed(); + + /** + * The id of the node from the route where the exchange currently is being processed + * <p/> + * Is <tt>null</tt> if message history is disabled. + */ + String getNodeId(); + + /** + * The id of the route where the exchange currently is being processed + * <p/> + * Is <tt>null</tt> if message history is disabled. + */ + String getRouteId(); + + } + + /** * Adds the exchange to the inflight registry to the total counter * * @param exchange the exchange @@ -92,4 +130,9 @@ public interface InflightRepository extends StaticService { */ int size(String routeId); + /** + * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight. + */ + Collection<InflightExchange> browse(); + } http://git-wip-us.apache.org/repos/asf/camel/blob/3b943187/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java new file mode 100644 index 0000000..76b65cf --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.Collection; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.InflightRepository; + +/** + * @version + */ +public class InflightRepositoryBrowseTest extends ContextTestSupport { + + public void testInflight() throws Exception { + assertEquals(0, context.getInflightRepository().browse().size()); + + template.sendBody("direct:start", "Hello World"); + + assertEquals(0, context.getInflightRepository().browse().size()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("foo") + .to("mock:a") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Collection<InflightRepository.InflightExchange> list = context.getInflightRepository().browse(); + assertEquals(1, list.size()); + + InflightRepository.InflightExchange inflight = list.iterator().next(); + assertNotNull(inflight); + + assertEquals(exchange, inflight.getExchange()); + assertEquals("foo", inflight.getRouteId()); + assertEquals("myProcessor", inflight.getNodeId()); + } + }).id("myProcessor") + .to("mock:result"); + } + }; + } + +}