This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 880b7b1 CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughput routing with message history enabled. 880b7b1 is described below commit 880b7b173ef0edaab37419ae88a43ad88bd0a1ac Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Oct 3 14:15:05 2020 +0200 CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughput routing with message history enabled. --- .../org/apache/camel/impl/engine/DefaultInflightRepository.java | 5 ++--- .../java/org/apache/camel/processor/CamelInternalProcessor.java | 5 +++-- .../src/main/java/org/apache/camel/processor/Splitter.java | 6 ++++-- .../org/apache/camel/processor/interceptor/DefaultDebugger.java | 4 ++-- .../src/main/docs/modules/eips/pages/message-history.adoc | 3 +++ .../src/main/java/org/apache/camel/support/DefaultExchange.java | 5 +++-- .../src/main/java/org/apache/camel/support/ExchangeHelper.java | 5 +++-- 7 files changed, 20 insertions(+), 13 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java index 7438f00..b2e7db8 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java @@ -19,7 +19,6 @@ package org.apache.camel.impl.engine; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -234,13 +233,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @SuppressWarnings("unchecked") public long getElapsed() { // this can only be calculate if message history is enabled - LinkedList<MessageHistory> 
list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); + List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); if (list == null || list.isEmpty()) { return 0; } // get latest entry - MessageHistory history = list.getLast(); + MessageHistory history = list.get(list.size() - 1); if (history != null) { long elapsed = history.getElapsed(); if (elapsed == 0 && history.getTime() > 0) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 1899481..d244cc8 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -17,9 +17,9 @@ package org.apache.camel.processor; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; @@ -723,7 +723,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { if (history != null) { List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); if (list == null) { - list = new LinkedList<>(); + // use thread-safe list as message history may be accessed concurrently + list = new CopyOnWriteArrayList<>(); exchange.setProperty(Exchange.MESSAGE_HISTORY, list); } list.add(history); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java index a6c8847..8efa0c7 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java @@ -276,8 +276,10 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac private static Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) { Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId); - // we do not want to copy the message history for splitted sub-messages - answer.getProperties().remove(Exchange.MESSAGE_HISTORY); + if (exchange.getContext().isMessageHistory()) { + // we do not want to copy the message history for splitted sub-messages + answer.getProperties().remove(Exchange.MESSAGE_HISTORY); + } return answer; } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java b/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java index 2f54f27..d25123b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java @@ -300,8 +300,8 @@ public class DefaultDebugger extends ServiceSupport implements Debugger, CamelCo @SuppressWarnings("unchecked") protected void onEvent(Exchange exchange, ExchangeEvent event, Breakpoint breakpoint) { // try to get the last known definition - LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); - MessageHistory last = list != null ? list.getLast() : null; + List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); + MessageHistory last = list != null ? list.get(list.size() - 1) : null; NamedNode definition = last != null ? 
last.getNode() : null; try { diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc index 170f59a..43cb853 100644 --- a/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc +++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc @@ -16,6 +16,9 @@ if needed, such as during development, where Camel can report route stack-traces But for production usage, then message history should only be enabled if you have monitoring systems that rely on gather these fine grained details. +IMPORTANT: When message history is enabled then there is a slight performance overhead as the history data is stored +in a `java.util.concurrent.CopyOnWriteArrayList` due to the need of being thread safe. + == Enabling or disabling Message History The Message History can be enabled or disabled per CamelContext or per route (disabled by default). diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index d48e6bd..ccadefe 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -19,11 +19,11 @@ package org.apache.camel.support; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.camel.CamelContext; import org.apache.camel.CamelExecutionException; @@ -176,7 +176,8 @@ public final class DefaultExchange implements ExtendedExchange { // safe copy message history using a defensive copy List<MessageHistory> history = (List<MessageHistory>) 
target.remove(Exchange.MESSAGE_HISTORY); if (history != null) { - target.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history)); + // use thread-safe list as message history may be accessed concurrently + target.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history)); } } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 41315fe..b207cf0 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -21,10 +21,10 @@ import java.io.IOException; import java.io.InputStream; import java.nio.channels.ReadableByteChannel; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -929,7 +929,8 @@ public final class ExchangeHelper { // safe copy message history using a defensive copy List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY); if (history != null) { - answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history)); + // use thread-safe list as message history may be accessed concurrently + answer.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history)); } return answer;