This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 88950472467 Jbang trace (#10905) 88950472467 is described below commit 8895047246799a8ac623333b1e9789b35e5757af Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Jul 30 08:40:47 2023 +0200 Jbang trace (#10905) * Deprecated some unusued exchange constants * CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is send to endpoints with varioius EIPs. * CAMEL-19369: camel-jbang - Trace command to show endpoint uri --- .../camel/spi/BacklogTracerEventMessage.java | 6 ++ .../debugger/DefaultBacklogTracerEventMessage.java | 15 ++++ .../camel/impl/engine/CamelInternalProcessor.java | 86 +++++++++++++++++++++- .../apache/camel/impl/engine/DefaultChannel.java | 2 +- .../core/commands/action/CamelTraceAction.java | 16 +++- 5 files changed, 122 insertions(+), 3 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java index 1529d3d4151..69c2ed251f2 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java @@ -121,6 +121,12 @@ public interface BacklogTracerEventMessage { */ String getExceptionAsJSon(); + /** + * The endpoint uri if this trace is either from a route input (from), or the exchange was sent to an endpoint such + * as (to, toD, wireTap) etc. + */ + String getEndpointUri(); + /** * Dumps the event message as XML using the {@link #ROOT_TAG} as root tag. * <p/> diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java index bcdeb3a24b5..f5e4cc70723 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java @@ -40,6 +40,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private final String toNode; private final String exchangeId; private final String threadName; + private String endpointUri; private final boolean rest; private final boolean template; private final String messageAsXml; @@ -183,6 +184,14 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven this.exceptionAsJSon = exceptionAsJSon; } + public String getEndpointUri() { + return endpointUri; + } + + public void setEndpointUri(String endpointUri) { + this.endpointUri = endpointUri; + } + @Override public String toString() { return "DefaultBacklogTracerEventMessage[" + exchangeId + " at " + toNode + "]"; @@ -218,6 +227,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven } // route id is optional and we then use an empty value for no route id sb.append(prefix).append(" <routeId>").append(routeId != null ? routeId : "").append("</routeId>\n"); + if (endpointUri != null) { + sb.append(prefix).append(" <endpointUri>").append(endpointUri).append("</endpointUri>\n"); + } if (toNode != null) { sb.append(prefix).append(" <toNode>").append(toNode).append("</toNode>\n"); } else { @@ -254,6 +266,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven if (location != null) { jo.put("location", location); } + if (endpointUri != null) { + jo.put("endpointUri", endpointUri); + } if (routeId != null) { jo.put("routeId", routeId); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 08dc47d675a..ba72efb9b63 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -19,11 +19,14 @@ package org.apache.camel.impl.engine; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Message; @@ -39,8 +42,10 @@ import org.apache.camel.StreamCacheException; import org.apache.camel.impl.debugger.BacklogDebugger; import org.apache.camel.impl.debugger.BacklogTracer; import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage; +import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.CamelInternalProcessorAdvice; import org.apache.camel.spi.Debugger; +import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor; @@ -63,6 +68,7 @@ import org.apache.camel.support.LoggerHelper; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.OrderedComparator; import org.apache.camel.support.PluginHelper; +import org.apache.camel.support.SimpleEventNotifierSupport; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -552,6 +558,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage>, Ordered { + private final BacklogTraceAdviceEventNotifier notifier; + private final CamelContext camelContext; private final BacklogTracer backlogTracer; private final NamedNode processorDefinition; private final NamedRoute routeDefinition; @@ -560,8 +568,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In private final boolean template; private final boolean skip; - public BacklogTracerAdvice(BacklogTracer backlogTracer, NamedNode processorDefinition, + public BacklogTracerAdvice(CamelContext camelContext, BacklogTracer backlogTracer, NamedNode processorDefinition, NamedRoute routeDefinition, boolean first) { + this.camelContext = camelContext; this.backlogTracer = backlogTracer; this.processorDefinition = processorDefinition; this.routeDefinition = routeDefinition; @@ -582,11 +591,28 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } else { this.skip = false; } + this.notifier = getOrCreateEventNotifier(camelContext); + } + + private BacklogTraceAdviceEventNotifier getOrCreateEventNotifier(CamelContext camelContext) { + // use a single instance of this event notifier + for (EventNotifier en : camelContext.getManagementStrategy().getEventNotifiers()) { + if (en instanceof BacklogTraceAdviceEventNotifier) { + return (BacklogTraceAdviceEventNotifier) en; + } + } + BacklogTraceAdviceEventNotifier answer = new BacklogTraceAdviceEventNotifier(); + camelContext.getManagementStrategy().addEventNotifier(answer); + return answer; } @Override public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception { if (!skip && backlogTracer.shouldTrace(processorDefinition, exchange)) { + + // to capture if the exchange was sent to an endpoint during this event + notifier.before(exchange); + long timestamp = System.currentTimeMillis(); String toNode = processorDefinition.getId(); String exchangeId = exchange.getExchangeId(); @@ -656,6 +682,25 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In private void doneProcessing(Exchange exchange, DefaultBacklogTracerEventMessage data) { data.doneProcessing(); + + String uri = null; + Endpoint endpoint = notifier.after(exchange); + if (endpoint != null) { + uri = endpoint.getEndpointUri(); + } else if ((data.isFirst() || data.isLast()) && data.getToNode() == null && routeDefinition != null) { + // pseudo first/last event (the from in the route) + Route route = camelContext.getRoute(routeDefinition.getRouteId()); + if (route != null && route.getConsumer() != null) { + // get the actual resolved uri + uri = route.getConsumer().getEndpoint().getEndpointUri(); + } else { + uri = routeDefinition.getEndpointUrl(); + } + } + if (uri != null) { + data.setEndpointUri(uri); + } + if (!data.isFirst()) { // we want to capture if there was an exception Throwable e = exchange.getException(); @@ -1212,4 +1257,43 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } } + /** + * Event notifier for {@link BacklogTracerAdvice} to capture + * {@link Exchange} sent to endpoints during tracing. + */ + private static final class BacklogTraceAdviceEventNotifier extends SimpleEventNotifierSupport { + + private final Object dummy = new Object(); + + private final ConcurrentMap<Exchange, Object> uris = new ConcurrentHashMap<>(); + + public BacklogTraceAdviceEventNotifier() { + // only capture sending events + setIgnoreExchangeEvents(false); + setIgnoreExchangeSendingEvents(false); + } + + @Override + public void notify(CamelEvent event) throws Exception { + if (event instanceof CamelEvent.ExchangeSendingEvent ess) { + Exchange e = ess.getExchange(); + if (uris.containsKey(e)) { + uris.put(e, ess.getEndpoint()); + } + } + } + + public void before(Exchange exchange) { + uris.put(exchange, dummy); + } + + public Endpoint after(Exchange exchange) { + Object o = uris.remove(exchange); + if (o == dummy) { + return null; + } + return (Endpoint) o; + } + + } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java index a3ec2d77988..5d8ede6ca66 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java @@ -196,7 +196,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) { // add jmx backlog tracer BacklogTracer backlogTracer = getOrCreateBacklogTracer(camelContext); - addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, routeDefinition, first)); + addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer, targetOutputDef, routeDefinition, first)); } if (route.isTracing() || camelContext.isTracingStandby()) { // add logger tracer diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java index 9c1799584a6..e9269b91ca5 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java @@ -42,6 +42,7 @@ import org.apache.camel.util.IOHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.URISupport; import org.apache.camel.util.json.JsonArray; import org.apache.camel.util.json.JsonObject; import org.apache.camel.util.json.Jsoner; @@ -137,6 +138,10 @@ public class CamelTraceAction extends ActionBaseCommand { description = "Only output traces from the latest (follow if necessary until complete and exit)") boolean latest; + @CommandLine.Option(names = { "--mask" }, + description = "Whether to mask endpoint URIs to avoid printing sensitive information such as password or access keys") + boolean mask; + @CommandLine.Option(names = { "--pretty" }, description = "Pretty print message body when using JSon or XML format") boolean pretty; @@ -421,6 +426,14 @@ public class CamelTraceAction extends ActionBaseCommand { row.location = jo.getString("location"); row.routeId = jo.getString("routeId"); row.nodeId = jo.getString("nodeId"); + String uri = jo.getString("endpointUri"); + if (uri != null) { + row.endpoint = new JsonObject(); + if (mask) { + uri = URISupport.sanitizeUri(uri); + } + row.endpoint.put("endpoint", uri); + } Long ts = jo.getLong("timestamp"); if (ts != null) { row.timestamp = ts; @@ -696,7 +709,7 @@ public class CamelTraceAction extends ActionBaseCommand { } private String getDataAsTable(Row r) { - return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern, null, r.message, r.exception); + return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern, r.endpoint, r.message, r.exception); } private String getElapsed(Row r) { @@ -780,6 +793,7 @@ public class CamelTraceAction extends ActionBaseCommand { long elapsed; boolean done; boolean failed; + JsonObject endpoint; JsonObject message; JsonObject exception;