Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 9e2f79c1d -> 8f3224356
CAMEL-10116: Retrieve last MessageHistory when getting NodeId and RouteId

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f322435
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f322435
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f322435

Branch: refs/heads/camel-2.17.x
Commit: 8f322435627e4c3c5deea41fb760cd6ed85e9fa8
Parents: 9e2f79c
Author: Arno Noordover <anoordo...@users.noreply.github.com>
Authored: Tue Jul 5 20:24:35 2016 +0200
Committer: Arno Noordover <anoordo...@users.noreply.github.com>
Committed: Thu Jul 7 23:14:19 2016 +0200

----------------------------------------------------------------------
 .../impl/DefaultAsyncProcessorAwaitManager.java | 46 ++++--
 .../org/apache/camel/impl/DefaultExchange.java | 3 +-
 .../camel/impl/DefaultInflightRepository.java | 13 +-
 .../camel/processor/CamelInternalProcessor.java | 3 +-
 .../org/apache/camel/util/ExchangeHelper.java | 4 +-
 .../DefaultAsyncProcessorAwaitManagerTest.java | 154 +++++++++++++++++++
 .../async/AsyncProcessorAwaitManagerTest.java | 3 +-
 7 files changed, 201 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 2712178..d0d306b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -18,7 +18,7 @@ package org.apache.camel.impl;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import
