This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6f5ffc059e36 CAMEL-22858: camel-jbang - get history command may only
show last step (#20840)
6f5ffc059e36 is described below
commit 6f5ffc059e36fd88062d0c175a8587b0aa908300
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jan 15 17:21:57 2026 +0100
CAMEL-22858: camel-jbang - get history command may only show last step
(#20840)
---
.../camel/spi/BacklogTracerEventMessage.java | 13 +-
.../org/apache/camel/spi/InternalProcessor.java | 4 +-
.../apache/camel/spi/InternalProcessorFactory.java | 3 +
.../apache/camel/impl/debugger/BacklogTracer.java | 4 +-
.../impl/debugger/DefaultBacklogDebugger.java | 12 +-
.../debugger/DefaultBacklogTracerEventMessage.java | 14 +-
.../camel/impl/engine/CamelInternalProcessor.java | 324 +++++++++++++++++----
.../apache/camel/impl/engine/DefaultChannel.java | 4 +-
.../impl/engine/DefaultInflightRepository.java | 4 +-
.../processor/DefaultInternalProcessorFactory.java | 7 +
.../org/apache/camel/reifier/AggregateReifier.java | 9 +-
.../org/apache/camel/reifier/RouteReifier.java | 2 +-
.../management/BacklogTracerAggregateTest.java | 1 -
.../BacklogTracerMessageHistoryTest.java | 57 +++-
.../apache/camel/management/BacklogTracerTest.java | 29 +-
.../core/commands/action/CamelHistoryAction.java | 20 +-
16 files changed, 403 insertions(+), 104 deletions(-)
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 20aacafdd3d5..8ef5c1647970 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -34,12 +34,14 @@ public interface BacklogTracerEventMessage {
long getUid();
/**
- * Whether this is a new incoming message and this is the first trace.
+ * Whether this is the first message for a given route. When a message is
routed via multiple routes, then each route
+ * will have a first/last pair.
*/
boolean isFirst();
/**
- * Whether this is the last trace of the message (its complete).
+ * Whether this is the last message for a given route. When a message is routed
via multiple routes, then each route will
+ * have a first/last pair.
*/
boolean isLast();
@@ -54,10 +56,15 @@ public interface BacklogTracerEventMessage {
String getLocation();
/**
- * Route id
+ * Current route id
*/
String getRouteId();
+ /**
+ * The original incoming route id
+ */
+ String getFromRouteId();
+
/**
* Whether this event was from a route that is created from Rest DSL.
*/
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
index 8b4136a0efb1..3e338080e7da 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java
@@ -19,6 +19,8 @@ package org.apache.camel.spi;
import java.util.List;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.NamedRoute;
import org.apache.camel.Processor;
import org.apache.camel.Route;
@@ -74,7 +76,7 @@ public interface InternalProcessor extends AsyncProcessor {
/**
* Add advice for setting up {@link UnitOfWork} with the lifecycle of the
route.
*/
- void addRouteLifecycleAdvice();
+ void addRouteLifecycleAdvice(CamelContext camelContext, Route route,
NamedRoute node);
/**
* Add advice for JMX management for the route
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index 012f998c380e..7344c3a6962a 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -21,6 +21,7 @@ import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.Channel;
import org.apache.camel.Endpoint;
+import org.apache.camel.NamedNode;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -43,6 +44,8 @@ public interface InternalProcessorFactory {
InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext,
Processor processor, Route route);
+ CamelInternalProcessorAdvice<?>
createAggregateBacklogTracerAdvice(CamelContext camelContext, NamedNode
definition);
+
SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext
camelContext);
Channel createChannel(CamelContext camelContext);
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 28c08f8f6d21..3e70af452a7b 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -155,7 +155,9 @@ public class BacklogTracer extends ServiceSupport
implements org.apache.camel.sp
}
if (tid == null || tid.equals(event.getExchangeId()) ||
tid.equals(event.getCorrelationExchangeId())) {
provisionalHistoryQueue.add(event);
- if (event.isLast()) {
+ boolean original = head != null && event.getRouteId() != null
&& event.getRouteId().equals(head.getRouteId());
+ if (event.isLast() && original) {
+ // only trigger completion when it's the original last
completeHistoryQueue.clear();
completeHistoryQueue.addAll(provisionalHistoryQueue);
provisionalHistoryQueue.clear();
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index 92daba25a46d..7adf38f6b7c8 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -890,7 +890,8 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
nodeId,
(nId, message) -> new DefaultBacklogTracerEventMessage(
camelContext,
- false, false, message.getUid(),
message.getTimestamp(), message.getLocation(), message.getRouteId(),
+ false, false, message.getUid(),
message.getTimestamp(), message.getLocation(), message.getFromRouteId(),
+ message.getRouteId(),
message.getToNode(), message.getToNodeParentId(),
message.getToNodeParentWhenId(),
message.getToNodeParentWhenLabel(),
message.getToNodeShortName(), message.getToNodeLabel(),
@@ -934,6 +935,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
String toNodeShortName = definition.getShortName();
// avoid label is too large
String toNodeLabel =
StringHelper.limitLength(definition.getLabel(), 50);
+ String fromRouteId = exchange.getFromRouteId();
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
@@ -945,7 +947,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg
= new DefaultBacklogTracerEventMessage(
camelContext,
- first, false, uid, timestamp, source, routeId,
toNode, toNodeParentId, null, null,
+ first, false, uid, timestamp, source, fromRouteId,
routeId, toNode, toNodeParentId, null, null,
toNodeShortName, toNodeLabel, level, exchangeId,
correlationExchangeId,
false, false, data);
suspendedBreakpointMessages.put(nodeId, msg);
@@ -1024,6 +1026,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
String toNodeShortName = definition.getShortName();
// avoid label is too large
String toNodeLabel =
StringHelper.limitLength(definition.getLabel(), 50);
+ String fromRouteId = exchange.getFromRouteId();
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
@@ -1034,7 +1037,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg
= new DefaultBacklogTracerEventMessage(
camelContext,
- false, false, uid, timestamp, source, routeId,
toNode, toNodeParentId, null, null,
+ false, false, uid, timestamp, source, fromRouteId,
routeId, toNode, toNodeParentId, null, null,
toNodeShortName, toNodeLabel, level,
exchangeId, correlationExchangeId,
false, false, data);
@@ -1133,6 +1136,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
long timestamp = System.currentTimeMillis();
String toNode = CamelContextHelper.getRouteId(definition);
String toNodeParentId = definition.getParentId();
+ String fromRouteId = exchange.getFromRouteId();
String routeId = route != null ? route.getRouteId() : toNode;
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
@@ -1143,7 +1147,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg
= new DefaultBacklogTracerEventMessage(
camelContext,
- false, true, uid, timestamp, source, routeId,
toNode, toNodeParentId,
+ false, true, uid, timestamp, source, fromRouteId,
routeId, toNode, toNodeParentId,
null, null, null, null, level, exchangeId,
correlationExchangeId,
false, false, data);
// we want to capture if there was an exception
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index a299be7b7918..7e8b0edabc38 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -44,6 +44,7 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
private final long timestamp;
private final String location;
private final String routeId;
+ private final String fromRouteId;
private final String toNode;
private final String toNodeParentId;
private final String toNodeParentWhenId;
@@ -72,7 +73,8 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
private boolean done;
public DefaultBacklogTracerEventMessage(CamelContext camelContext, boolean
first, boolean last, long uid, long timestamp,
- String location, String routeId,
String toNode, String toNodeParentId,
+ String location, String
fromRouteId, String routeId, String toNode,
+ String toNodeParentId,
String toNodeParentWhenId, String
toNodeParentWhenLabel,
String toNodeShortName, String
toNodeLabel, int toNodeLevel, String exchangeId,
String correlationExchangeId,
@@ -84,6 +86,7 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
this.uid = uid;
this.timestamp = timestamp;
this.location = location;
+ this.fromRouteId = fromRouteId;
this.routeId = routeId;
this.toNode = toNode;
this.toNodeParentId = toNodeParentId;
@@ -138,6 +141,11 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
return routeId;
}
+ @Override
+ public String getFromRouteId() {
+ return fromRouteId;
+ }
+
@Override
public boolean isRest() {
return rest;
@@ -351,6 +359,7 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
}
// route id is optional and we then use an empty value for no route id
sb.append(prefix).append(" <routeId>").append(routeId != null ?
routeId : "").append("</routeId>\n");
+ sb.append(prefix).append(" <fromRouteId>").append(fromRouteId != null
? fromRouteId : "").append("</fromRouteId>\n");
if (endpointUri != null) {
sb.append(prefix).append("
<endpointUri>").append(endpointUri).append("</endpointUri>\n");
sb.append(prefix).append("
<remoteEndpoint>").append(remoteEndpoint).append("</remoteEndpoint>\n");
@@ -550,6 +559,9 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
if (routeId != null) {
jo.put("routeId", routeId);
}
+ if (fromRouteId != null) {
+ jo.put("fromRouteId", fromRouteId);
+ }
if (toNode != null) {
jo.put("nodeId", toNode);
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index d86ef0f57c62..368a9c09bd7e 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
+import static
org.apache.camel.impl.engine.DefaultChannel.getOrCreateBacklogTracer;
/**
* Internal {@link Processor} that Camel routing engine used during routing
for cross cutting functionality such as:
@@ -197,8 +198,11 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
}
@Override
- public void addRouteLifecycleAdvice() {
+ public void addRouteLifecycleAdvice(CamelContext camelContext, Route
route, NamedRoute node) {
addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());
+ if (camelContext.isBacklogTracingStandby() ||
route.isBacklogTracing()) {
+ addAdvice(new BacklogTracerRouteAdvice(camelContext, node));
+ }
}
@Override
@@ -216,6 +220,10 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
if (task2 != null) {
task2.setRoute(route);
}
+ BacklogTracerRouteAdvice task3 =
getAdvice(BacklogTracerRouteAdvice.class);
+ if (task3 != null) {
+ task3.setRoute(route);
+ }
}
/**
@@ -594,6 +602,254 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
}
}
+ /**
+ * Advice to invoke callbacks for before and after routing for {@link
org.apache.camel.spi.BacklogTracer}.
+ */
+ public static class BacklogTracerRouteAdvice implements
CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage> {
+
+ private final CamelContext camelContext;
+ private final NamedRoute routeDefinition;
+ private final BacklogTracer backlogTracer;
+ private final boolean rest;
+ private final boolean template;
+ private final boolean skip;
+ private Route route;
+
+ public BacklogTracerRouteAdvice(CamelContext camelContext, NamedRoute
definition) {
+ this.camelContext = camelContext;
+ this.routeDefinition = definition;
+ if (routeDefinition != null) {
+ this.rest = routeDefinition.isCreatedFromRest();
+ this.template = routeDefinition.isCreatedFromTemplate();
+ } else {
+ this.rest = false;
+ this.template = false;
+ }
+ this.backlogTracer = getOrCreateBacklogTracer(camelContext);
+ // optimize whether to skip this route or not
+ if (this.rest && !backlogTracer.isTraceRests()) {
+ this.skip = true;
+ } else if (this.template && !backlogTracer.isTraceTemplates()) {
+ this.skip = true;
+ } else {
+ this.skip = false;
+ }
+ }
+
+ public void setRoute(Route route) {
+ this.route = route;
+ }
+
+ @Override
+ public DefaultBacklogTracerEventMessage before(Exchange exchange)
throws Exception {
+ if (!skip && backlogTracer.shouldTrace(routeDefinition.getInput(),
exchange)) {
+ final long created = exchange.getClock().getCreated();
+ NamedNode input = routeDefinition.getInput();
+ String source = LoggerHelper.getLineNumberLoggerName(input);
+ String exchangeId = exchange.getExchangeId();
+ String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String routeId = routeDefinition.getRouteId();
+ String fromRouteId = exchange.getFromRouteId();
+ int level = 1;
+
+ boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
+ boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
+ JsonObject data =
MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties,
+ includeExchangeVariables, true,
+ true, backlogTracer.isBodyIncludeStreams(),
backlogTracer.isBodyIncludeFiles(),
+ backlogTracer.getBodyMaxChars());
+
+ DefaultBacklogTracerEventMessage first = new
DefaultBacklogTracerEventMessage(
+ camelContext,
+ true, false, backlogTracer.incrementTraceCounter(),
created, source, fromRouteId, routeId,
+ input.getId(),
+ null, null, null,
+ input.getShortName(), input.getLabel(),
+ level, exchangeId, correlationExchangeId, rest,
template, data);
+ if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
+ first.setEndpointServiceUrl(esl.getServiceUrl());
+ first.setEndpointServiceProtocol(esl.getServiceProtocol());
+ first.setEndpointServiceMetadata(esl.getServiceMetadata());
+ }
+ backlogTracer.traceEvent(first);
+ return first;
+ }
+ return null;
+ }
+
+ @Override
+ public void after(Exchange exchange, DefaultBacklogTracerEventMessage
first) throws Exception {
+ if (first != null) {
+
+ final long created = exchange.getClock().getCreated();
+ NamedNode input = routeDefinition.getInput();
+ String source = LoggerHelper.getLineNumberLoggerName(input);
+ String exchangeId = exchange.getExchangeId();
+ String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String routeId = routeDefinition.getRouteId();
+ String fromRouteId = exchange.getFromRouteId();
+ int level = 1;
+
+ boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
+ boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
+ JsonObject data =
MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties,
+ includeExchangeVariables, true,
+ true, backlogTracer.isBodyIncludeStreams(),
backlogTracer.isBodyIncludeFiles(),
+ backlogTracer.getBodyMaxChars());
+
+ DefaultBacklogTracerEventMessage last = new
DefaultBacklogTracerEventMessage(
+ camelContext,
+ false, true, backlogTracer.incrementTraceCounter(),
created, source, fromRouteId, routeId,
+ input.getId(),
+ null, null, null,
+ input.getShortName(), input.getLabel(),
+ level, exchangeId, correlationExchangeId, rest,
template, data);
+ if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
+ first.setEndpointServiceUrl(esl.getServiceUrl());
+ first.setEndpointServiceProtocol(esl.getServiceProtocol());
+ first.setEndpointServiceMetadata(esl.getServiceMetadata());
+ }
+ backlogTracer.traceEvent(last);
+ doneProcessing(exchange, last);
+ doneProcessing(exchange, first);
+ // to avoid confusion, let's store the duration on first/last
as (first = 0, last = total time to process)
+ last.setElapsed(first.getElapsed());
+ first.setElapsed(0);
+ }
+ }
+
+ private void doneProcessing(Exchange exchange,
DefaultBacklogTracerEventMessage data) {
+ data.doneProcessing();
+
+ String uri = null;
+ boolean remote = true;
+ Endpoint endpoint = null;
+ if ((data.isFirst() || data.isLast())) {
+ if (route.getConsumer() != null) {
+ // get the actual resolved uri
+ uri = route.getConsumer().getEndpoint().getEndpointUri();
+ remote = route.getConsumer().getEndpoint().isRemote();
+ endpoint = route.getEndpoint();
+ } else {
+ uri = routeDefinition.getEndpointUrl();
+ }
+ }
+ if (uri != null) {
+ data.setEndpointUri(uri);
+ }
+ data.setRemoteEndpoint(remote);
+ if (endpoint instanceof EndpointServiceLocation esl) {
+ data.setEndpointServiceUrl(esl.getServiceUrl());
+ data.setEndpointServiceProtocol(esl.getServiceProtocol());
+ data.setEndpointServiceMetadata(esl.getServiceMetadata());
+ }
+
+ if (!data.isFirst() && backlogTracer.isIncludeException()) {
+ // we want to capture if there was an exception
+ Throwable e = exchange.getException();
+ if (e != null) {
+ data.setException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Special advice for handling aggregate EIP for the {@link
org.apache.camel.spi.BacklogTracer}.
+ */
+ public static final class BacklogTracerAggregateAdvice
+ implements
CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage> {
+
+ private final CamelContext camelContext;
+ private final NamedNode processorDefinition;
+ private final BacklogTracer backlogTracer;
+
+ public BacklogTracerAggregateAdvice(CamelContext camelContext,
NamedNode definition) {
+ this.camelContext = camelContext;
+ this.processorDefinition = definition;
+ this.backlogTracer = getOrCreateBacklogTracer(camelContext);
+ }
+
+ @Override
+ public DefaultBacklogTracerEventMessage before(Exchange exchange)
throws Exception {
+ String exchangeId = exchange.getExchangeId();
+ String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ int level = processorDefinition.getLevel();
+ String routeId = ExchangeHelper.getAtRouteId(exchange);
+ String fromRouteId = exchange.getFromRouteId();
+ String source =
LoggerHelper.getLineNumberLoggerName(processorDefinition);
+
+ boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
+ boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
+ JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(),
includeExchangeProperties,
+ includeExchangeVariables, true,
+ true, backlogTracer.isBodyIncludeStreams(),
backlogTracer.isBodyIncludeFiles(),
+ backlogTracer.getBodyMaxChars());
+
+ DefaultBacklogTracerEventMessage event = new
DefaultBacklogTracerEventMessage(
+ camelContext,
+ true, false, backlogTracer.incrementTraceCounter(),
exchange.getClock().getCreated(), source,
+ fromRouteId, routeId,
+ processorDefinition.getId(),
+ null, null, null,
+ processorDefinition.getShortName(),
processorDefinition.getLabel(),
+ level + 1, exchangeId, correlationExchangeId, false,
false, data);
+ backlogTracer.traceEvent(event);
+ return event;
+ }
+
+ @Override
+ public void after(Exchange exchange, DefaultBacklogTracerEventMessage
first) throws Exception {
+ if (first != null) {
+ final long created = exchange.getClock().getCreated();
+ String source =
LoggerHelper.getLineNumberLoggerName(processorDefinition);
+ String exchangeId = exchange.getExchangeId();
+ String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+ String routeId = ExchangeHelper.getAtRouteId(exchange);
+ String fromRouteId = exchange.getFromRouteId();
+ int level = 1;
+
+ boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
+ boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
+ JsonObject data =
MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties,
+ includeExchangeVariables, true,
+ true, backlogTracer.isBodyIncludeStreams(),
backlogTracer.isBodyIncludeFiles(),
+ backlogTracer.getBodyMaxChars());
+
+ DefaultBacklogTracerEventMessage last = new
DefaultBacklogTracerEventMessage(
+ camelContext,
+ false, true, backlogTracer.incrementTraceCounter(),
created, source, fromRouteId, routeId,
+ processorDefinition.getId(),
+ null, null, null,
+ processorDefinition.getShortName(),
processorDefinition.getLabel(),
+ level, exchangeId, correlationExchangeId, false,
false, data);
+ if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
+ first.setEndpointServiceUrl(esl.getServiceUrl());
+ first.setEndpointServiceProtocol(esl.getServiceProtocol());
+ first.setEndpointServiceMetadata(esl.getServiceMetadata());
+ }
+ backlogTracer.traceEvent(last);
+ doneProcessing(exchange, last);
+ doneProcessing(exchange, first);
+ // to avoid confusion, let's store the duration on first/last
as (first = 0, last = total time to process)
+ last.setElapsed(first.getElapsed());
+ first.setElapsed(0);
+ }
+ }
+
+ private void doneProcessing(Exchange exchange,
DefaultBacklogTracerEventMessage data) {
+ data.doneProcessing();
+
+ if (!data.isFirst() && backlogTracer.isIncludeException()) {
+ // we want to capture if there was an exception
+ Throwable e = exchange.getException();
+ if (e != null) {
+ data.setException(e);
+ }
+ }
+ }
+ }
+
/**
* Advice to execute the {@link BacklogTracer} if enabled.
*/
@@ -605,18 +861,16 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
private final BacklogTracer backlogTracer;
private final NamedNode processorDefinition;
private final NamedRoute routeDefinition;
- private final boolean first;
private final boolean rest;
private final boolean template;
private final boolean skip;
public BacklogTracerAdvice(CamelContext camelContext, BacklogTracer
backlogTracer, NamedNode processorDefinition,
- NamedRoute routeDefinition, boolean first) {
+ NamedRoute routeDefinition) {
this.camelContext = camelContext;
this.backlogTracer = backlogTracer;
this.processorDefinition = processorDefinition;
this.routeDefinition = routeDefinition;
- this.first = first;
if (routeDefinition != null) {
this.rest = routeDefinition.isCreatedFromRest();
@@ -687,83 +941,48 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
// if first we should add a pseudo trace message as well, so
we have a starting message (eg from the route)
String routeId = routeDefinition != null ?
routeDefinition.getRouteId() : null;
- if (first) {
- // use route as pseudo source when first
- final long created = exchange.getClock().getCreated();
-
- // special for aggregate which output are regarded as a
new first
- boolean aggregate = false;
- NamedNode input = routeDefinition != null ?
routeDefinition.getInput() : null;
- if (processorDefinition.getParent() != null
- &&
"aggregate".equals(processorDefinition.getParent().getShortName())) {
- aggregate = true;
- input = processorDefinition.getParent();
- }
- String source =
LoggerHelper.getLineNumberLoggerName(input);
-
- DefaultBacklogTracerEventMessage pseudoFirst;
- if (aggregate) {
- pseudoFirst = new DefaultBacklogTracerEventMessage(
- camelContext,
- true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, input.getId(),
- null, null, null,
- input.getShortName(), input.getLabel(),
- level - 1, exchangeId, correlationExchangeId,
rest, template, data);
- } else {
- pseudoFirst = new DefaultBacklogTracerEventMessage(
- camelContext,
- true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, input.getId(),
- null, null, null,
- input.getShortName(), input.getLabel(),
- level, exchangeId, correlationExchangeId,
rest, template, data);
- if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
-
pseudoFirst.setEndpointServiceUrl(esl.getServiceUrl());
-
pseudoFirst.setEndpointServiceProtocol(esl.getServiceProtocol());
-
pseudoFirst.setEndpointServiceMetadata(esl.getServiceMetadata());
- }
- }
- backlogTracer.traceEvent(pseudoFirst);
-
exchange.getExchangeExtension().addOnCompletion(createOnCompletion(source,
aggregate, pseudoFirst));
- }
+ String fromRouteId = exchange.getFromRouteId();
String source =
LoggerHelper.getLineNumberLoggerName(processorDefinition);
+
DefaultBacklogTracerEventMessage event = new
DefaultBacklogTracerEventMessage(
camelContext,
- false, false, backlogTracer.incrementTraceCounter(),
timestamp, source, routeId, toNode, toNodeParentId,
+ false, false, backlogTracer.incrementTraceCounter(),
timestamp, source, fromRouteId, routeId, toNode,
+ toNodeParentId,
toNodeParentWhenId, toNodeParentWhenLabel,
toNodeShortName, toNodeLabel, level,
exchangeId, correlationExchangeId, rest, template,
data);
backlogTracer.traceEvent(event);
-
return event;
}
return null;
}
- private SynchronizationAdapter createOnCompletion(
- String source, boolean aggregate,
DefaultBacklogTracerEventMessage pseudoFirst) {
+ private SynchronizationAdapter createAggregateOnCompletion(
+ String source, DefaultBacklogTracerEventMessage pseudoFirst) {
return new SynchronizationAdapter() {
@Override
public void onDone(Exchange exchange) {
- // create pseudo last
+ // create pseudo last for the aggregate
String routeId = routeDefinition != null ?
routeDefinition.getRouteId() : null;
+ String fromRouteId = exchange.getFromRouteId();
String exchangeId = exchange.getExchangeId();
String correlationExchangeId =
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
long created = exchange.getClock().getCreated();
int level = pseudoFirst.getToNodeLevel();
- // aggregate is special
- String toNode = aggregate ? pseudoFirst.getToNode() : null;
- String toNodeShortName = aggregate ?
pseudoFirst.getToNodeShortName() : null;
- String toNodeLabel = aggregate ?
pseudoFirst.getToNodeLabel() : null;
+ String toNode = pseudoFirst.getToNode();
+ String toNodeShortName = pseudoFirst.getToNodeShortName();
+ String toNodeLabel = pseudoFirst.getToNodeLabel();
JsonObject data =
MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties,
includeExchangeVariables, true,
true, backlogTracer.isBodyIncludeStreams(),
backlogTracer.isBodyIncludeFiles(),
backlogTracer.getBodyMaxChars());
DefaultBacklogTracerEventMessage pseudoLast = new
DefaultBacklogTracerEventMessage(
camelContext,
- false, true,
backlogTracer.incrementTraceCounter(), created, source, routeId, toNode, null,
null,
+ false, true,
backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId,
toNode,
+ null, null,
null, toNodeShortName, toNodeLabel,
level, exchangeId, correlationExchangeId, rest,
template, data);
backlogTracer.traceEvent(pseudoLast);
@@ -822,7 +1041,6 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
}
}
}
-
}
/**
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index 2e2e7c77aceb..46c06f9ef098 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -210,7 +210,7 @@ public class DefaultChannel extends CamelInternalProcessor
implements Channel {
if (camelContext.isBacklogTracingStandby() ||
route.isBacklogTracing()) {
// add jmx backlog tracer
BacklogTracer backlogTracer =
getOrCreateBacklogTracer(camelContext);
- addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer,
targetOutputDef, routeDefinition, first));
+ addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer,
targetOutputDef, routeDefinition));
}
if (route.isTracing() || camelContext.isTracingStandby()) {
// add logger tracer
@@ -293,7 +293,7 @@ public class DefaultChannel extends CamelInternalProcessor
implements Channel {
}
}
- private static BacklogTracer getOrCreateBacklogTracer(CamelContext
camelContext) {
+ static BacklogTracer getOrCreateBacklogTracer(CamelContext camelContext) {
BacklogTracer tracer = null;
if (camelContext.getRegistry() != null) {
// lookup in registry
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
index c606fb47730a..b13186abbd15 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
@@ -231,7 +231,7 @@ public class DefaultInflightRepository extends
ServiceSupport implements Infligh
@Override
@SuppressWarnings("unchecked")
public long getElapsed() {
- // this can only be calculate if message history is enabled
+ // this can only be calculated if message history is enabled
List<MessageHistory> list =
exchange.getProperty(ExchangePropertyKey.MESSAGE_HISTORY, List.class);
if (list == null || list.isEmpty()) {
return 0;
@@ -242,7 +242,7 @@ public class DefaultInflightRepository extends
ServiceSupport implements Infligh
if (history != null) {
long elapsed = history.getElapsed();
if (elapsed == 0) {
- // still in progress, so lets compute it via the start time
+ // still in progress, so let's compute it via the start time
elapsed = history.getElapsedSinceCreated();
}
return elapsed;
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
index caa7a5058206..f7ee326de31e 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
@@ -22,6 +22,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Channel;
import org.apache.camel.Endpoint;
+import org.apache.camel.NamedNode;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -29,6 +30,7 @@ import org.apache.camel.Route;
import org.apache.camel.impl.engine.CamelInternalProcessor;
import org.apache.camel.impl.engine.DefaultChannel;
import org.apache.camel.impl.engine.SharedCamelInternalProcessor;
+import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.InterceptSendToEndpoint;
import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.InternalProcessorFactory;
@@ -45,6 +47,11 @@ public class DefaultInternalProcessorFactory implements
InternalProcessorFactory
return internal;
}
+ @Override
+ public CamelInternalProcessorAdvice<?>
createAggregateBacklogTracerAdvice(CamelContext camelContext, NamedNode
definition) {
+ return new
CamelInternalProcessor.BacklogTracerAggregateAdvice(camelContext, definition);
+ }
+
@Override
public SharedInternalProcessor
createSharedCamelInternalProcessor(CamelContext camelContext) {
return new SharedCamelInternalProcessor(
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index d84389b45343..55f2a12c0534 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.AggregationStrategy;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
@@ -33,6 +32,7 @@ import
org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.support.PluginHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +54,13 @@ public class AggregateReifier extends
ProcessorReifier<AggregateDefinition> {
Processor childProcessor = this.createChildProcessor(true);
// wrap the aggregate route in a unit of work processor
- AsyncProcessor target =
PluginHelper.getInternalProcessorFactory(camelContext)
+ InternalProcessor target =
PluginHelper.getInternalProcessorFactory(camelContext)
.addUnitOfWorkProcessorAdvice(camelContext, childProcessor,
route);
+ // if backlog tracing then add special advice to handle this
+ if (camelContext.isBacklogTracingStandby() ||
route.isBacklogTracing()) {
+
target.addAdvice(PluginHelper.getInternalProcessorFactory(camelContext)
+ .createAggregateBacklogTracerAdvice(camelContext,
definition));
+ }
// correlation expression is required
if (definition.getExpression() == null) {
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 66560579a4f2..82b7298077d5 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -324,7 +324,7 @@ public class RouteReifier extends
ProcessorReifier<RouteDefinition> {
}
// wrap in route lifecycle
- internal.addRouteLifecycleAdvice();
+ internal.addRouteLifecycleAdvice(camelContext, route, definition);
// add advices
if (definition.getRestBindingDefinition() != null) {
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
index 4ce46df10b67..2d08cfc9e058 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
@@ -60,7 +60,6 @@ public class BacklogTracerAggregateTest extends
ManagementTestSupport {
= (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpAllTracedMessages", null, null);
assertNotNull(events);
- assertEquals(19, events.size());
// should be 4 first and 4 last
assertEquals(4,
events.stream().filter(BacklogTracerEventMessage::isFirst).count());
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
index 167b3fbbf324..99e6b7a89fe7 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
@@ -31,7 +31,6 @@ import org.junit.jupiter.api.condition.OS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@DisabledOnOs(OS.AIX)
@@ -95,27 +94,54 @@ public class BacklogTracerMessageHistoryTest extends
ManagementTestSupport {
events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpLatestMessageHistory", null, null);
assertNotNull(events);
- assertEquals(4, events.size());
+ assertEquals(8, events.size());
assertTrue(events.get(0).isFirst());
assertEquals("direct://start", events.get(0).getEndpointUri());
assertEquals("from", events.get(0).getToNodeShortName());
-
+ assertEquals("myRoute", events.get(0).getRouteId());
+ assertEquals("myRoute", events.get(0).getFromRouteId());
assertFalse(events.get(1).isFirst());
assertFalse(events.get(1).isLast());
assertEquals("foo", events.get(1).getToNode());
assertEquals("to", events.get(1).getToNodeShortName());
assertEquals("to[mock:foo]", events.get(1).getToNodeLabel());
-
- assertFalse(events.get(2).isFirst());
- assertFalse(events.get(2).isLast());
- assertEquals("bar", events.get(2).getToNode());
- assertEquals("to", events.get(2).getToNodeShortName());
- assertEquals("to[mock:bar]", events.get(2).getToNodeLabel());
-
- assertTrue(events.get(3).isLast());
- assertEquals("direct://start", events.get(3).getEndpointUri());
- assertNull(events.get(3).getToNode());
+ assertEquals("myRoute", events.get(1).getRouteId());
+ assertEquals("myRoute", events.get(1).getFromRouteId());
+
+ // sub-route
+ assertTrue(events.get(3).isFirst());
+ assertFalse(events.get(3).isLast());
+ assertEquals("direct://sub", events.get(3).getEndpointUri());
+ assertEquals("from", events.get(3).getToNodeShortName());
+ assertEquals("mySub", events.get(3).getRouteId());
+ assertEquals("myRoute", events.get(3).getFromRouteId());
+ assertFalse(events.get(4).isFirst());
+ assertFalse(events.get(4).isLast());
+ assertEquals("sub", events.get(4).getToNode());
+ assertEquals("to", events.get(4).getToNodeShortName());
+ assertEquals("to[mock:sub]", events.get(4).getToNodeLabel());
+ assertEquals("mySub", events.get(4).getRouteId());
+ assertEquals("myRoute", events.get(4).getFromRouteId());
+ assertFalse(events.get(5).isFirst());
+ assertTrue(events.get(5).isLast());
+ assertEquals("direct://sub", events.get(5).getEndpointUri());
+ assertEquals("from", events.get(5).getToNodeShortName());
+ assertEquals("mySub", events.get(5).getRouteId());
+ assertEquals("myRoute", events.get(5).getFromRouteId());
+
+ assertFalse(events.get(6).isFirst());
+ assertFalse(events.get(6).isLast());
+ assertEquals("bar", events.get(6).getToNode());
+ assertEquals("to", events.get(6).getToNodeShortName());
+ assertEquals("to[mock:bar]", events.get(6).getToNodeLabel());
+ assertEquals("myRoute", events.get(6).getRouteId());
+ assertEquals("myRoute", events.get(6).getFromRouteId());
+ assertTrue(events.get(7).isLast());
+ assertEquals("direct://start", events.get(7).getEndpointUri());
+ assertEquals("from1", events.get(7).getToNode());
+ assertEquals("myRoute", events.get(7).getRouteId());
+ assertEquals("myRoute", events.get(7).getFromRouteId());
}
@Override
@@ -127,10 +153,13 @@ public class BacklogTracerMessageHistoryTest extends
ManagementTestSupport {
context.setBacklogTracing(true);
context.setMessageHistory(true);
- from("direct:start")
+ from("direct:start").routeId("myRoute")
.to("mock:foo").id("foo")
+ .to("direct:sub")
.to("mock:bar").id("bar");
+ from("direct:sub").routeId("mySub")
+ .to("mock:sub").id("sub");
}
};
}
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java
index 4f3f07857520..3b4d2a6edace 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java
@@ -395,18 +395,19 @@ public class BacklogTracerTest extends
ManagementTestSupport {
Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize");
assertEquals(100, size.intValue(), "Should be 100");
- // change size to 2 x 10 (as we need for first as well)
- mbeanServer.setAttribute(on, new Attribute("BacklogSize", 20));
+ // change size to 10
+ mbeanServer.setAttribute(on, new Attribute("BacklogSize", 10));
// set the pattern to match only foo
mbeanServer.setAttribute(on, new Attribute("TracePattern", "foo"));
Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
assertEquals(Boolean.TRUE, enabled, "Should not be enabled");
- getMockEndpoint("mock:foo").expectedMessageCount(10);
- getMockEndpoint("mock:bar").expectedMessageCount(10);
+ getMockEndpoint("mock:foo").expectedMessageCount(13);
+ getMockEndpoint("mock:bar").expectedMessageCount(13);
- for (int i = 0; i < 10; i++) {
+ // send 10 + 3 extra
+ for (int i = 0; i < 13; i++) {
template.sendBody("direct:start", "###" + i + "###");
}
@@ -414,20 +415,22 @@ public class BacklogTracerTest extends
ManagementTestSupport {
List<BacklogTracerEventMessage> events =
(List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages",
new Object[] { "foo" }, new String[] { "java.lang.String" });
- assertEquals(7, events.size());
+ assertEquals(10, events.size());
// the first should be 3 and the last 9
String xml = events.get(0).getMessageAsXml();
assertTrue(xml.contains("###3###"));
xml = events.get(6).getMessageAsXml();
assertTrue(xml.contains("###9###"));
+ xml = events.get(9).getMessageAsXml();
+ assertTrue(xml.contains("###12###"));
// send in another message
template.sendBody("direct:start", "###" + 10 + "###");
events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpTracedMessages",
new Object[] { "foo" }, new String[] { "java.lang.String" });
- assertEquals(7, events.size());
+ assertEquals(10, events.size());
// and we are shifted one now
xml = events.get(0).getMessageAsXml();
@@ -436,20 +439,20 @@ public class BacklogTracerTest extends
ManagementTestSupport {
assertTrue(xml.contains("###10###"));
// send in 4 messages
- template.sendBody("direct:start", "###" + 11 + "###");
- template.sendBody("direct:start", "###" + 12 + "###");
- template.sendBody("direct:start", "###" + 13 + "###");
template.sendBody("direct:start", "###" + 14 + "###");
+ template.sendBody("direct:start", "###" + 15 + "###");
+ template.sendBody("direct:start", "###" + 16 + "###");
+ template.sendBody("direct:start", "###" + 17 + "###");
events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpTracedMessages",
new Object[] { "foo" }, new String[] { "java.lang.String" });
- assertEquals(7, events.size());
+ assertEquals(10, events.size());
// and we are shifted +4 now
xml = events.get(0).getMessageAsXml();
assertTrue(xml.contains("###8###"));
- xml = events.get(6).getMessageAsXml();
- assertTrue(xml.contains("###14###"));
+ xml = events.get(9).getMessageAsXml();
+ assertTrue(xml.contains("###17###"));
}
@Override
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
index ff2b6d0ec075..ddf488b2cc67 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
@@ -533,16 +533,18 @@ public class CamelHistoryAction extends
ActionWatchCommand {
private String getStatus(Row r) {
boolean remote = r.endpoint != null &&
r.endpoint.getBooleanOrDefault("remote", false);
+ boolean original = r.fromRouteId != null &&
r.fromRouteId.equals(r.routeId);
if (r.first) {
- String s = "Created";
+ String s = original ? "Created" : remote ? "Sent" : "Processed";
if (loggingColor) {
return
Ansi.ansi().fg(Ansi.Color.GREEN).a(s).reset().toString();
} else {
return s;
}
} else if (r.last) {
- String done = r.exception != null ? "Completed (exception)" :
"Completed (success)";
+ String s = original ? "Completed" : remote ? "Sent" : "Processed";
+ String done = r.exception != null ? s + " (exception)" : s + " (success)";
if (loggingColor) {
return Ansi.ansi().fg(r.failed ? Ansi.Color.RED :
Ansi.Color.GREEN).a(done).reset().toString();
} else {
@@ -610,7 +612,9 @@ public class CamelHistoryAction extends ActionWatchCommand {
if (source && r.location != null) {
answer = r.location;
} else {
- if (r.nodeId == null) {
+ if (r.routeId != null && r.nodeId != null) {
+ answer = r.routeId + "/" + r.nodeId;
+ } else if (r.nodeId == null) {
answer = r.routeId;
} else {
answer = r.nodeId;
@@ -630,10 +634,11 @@ public class CamelHistoryAction extends
ActionWatchCommand {
}
private String getDirection(Row r) {
+ boolean original = r.routeId != null &&
r.routeId.equals(r.fromRouteId);
if (r.first) {
- return "*-->";
+ return original ? "*-->" : " -->";
} else if (r.last) {
- return "*<--";
+ return original ? "*<--" : " <--";
} else {
return null;
}
@@ -654,10 +659,11 @@ public class CamelHistoryAction extends
ActionWatchCommand {
}
private String getMessage(Row r) {
+ boolean original = r.routeId != null &&
r.routeId.equals(r.fromRouteId);
if (r.failed && !r.last) {
return "Exception: " + r.exception.getString("message");
}
- if (r.last) {
+ if (r.last && original) {
return r.failed ? "Failed" : "Success";
}
return r.summary;
@@ -714,6 +720,7 @@ public class CamelHistoryAction extends ActionWatchCommand {
row.first = jo.getBoolean("first");
row.last = jo.getBoolean("last");
row.location = jo.getString("location");
+ row.fromRouteId = jo.getString("fromRouteId");
row.routeId = jo.getString("routeId");
row.nodeId = jo.getString("nodeId");
row.nodeParentId = jo.getString("nodeParentId");
@@ -919,6 +926,7 @@ public class CamelHistoryAction extends ActionWatchCommand {
String exchangePattern;
String threadName;
String location;
+ String fromRouteId;
String routeId;
String nodeId;
String nodeParentId;