java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.Exchange; import org.apache.camel.MessageHistory; +import org.apache.camel.NamedNode; import org.apache.camel.processor.DefaultExchangeFormatter; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.ExchangeFormatter; @@ -240,23 +241,12 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements private final Exchange exchange; private final CountDownLatch latch; private final long start; - private String routeId; - private String nodeId; private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) { this.thread = thread; this.exchange = exchange; this.latch = latch; this.start = System.currentTimeMillis(); - - // capture details from message history if enabled - List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); - if (list != null && !list.isEmpty()) { - // grab last part - MessageHistory history = list.get(list.size() - 1); - routeId = history.getRouteId(); - nodeId = history.getNode() != null ? 
history.getNode().getId() : null; - } } @Override @@ -276,18 +266,46 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements @Override public String getRouteId() { - return routeId; + MessageHistory lastMessageHistory = getLastMessageHistory(); + if (lastMessageHistory == null) { + return null; + } + return lastMessageHistory.getRouteId(); } @Override public String getNodeId() { - return nodeId; + NamedNode node = getNode(); + if (node == null) { + return null; + } + return node.getId(); } public CountDownLatch getLatch() { return latch; } + private NamedNode getNode() { + MessageHistory lastMessageHistory = getLastMessageHistory(); + if (lastMessageHistory == null) { + return null; + } + return lastMessageHistory.getNode(); + } + + private MessageHistory getLastMessageHistory() { + LinkedList<MessageHistory> list = getMessageHistories(); + if (list == null || list.isEmpty()) { + return null; + } + return list.getLast(); + } + + private LinkedList<MessageHistory> getMessageHistories() { + return exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); + } + @Override public String toString() { return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]"; http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java index 923c0d8..bebc5b3 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java @@ -17,6 +17,7 @@ package org.apache.camel.impl; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -148,7 +149,7 @@ 
public final class DefaultExchange implements Exchange { // safe copy message history using a defensive copy List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY); if (history != null) { - answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history)); + answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history)); } return answer; http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java index ab0079b..c9c17c9 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -170,13 +171,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override @SuppressWarnings("unchecked") public long getElapsed() { - List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); if (list == null || list.isEmpty()) { return 0; } // get latest entry - MessageHistory history = list.get(list.size() - 1); + MessageHistory history = list.getLast(); if (history != null) { return history.getElapsed(); } else { @@ -187,13 +188,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override @SuppressWarnings("unchecked") public String 
getNodeId() { - List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); if (list == null || list.isEmpty()) { return null; } // get latest entry - MessageHistory history = list.get(list.size() - 1); + MessageHistory history = list.getLast(); if (history != null) { return history.getNode().getId(); } else { @@ -204,13 +205,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override @SuppressWarnings("unchecked") public String getRouteId() { - List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); if (list == null || list.isEmpty()) { return null; } // get latest entry - MessageHistory history = list.get(list.size() - 1); + MessageHistory history = list.getLast(); if (history != null) { return history.getRouteId(); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index a7f31e4..fac1107 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -19,6 +19,7 @@ package org.apache.camel.processor; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.RejectedExecutionException; @@ -744,7 +745,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { public MessageHistory before(Exchange 
exchange) throws Exception { List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); if (list == null) { - list = new ArrayList<MessageHistory>(); + list = new LinkedList<>(); exchange.setProperty(Exchange.MESSAGE_HISTORY, list); } MessageHistory history = factory.newMessageHistory(routeId, definition, new Date()); http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 9641026..030b78d 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -16,8 +16,8 @@ */ package org.apache.camel.util; -import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -915,7 +915,7 @@ public final class ExchangeHelper { // safe copy message history using a defensive copy List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY); if (history != null) { - answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history)); + answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history)); } return answer; http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java new file mode 100644 index 0000000..b91354e --- /dev/null +++ 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; + +import org.apache.camel.Exchange; +import org.apache.camel.MessageHistory; +import org.apache.camel.NamedNode; +import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.spi.MessageHistoryFactory; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertThat; + +public class DefaultAsyncProcessorAwaitManagerTest { + + private static final MessageHistoryFactory MESSAGE_HISTORY_FACTORY = new DefaultMessageHistoryFactory(); + private DefaultAsyncProcessorAwaitManager defaultAsyncProcessorAwaitManager; + private DefaultExchange exchange; + private CountDownLatch latch; + private Thread thread; + + @Test + public void testNoMessageHistory() throws Exception { + startAsyncProcess(); + AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); + assertThat(awaitThread.getRouteId(), 
is(nullValue())); + assertThat(awaitThread.getNodeId(), is(nullValue())); + waitForEndOfAsyncProcess(); + } + + @Test + public void testMessageHistoryWithEmptyList() throws Exception { + startAsyncProcess(); + exchange.setProperty(Exchange.MESSAGE_HISTORY, new LinkedList<MessageHistory>()); + AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); + assertThat(awaitThread.getRouteId(), is(nullValue())); + assertThat(awaitThread.getNodeId(), is(nullValue())); + waitForEndOfAsyncProcess(); + } + + @Test + public void testMessageHistoryWithNullMessageHistory() throws Exception { + startAsyncProcess(); + LinkedList<MessageHistory> messageHistories = new LinkedList<>(); + messageHistories.add(null); + exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories); + AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); + assertThat(awaitThread.getRouteId(), is(nullValue())); + assertThat(awaitThread.getNodeId(), is(nullValue())); + waitForEndOfAsyncProcess(); + } + + @Test + public void testMessageHistoryWithNullElements() throws Exception { + startAsyncProcess(); + LinkedList<MessageHistory> messageHistories = new LinkedList<>(); + messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory(null, + new MockNamedNode().withId(null), + null)); + exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories); + AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); + assertThat(awaitThread.getRouteId(), is(nullValue())); + assertThat(awaitThread.getNodeId(), is(nullValue())); + waitForEndOfAsyncProcess(); + } + + @Test + public void testMessageHistoryWithNotNullElements() throws Exception { + startAsyncProcess(); + LinkedList<MessageHistory> messageHistories = new LinkedList<>(); + messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory("routeId", + new 
MockNamedNode().withId("nodeId"), + null)); + exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories); + AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); + assertThat(awaitThread.getRouteId(), is("routeId")); + assertThat(awaitThread.getNodeId(), is("nodeId")); + waitForEndOfAsyncProcess(); + } + + private void waitForEndOfAsyncProcess() { + latch.countDown(); + while (thread.isAlive()) { + } + } + + private void startAsyncProcess() throws InterruptedException { + defaultAsyncProcessorAwaitManager = new DefaultAsyncProcessorAwaitManager(); + latch = new CountDownLatch(1); + BackgroundAwait backgroundAwait = new BackgroundAwait(); + exchange = new DefaultExchange(new DefaultCamelContext()); + thread = new Thread(backgroundAwait); + thread.start(); + Thread.sleep(100); + } + + + private class BackgroundAwait implements Runnable { + + @Override + public void run() { + defaultAsyncProcessorAwaitManager.await(exchange, latch); + } + } + + private static class MockNamedNode implements NamedNode { + + private String id; + + @Override + public String getId() { + return id; + } + + @Override + public String getShortName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getLabel() { + return this.getClass().getName(); + } + + @Override + public String getDescriptionText() { + return this.getClass().getCanonicalName(); + } + + public MockNamedNode withId(String id) { + this.id = id; + return this; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8f322435/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java index 
1fc4635..b56b279 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java @@ -73,7 +73,8 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport { log.info("Thread {} has waited for {} msec.", thread.getBlockedThread().getName(), wait); assertEquals("myRoute", thread.getRouteId()); - assertEquals("myAsync", thread.getNodeId()); + //assertEquals("myAsync", thread.getNodeId()); + assertEquals("process1", thread.getNodeId()); } }) .to("mock:result");