This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-23617 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 81851b6f8d47578f4bf521c7cda4cde690a59a41 Author: Claus Ibsen <[email protected]> AuthorDate: Tue May 26 15:37:56 2026 +0200 CAMEL-23617: Rework message size tracking to endpoint-based approach Move message size statistics from per-route MBean attributes to per-endpoint tracking via RuntimeEndpointRegistry. This captures both IN and OUT directions naturally and avoids per-channel advice overhead. Sizes are computed at ExchangeCreatedEvent (IN) and ExchangeSendingEvent (OUT) using the existing MessageSizeStrategy SPI. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../apache/camel/spi/RuntimeEndpointRegistry.java | 54 +++++++++ .../camel/impl/engine/CamelInternalProcessor.java | 38 ------ .../apache/camel/impl/engine/DefaultChannel.java | 4 - .../impl/engine/DefaultMessageSizeStrategy.java | 2 +- .../engine/DefaultRuntimeEndpointRegistry.java | 106 ++++++++++++++++- .../api/management/mbean/CamelOpenMBeanTypes.java | 14 ++- .../mbean/ManagedPerformanceCounterMBean.java | 18 --- .../mbean/ManagedPerformanceCounter.java | 102 ---------------- .../mbean/ManagedRuntimeEndpointRegistry.java | 11 +- .../camel/management/ManagedMessageSizeTest.java | 90 +++++++++----- .../camel/support/EndpointSizeStatistics.java | 132 +++++++++++++++++++++ docs/user-manual/modules/ROOT/pages/jmx.adoc | 6 +- .../modules/ROOT/pages/message-size.adoc | 71 +++++++---- 13 files changed, 415 insertions(+), 233 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java b/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java index afc86371e421..950b7280c24c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java @@ -54,6 +54,60 @@ public interface RuntimeEndpointRegistry extends StaticService { * {@link org.apache.camel.ManagementStatisticsLevel#Extended}. */ long getHits(); + + /** + * Minimum message body size in bytes (-1 if not available) + * + * @since 4.21 + */ + default long getMinBodySize() { + return -1; + } + + /** + * Maximum message body size in bytes (-1 if not available) + * + * @since 4.21 + */ + default long getMaxBodySize() { + return -1; + } + + /** + * Mean message body size in bytes (-1 if not available) + * + * @since 4.21 + */ + default long getMeanBodySize() { + return -1; + } + + /** + * Minimum message headers size in bytes (-1 if not available) + * + * @since 4.21 + */ + default long getMinHeadersSize() { + return -1; + } + + /** + * Maximum message headers size in bytes (-1 if not available) + * + * @since 4.21 + */ + default long getMaxHeadersSize() { + return -1; + } + + /** + * Mean message headers size in bytes (-1 if not available) + * + * @since 4.21 + */ + default long getMeanHeadersSize() { + return -1; + } } /** diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index c0d276bc148a..66c56142faf0 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -52,7 +52,6 @@ import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor; import org.apache.camel.spi.MessageHistoryFactory; -import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.PooledObjectFactory; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RoutePolicy; @@ -1317,43 +1316,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } } - /** - * Advice for {@link org.apache.camel.spi.MessageSizeStrategy} - */ - public static class MessageSizeAdvice implements CamelInternalProcessorAdvice, Ordered { - - private final MessageSizeStrategy strategy; - - public MessageSizeAdvice(MessageSizeStrategy strategy) { - this.strategy = strategy; - } - - @Override - public Object before(Exchange exchange) throws Exception { - long bodySize = strategy.computeBodySize(exchange.getIn()); - long headersSize = strategy.computeHeadersSize(exchange.getIn()); - exchange.setProperty(ExchangePropertyKey.MESSAGE_BODY_SIZE, bodySize); - exchange.setProperty(ExchangePropertyKey.MESSAGE_HEADERS_SIZE, headersSize); - return null; - } - - @Override - public void after(Exchange exchange, Object data) throws Exception { - // noop - } - - @Override - public boolean hasState() { - return false; - } - - @Override - public int getOrder() { - // after stream caching (HIGHEST) but before instrumentation (LOWEST - 2) - return Ordered.HIGHEST + 1; - } - } - /** * Advice for delaying */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java index 69e53ac0c3b1..34c4acb82499 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java @@ -262,10 +262,6 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy())); } - if (route.isMessageSize()) { - addAdvice(new MessageSizeAdvice(camelContext.getMessageSizeStrategy())); - } - if (route.getDelayer() != null && route.getDelayer() > 0) { addAdvice(new DelayerAdvice(route.getDelayer())); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java index cf06d9929556..4d2240960ade 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java @@ -26,8 +26,8 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.WrappedFile; import org.apache.camel.StreamCache; +import org.apache.camel.WrappedFile; import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.support.service.ServiceSupport; diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java index 88626bbd199a..3e5177a2f7b6 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java @@ -26,6 +26,8 @@ import java.util.Set; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.Message; import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.CamelEvent.ExchangeCompletedEvent; import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent; @@ -34,8 +36,10 @@ import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent; import org.apache.camel.spi.CamelEvent.RouteAddedEvent; import org.apache.camel.spi.CamelEvent.RouteRemovedEvent; import org.apache.camel.spi.EndpointUtilizationStatistics; +import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.support.DefaultEndpointUtilizationStatistics; +import org.apache.camel.support.EndpointSizeStatistics; import org.apache.camel.support.EventNotifierSupport; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.LRUCacheFactory; @@ -54,8 +58,11 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme private int limit = 1000; private boolean enabled = true; private volatile boolean extended; + private volatile boolean messageSizeEnabled; private EndpointUtilizationStatistics inputUtilization; private EndpointUtilizationStatistics outputUtilization; + private EndpointSizeStatistics inputSizeStats; + private EndpointSizeStatistics outputSizeStats; @Override public boolean isEnabled() { @@ -106,7 +113,8 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme String routeId = entry.getKey(); for (String uri : entry.getValue()) { Long hits = getHits(routeId, uri, inputUtilization); - answer.add(new EndpointRuntimeStatistics(uri, routeId, "in", hits)); + EndpointSizeStatistics.SizeStats sizeStats = getSizeStats(routeId, uri, inputSizeStats); + answer.add(new EndpointRuntimeStatistics(uri, routeId, "in", hits, sizeStats)); } } @@ -115,7 +123,8 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme String routeId = entry.getKey(); for (String uri : entry.getValue().keySet()) { Long hits = getHits(routeId, uri, outputUtilization); - answer.add(new EndpointRuntimeStatistics(uri, routeId, "out", hits)); + EndpointSizeStatistics.SizeStats sizeStats = getSizeStats(routeId, uri, outputSizeStats); + answer.add(new EndpointRuntimeStatistics(uri, routeId, "out", hits, sizeStats)); } } @@ -162,6 +171,12 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme if (outputUtilization != null) { outputUtilization.clear(); } + if (inputSizeStats != null) { + inputSizeStats.clear(); + } + if (outputSizeStats != null) { + outputSizeStats.clear(); + } } @Override @@ -189,6 +204,11 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme inputUtilization = new DefaultEndpointUtilizationStatistics(limit); outputUtilization = new DefaultEndpointUtilizationStatistics(limit); } + messageSizeEnabled = extended && getCamelContext().isMessageSize() != null && getCamelContext().isMessageSize(); + if (messageSizeEnabled) { + inputSizeStats = new EndpointSizeStatistics(limit); + outputSizeStats = new EndpointSizeStatistics(limit); + } if (extended) { LOG.debug( "Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: {})", @@ -246,6 +266,9 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme String key = asUtilizationKey(routeId, uri); if (key != null) { inputUtilization.remove(key); + if (messageSizeEnabled) { + inputSizeStats.remove(key); + } } if (rse.getRoute().getConsumer() != null) { String consumerUri = rse.getRoute().getConsumer().getEndpoint().getEndpointUri(); @@ -253,24 +276,38 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme String consumerKey = asUtilizationKey(routeId, consumerUri); if (consumerKey != null) { inputUtilization.remove(consumerKey); + if (messageSizeEnabled) { + inputSizeStats.remove(consumerKey); + } } } } } } else if (extended && event instanceof ExchangeCreatedEvent ece) { // we only capture details in extended mode - Endpoint endpoint = ece.getExchange().getFromEndpoint(); + Exchange exchange = ece.getExchange(); + Endpoint endpoint = exchange.getFromEndpoint(); if (endpoint != null) { - String routeId = ece.getExchange().getFromRouteId(); + String routeId = exchange.getFromRouteId(); String uri = endpoint.getEndpointUri(); String key = asUtilizationKey(routeId, uri); if (key != null) { inputUtilization.onHit(key); + if (messageSizeEnabled) { + Message message = exchange.getIn(); + MessageSizeStrategy strategy = getCamelContext().getMessageSizeStrategy(); + long bodySize = strategy.computeBodySize(message); + long headersSize = strategy.computeHeadersSize(message); + inputSizeStats.onHit(key, bodySize, headersSize); + exchange.setProperty(ExchangePropertyKey.MESSAGE_BODY_SIZE, bodySize); + exchange.setProperty(ExchangePropertyKey.MESSAGE_HEADERS_SIZE, headersSize); + } } } } else if (event instanceof ExchangeSendingEvent ese) { Endpoint endpoint = ese.getEndpoint(); - String routeId = ExchangeHelper.getRouteId(ese.getExchange()); + Exchange exchange = ese.getExchange(); + String routeId = ExchangeHelper.getRouteId(exchange); String uri = endpoint.getEndpointUri(); Map<String, String> uris = outputs.get(routeId); @@ -281,6 +318,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme String key = asUtilizationKey(routeId, uri); if (key != null) { outputUtilization.onHit(key); + if (messageSizeEnabled) { + Message message = exchange.getIn(); + MessageSizeStrategy strategy = getCamelContext().getMessageSizeStrategy(); + long bodySize = strategy.computeBodySize(message); + long headersSize = strategy.computeHeadersSize(message); + outputSizeStats.onHit(key, bodySize, headersSize); + } } } } else if (event instanceof ExchangeCompletedEvent || event instanceof ExchangeFailedEvent) { @@ -301,6 +345,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme String key = asUtilizationKey(routeId, uri); if (key != null) { outputUtilization.onHit(key); + if (messageSizeEnabled) { + Message message = exchange.getMessage(); + MessageSizeStrategy strategy = getCamelContext().getMessageSizeStrategy(); + long bodySize = strategy.computeBodySize(message); + long headersSize = strategy.computeHeadersSize(message); + outputSizeStats.onHit(key, bodySize, headersSize); + } } } } @@ -323,6 +374,16 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme || event instanceof RouteRemovedEvent; } + private EndpointSizeStatistics.SizeStats getSizeStats(String routeId, String uri, EndpointSizeStatistics statistics) { + if (messageSizeEnabled && statistics != null) { + String key = asUtilizationKey(routeId, uri); + if (key != null) { + return statistics.getStats(key); + } + } + return null; + } + private static String asUtilizationKey(String routeId, String uri) { if (routeId == null || uri == null) { return null; @@ -337,12 +398,15 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme private final String routeId; private final String direction; private final long hits; + private final EndpointSizeStatistics.SizeStats sizeStats; - private EndpointRuntimeStatistics(String uri, String routeId, String direction, long hits) { + private EndpointRuntimeStatistics(String uri, String routeId, String direction, long hits, + EndpointSizeStatistics.SizeStats sizeStats) { this.uri = uri; this.routeId = routeId; this.direction = direction; this.hits = hits; + this.sizeStats = sizeStats; } @Override @@ -364,5 +428,35 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme public long getHits() { return hits; } + + @Override + public long getMinBodySize() { + return sizeStats != null ? sizeStats.getMinBodySize() : -1; + } + + @Override + public long getMaxBodySize() { + return sizeStats != null ? sizeStats.getMaxBodySize() : -1; + } + + @Override + public long getMeanBodySize() { + return sizeStats != null ? sizeStats.getMeanBodySize() : -1; + } + + @Override + public long getMinHeadersSize() { + return sizeStats != null ? sizeStats.getMinHeadersSize() : -1; + } + + @Override + public long getMaxHeadersSize() { + return sizeStats != null ? sizeStats.getMaxHeadersSize() : -1; + } + + @Override + public long getMeanHeadersSize() { + return sizeStats != null ? sizeStats.getMeanHeadersSize() : -1; + } } } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index 00445f80aa20..74095faf878a 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -108,11 +108,19 @@ public final class CamelOpenMBeanTypes { public static CompositeType listRuntimeEndpointsCompositeType() throws OpenDataException { return new CompositeType( "endpoints", "Endpoints", - new String[] { "index", "url", "routeId", "direction", "static", "dynamic", "hits" }, - new String[] { "Index", "Url", "Route Id", "Direction", "Static", "Dynamic", "Hits" }, + new String[] { + "index", "url", "routeId", "direction", "static", "dynamic", "hits", + "minBodySize", "maxBodySize", "meanBodySize", + "minHeadersSize", "maxHeadersSize", "meanHeadersSize" }, + new String[] { + "Index", "Url", "Route Id", "Direction", "Static", "Dynamic", "Hits", + "Min Body Size", "Max Body Size", "Mean Body Size", + "Min Headers Size", "Max Headers Size", "Mean Headers Size" }, new OpenType[] { SimpleType.INTEGER, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.BOOLEAN, - SimpleType.BOOLEAN, SimpleType.LONG }); + SimpleType.BOOLEAN, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.LONG }); } public static TabularType listComponentsTabularType() throws OpenDataException { diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java index a76fcad296b4..b7b1f83d72ad 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java @@ -90,24 +90,6 @@ public interface ManagedPerformanceCounterMBean extends ManagedCounterMBean { @ManagedAttribute(description = "First Exchange Failed ExchangeId") String getFirstExchangeFailureExchangeId(); - @ManagedAttribute(description = "Min Body Size [bytes]") - long getMinBodySize(); - - @ManagedAttribute(description = "Max Body Size [bytes]") - long getMaxBodySize(); - - @ManagedAttribute(description = "Mean Body Size [bytes]") - long getMeanBodySize(); - - @ManagedAttribute(description = "Min Headers Size [bytes]") - long getMinHeadersSize(); - - @ManagedAttribute(description = "Max Headers Size [bytes]") - long getMaxHeadersSize(); - - @ManagedAttribute(description = "Mean Headers Size [bytes]") - long getMeanHeadersSize(); - @ManagedAttribute(description = "Statistics enabled") boolean isStatisticsEnabled(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java index 262b5404a73c..f60378d8a5f6 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java @@ -21,7 +21,6 @@ import java.util.Date; import java.util.Map; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePropertyKey; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedPerformanceCounterMBean; import org.apache.camel.management.PerformanceCounter; @@ -56,16 +55,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter private String lastExchangeCompletedExchangeId; private Statistic lastExchangeFailureTimestamp; private String lastExchangeFailureExchangeId; - private Statistic minBodySize; - private Statistic maxBodySize; - private Statistic totalBodySize; - private Statistic meanBodySize; - private Statistic minHeadersSize; - private Statistic maxHeadersSize; - private Statistic totalHeadersSize; - private Statistic meanHeadersSize; - private long exchangesWithBodySize; - private long exchangesWithHeadersSize; private boolean statisticsEnabled = true; @Override @@ -91,15 +80,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter this.lastExchangeCreatedTimestamp = new StatisticValue(); this.lastExchangeCompletedTimestamp = new StatisticValue(); this.lastExchangeFailureTimestamp = new StatisticValue(); - - this.minBodySize = new StatisticMinimum(); - this.maxBodySize = new StatisticMaximum(); - this.totalBodySize = new StatisticCounter(); - this.meanBodySize = new StatisticValue(); - this.minHeadersSize = new StatisticMinimum(); - this.maxHeadersSize = new StatisticMaximum(); - this.totalHeadersSize = new StatisticCounter(); - this.meanHeadersSize = new StatisticValue(); } @Override @@ -126,16 +106,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter lastExchangeCompletedExchangeId = null; lastExchangeFailureTimestamp.reset(); lastExchangeFailureExchangeId = null; - minBodySize.reset(); - maxBodySize.reset(); - totalBodySize.reset(); - meanBodySize.reset(); - minHeadersSize.reset(); - maxHeadersSize.reset(); - totalHeadersSize.reset(); - meanHeadersSize.reset(); - exchangesWithBodySize = 0; - exchangesWithHeadersSize = 0; } @Override @@ -214,36 +184,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter return -1; } - @Override - public long getMinBodySize() { - return minBodySize.getValue(); - } - - @Override - public long getMaxBodySize() { - return maxBodySize.getValue(); - } - - @Override - public long getMeanBodySize() { - return meanBodySize.getValue(); - } - - @Override - public long getMinHeadersSize() { - return minHeadersSize.getValue(); - } - - @Override - public long getMaxHeadersSize() { - return maxHeadersSize.getValue(); - } - - @Override - public long getMeanHeadersSize() { - return meanHeadersSize.getValue(); - } - @Override public Date getLastExchangeCreatedTimestamp() { long value = lastExchangeCreatedTimestamp.getValue(); @@ -350,28 +290,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter mean = totalProcessingTime.getValue() / completed; } meanProcessingTime.updateValue(mean); - - // update body/headers size stats (values represent the incoming message, set by MessageSizeAdvice before processing) - Long bodySize = exchange.getProperty(ExchangePropertyKey.MESSAGE_BODY_SIZE, Long.class); - if (bodySize != null && bodySize >= 0) { - minBodySize.updateValue(bodySize); - maxBodySize.updateValue(bodySize); - totalBodySize.updateValue(bodySize); - exchangesWithBodySize++; - if (exchangesWithBodySize > 0) { - meanBodySize.updateValue(totalBodySize.getValue() / exchangesWithBodySize); - } - } - Long headersSize = exchange.getProperty(ExchangePropertyKey.MESSAGE_HEADERS_SIZE, Long.class); - if (headersSize != null && headersSize >= 0) { - minHeadersSize.updateValue(headersSize); - maxHeadersSize.updateValue(headersSize); - totalHeadersSize.updateValue(headersSize); - exchangesWithHeadersSize++; - if (exchangesWithHeadersSize > 0) { - meanHeadersSize.updateValue(totalHeadersSize.getValue() / exchangesWithHeadersSize); - } - } } @Override @@ -416,16 +334,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter sb.append(String.format(" deltaProcessingTime=\"%s\"", deltaProcessingTime.getValue())); sb.append(String.format(" meanProcessingTime=\"%s\"", meanProcessingTime.getValue())); sb.append(String.format(" idleSince=\"%s\"", getIdleSince())); - if (minBodySize.isUpdated()) { - sb.append(String.format(" minBodySize=\"%s\"", minBodySize.getValue())); - sb.append(String.format(" maxBodySize=\"%s\"", maxBodySize.getValue())); - sb.append(String.format(" meanBodySize=\"%s\"", meanBodySize.getValue())); - } - if (minHeadersSize.isUpdated()) { - sb.append(String.format(" minHeadersSize=\"%s\"", minHeadersSize.getValue())); - sb.append(String.format(" maxHeadersSize=\"%s\"", maxHeadersSize.getValue())); - sb.append(String.format(" meanHeadersSize=\"%s\"", meanHeadersSize.getValue())); - } if (fullStats) { sb.append(String.format(" startTimestamp=\"%s\"", dateAsString(startTimestamp.getTime()))); @@ -471,16 +379,6 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter jo.put("deltaProcessingTime", deltaProcessingTime.getValue()); jo.put("meanProcessingTime", meanProcessingTime.getValue()); jo.put("idleSince", getIdleSince()); - if (minBodySize.isUpdated()) { - jo.put("minBodySize", minBodySize.getValue()); - jo.put("maxBodySize", maxBodySize.getValue()); - jo.put("meanBodySize", meanBodySize.getValue()); - } - if (minHeadersSize.isUpdated()) { - jo.put("minHeadersSize", minHeadersSize.getValue()); - jo.put("maxHeadersSize", maxHeadersSize.getValue()); - jo.put("meanHeadersSize", meanHeadersSize.getValue()); - } if (fullStats) { jo.put("startTimestamp", startTimestamp.getTime()); jo.put("resetTimestamp", resetTimestamp.getTime()); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java index bcb1cce8cce5..68718e39c73f 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java @@ -113,8 +113,15 @@ public class ManagedRuntimeEndpointRegistry extends ManagedService implements Ma long hits = stat.getHits(); CompositeData data = new CompositeDataSupport( - ct, new String[] { "index", "url", "routeId", "direction", "static", "dynamic", "hits" }, - new Object[] { index, url, routeId, direction, isStatic, isDynamic, hits }); + ct, + new String[] { + "index", "url", "routeId", "direction", "static", "dynamic", "hits", + "minBodySize", "maxBodySize", "meanBodySize", + "minHeadersSize", "maxHeadersSize", "meanHeadersSize" }, + new Object[] { + index, url, routeId, direction, isStatic, isDynamic, hits, + stat.getMinBodySize(), stat.getMaxBodySize(), stat.getMeanBodySize(), + stat.getMinHeadersSize(), stat.getMaxHeadersSize(), stat.getMeanHeadersSize() }); answer.put(data); // use a counter as the single index in the TabularData as we do not want a multi-value index diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java index 47d599c13d6a..0b45ceabf220 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java @@ -17,18 +17,18 @@ package org.apache.camel.management; import java.nio.charset.StandardCharsets; - -import javax.management.MBeanServer; -import javax.management.ObjectName; +import java.util.List; import org.apache.camel.CamelContext; +import org.apache.camel.ManagementStatisticsLevel; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.RuntimeEndpointRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; -import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_ROUTE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledOnOs(OS.AIX) @@ -38,47 +38,73 @@ public class ManagedMessageSizeTest extends ManagementTestSupport { protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); context.setMessageSize(true); + context.getManagementStrategy().getManagementAgent().setEndpointRuntimeStatisticsEnabled(true); + context.getManagementStrategy().getManagementAgent().setStatisticsLevel(ManagementStatisticsLevel.Extended); return context; } @Test - public void testMessageSize() throws Exception { + public void testMessageSizeOnEndpoints() throws Exception { getMockEndpoint("mock:result").expectedMessageCount(2); String body1 = "Hello World"; - template.sendBodyAndHeader("direct:start", body1, "myHeader", "myValue"); + template.sendBodyAndHeader("seda:start", body1, "myHeader", "myValue"); String body2 = "Bye World"; - template.sendBodyAndHeader("direct:start", body2, "myHeader", "myValue"); + template.sendBodyAndHeader("seda:start", body2, "myHeader", "myValue"); assertMockEndpointsSatisfied(); - MBeanServer mbeanServer = getMBeanServer(); - ObjectName on = getCamelObjectName(TYPE_ROUTE, "route1"); - - long minBody = (Long) mbeanServer.getAttribute(on, "MinBodySize"); - long maxBody = (Long) mbeanServer.getAttribute(on, "MaxBodySize"); - long meanBody = (Long) mbeanServer.getAttribute(on, "MeanBodySize"); - long body1Size = body1.getBytes(StandardCharsets.UTF_8).length; long body2Size = body2.getBytes(StandardCharsets.UTF_8).length; - long expectedMin = Math.min(body1Size, body2Size); - long expectedMax = Math.max(body1Size, body2Size); - long expectedMean = (body1Size + body2Size) / 2; - - assertEquals(expectedMin, minBody); - assertEquals(expectedMax, maxBody); - assertEquals(expectedMean, meanBody); - - long minHeaders = (Long) mbeanServer.getAttribute(on, "MinHeadersSize"); - long maxHeaders = (Long) mbeanServer.getAttribute(on, "MaxHeadersSize"); - long meanHeaders = (Long) mbeanServer.getAttribute(on, "MeanHeadersSize"); - - assertTrue(minHeaders > 0, "MinHeadersSize should be positive"); - assertTrue(maxHeaders > 0, "MaxHeadersSize should be positive"); - assertTrue(meanHeaders > 0, "MeanHeadersSize should be positive"); - // both messages have the same header, so min == max - assertEquals(minHeaders, maxHeaders); + + List<RuntimeEndpointRegistry.Statistic> stats = context.getRuntimeEndpointRegistry().getEndpointStatistics(); + + // find the input endpoint (seda:start, direction "in") + RuntimeEndpointRegistry.Statistic inputStat = stats.stream() + .filter(s -> "in".equals(s.getDirection()) && s.getUri().contains("seda://start")) + .findFirst() + .orElse(null); + + assertNotNull(inputStat, "Should find input endpoint statistic"); + assertEquals(2, inputStat.getHits()); + assertEquals(Math.min(body1Size, body2Size), inputStat.getMinBodySize()); + assertEquals(Math.max(body1Size, body2Size), inputStat.getMaxBodySize()); + assertEquals((body1Size + body2Size) / 2, inputStat.getMeanBodySize()); + assertTrue(inputStat.getMinHeadersSize() > 0, "MinHeadersSize should be positive"); + assertTrue(inputStat.getMaxHeadersSize() > 0, "MaxHeadersSize should be positive"); + assertTrue(inputStat.getMeanHeadersSize() > 0, "MeanHeadersSize should be positive"); + + // find the output endpoint (mock:result, direction "out") + RuntimeEndpointRegistry.Statistic outputStat = stats.stream() + .filter(s -> "out".equals(s.getDirection()) && s.getUri().contains("mock://result")) + .findFirst() + .orElse(null); + + assertNotNull(outputStat, "Should find output endpoint statistic"); + assertEquals(2, outputStat.getHits()); + assertEquals(Math.min(body1Size, body2Size), outputStat.getMinBodySize()); + assertEquals(Math.max(body1Size, body2Size), outputStat.getMaxBodySize()); + } + + @Test + public void testExchangePropertiesSet() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + String body = "Hello World"; + template.sendBody("seda:start", body); + + assertMockEndpointsSatisfied(); + + long expectedBodySize = body.getBytes(StandardCharsets.UTF_8).length; + Long actualBodySize = getMockEndpoint("mock:result").getExchanges().get(0) + .getProperty("CamelMessageBodySize", Long.class); + assertEquals(expectedBodySize, actualBodySize); + + Long actualHeadersSize = getMockEndpoint("mock:result").getExchanges().get(0) + .getProperty("CamelMessageHeadersSize", Long.class); + assertNotNull(actualHeadersSize, "Headers size should be set"); + assertTrue(actualHeadersSize >= 0, "Headers size should be non-negative"); } @Override @@ -86,7 +112,7 @@ public class ManagedMessageSizeTest extends ManagementTestSupport { return new RouteBuilder() { @Override public void configure() { - from("direct:start").routeId("route1") + from("seda:start").routeId("route1") .to("mock:result"); } }; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EndpointSizeStatistics.java b/core/camel-support/src/main/java/org/apache/camel/support/EndpointSizeStatistics.java new file mode 100644 index 000000000000..211e4aa4c643 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/EndpointSizeStatistics.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Tracks message body and headers size statistics per endpoint key. + * + * @since 4.21 + */ +public class EndpointSizeStatistics { + + private final Map<String, SizeStats> map; + private final Lock lock = new ReentrantLock(); + + public EndpointSizeStatistics(int maxCapacity) { + this.map = LRUCacheFactory.newLRUCache(16, maxCapacity, false); + } + + public void onHit(String key, long bodySize, long headersSize) { + lock.lock(); + try { + map.compute(key, (k, current) -> { + if (current == null) { + current = new SizeStats(); + } + if (bodySize >= 0) { + current.bodyCount++; + current.totalBodySize += bodySize; + if (bodySize < current.minBodySize || current.bodyCount == 1) { + current.minBodySize = bodySize; + } + if (bodySize > current.maxBodySize) { + current.maxBodySize = bodySize; + } + } + if (headersSize >= 0) { + current.headersCount++; + current.totalHeadersSize += headersSize; + if (headersSize < current.minHeadersSize || current.headersCount == 1) { + current.minHeadersSize = headersSize; + } + if (headersSize > current.maxHeadersSize) { + current.maxHeadersSize = headersSize; + } + } + return current; + }); + } finally { + lock.unlock(); + } + } + + public SizeStats getStats(String key) { + lock.lock(); + try { + return map.get(key); + } finally { + lock.unlock(); + } + } + + public void remove(String key) { + lock.lock(); + try { + map.remove(key); + } finally { + lock.unlock(); + } + } + + public void clear() { + lock.lock(); + try { + map.clear(); + } finally { + lock.unlock(); + } + } + + public static class SizeStats { + long minBodySize; + long maxBodySize; + long totalBodySize; + long bodyCount; + long minHeadersSize; + long maxHeadersSize; + long totalHeadersSize; + long headersCount; + + public long getMinBodySize() { + return bodyCount > 0 ? minBodySize : -1; + } + + public long getMaxBodySize() { + return bodyCount > 0 ? maxBodySize : -1; + } + + public long getMeanBodySize() { + return bodyCount > 0 ? totalBodySize / bodyCount : -1; + } + + public long getMinHeadersSize() { + return headersCount > 0 ? minHeadersSize : -1; + } + + public long getMaxHeadersSize() { + return headersCount > 0 ? maxHeadersSize : -1; + } + + public long getMeanHeadersSize() { + return headersCount > 0 ? totalHeadersSize / headersCount : -1; + } + } +} diff --git a/docs/user-manual/modules/ROOT/pages/jmx.adoc b/docs/user-manual/modules/ROOT/pages/jmx.adoc index 4286af4fd78d..b175e79ab374 100644 --- a/docs/user-manual/modules/ROOT/pages/jmx.adoc +++ b/docs/user-manual/modules/ROOT/pages/jmx.adoc @@ -326,14 +326,16 @@ camel.main.loadStatisticsEnabled = true === Message size statistics -It is possible to include message size statistics for Route MBeans. -These track the min, max, and mean sizes of incoming message bodies and headers. +It is possible to include message size statistics per endpoint. +These track the min, max, and mean sizes of message bodies and headers for both incoming (IN) +and outgoing (OUT) endpoints. You can enable message size statistics from `application.properties`: [source,properties] ---- camel.main.messageSizeEnabled = true +camel.main.jmxManagementStatisticsLevel = Extended ---- See xref:message-size.adoc[Message Size] for more details. diff --git a/docs/user-manual/modules/ROOT/pages/message-size.adoc b/docs/user-manual/modules/ROOT/pages/message-size.adoc index 93e10c2eaaed..e21f67fef82d 100644 --- a/docs/user-manual/modules/ROOT/pages/message-size.adoc +++ b/docs/user-manual/modules/ROOT/pages/message-size.adoc @@ -1,25 +1,34 @@ = Message Size -Camel can track the size of incoming message payloads (body and headers) during routing. +Camel can track the size of message payloads (body and headers) at the endpoint level. This is useful for observability — understanding how large messages are, detecting payload bloat, -and monitoring size trends over time. +and monitoring size trends over time. Sizes are tracked per endpoint for both incoming (IN) +and outgoing (OUT) directions. Message size tracking is *disabled by default* and must be explicitly enabled. +It also requires extended management statistics level. == How it works -When enabled, a `MessageSizeAdvice` runs in the routing pipeline for each processing step. -It runs _after_ xref:stream-caching.adoc[Stream Caching] so that streamed payloads are safely cached -before their size is computed. +When enabled, Camel computes message body and headers sizes using the `RuntimeEndpointRegistry` +event mechanism: -The advice computes the body size and the total headers size of the incoming message using a -pluggable `MessageSizeStrategy`. The computed sizes are stored as exchange properties -(`CamelMessageBodySize` and `CamelMessageHeadersSize`) and are available for the remainder of the route. +* *IN direction* — sizes are computed when an exchange is created by a consumer endpoint + (e.g., an HTTP request arrives, a file is consumed, a JMS message is received). + The computed sizes are also stored as exchange properties + (`CamelMessageBodySize` and `CamelMessageHeadersSize`) for use during routing. +* *OUT direction* — sizes are computed when a message is sent to a producer endpoint + (e.g., sending to a Kafka topic, writing to a file, calling an HTTP service). -Size statistics (min, max, mean) are collected per route and exposed via JMX. +Size statistics (min, max, mean) are collected per endpoint and exposed via JMX as part +of the runtime endpoint statistics. + +A pluggable `MessageSizeStrategy` computes the actual sizes. == Enabling Message Size +Message size tracking requires both the `messageSizeEnabled` option and `Extended` statistics level: + [tabs] ==== @@ -28,6 +37,7 @@ Application Properties:: [source,properties] ---- camel.main.messageSizeEnabled = true +camel.main.jmxManagementStatisticsLevel = Extended ---- Java:: @@ -36,10 +46,14 @@ Java:: ---- CamelContext context = ... context.setMessageSize(true); +context.getManagementStrategy().getManagementAgent() + .setStatisticsLevel(ManagementStatisticsLevel.Extended); ---- ==== +TIP: Message size tracking is automatically enabled when running with the `dev` profile. + TIP: You can run Camel JBang: `camel doc main --filter=messageSize` from CLI to see message size options. == Default size computation @@ -74,35 +88,39 @@ The total headers size is computed by iterating all message headers and summing: Returns 0 if there are no headers. -== JMX statistics +== Endpoint statistics via JMX -When JMX is enabled, the following attributes are available on route MBeans -(under `org.apache.camel` domain, type `routes`): +When JMX is enabled with `Extended` statistics level, size statistics are available +in the runtime endpoint registry MBean alongside the existing endpoint hit counts. + +The statistics are part of the `endpointStatistics` tabular data on the +`RuntimeEndpointRegistry` MBean (under `org.apache.camel` domain, type `services`): [width="100%",cols="1m,3",options="header"] |=== -| Attribute | Description -| MinBodySize | Smallest body size observed (bytes) -| MaxBodySize | Largest body size observed (bytes) -| MeanBodySize | Average body size observed (bytes) -| MinHeadersSize | Smallest total headers size observed (bytes) -| MaxHeadersSize | Largest total headers size observed (bytes) -| MeanHeadersSize | Average total headers size observed (bytes) +| Column | Description +| url | Endpoint URI +| routeId | Associated route ID +| direction | `in` for consumer endpoints, `out` for producer endpoints +| hits | Number of messages processed +| minBodySize | Smallest message body size observed (bytes), -1 if not tracked +| maxBodySize | Largest message body size observed (bytes), -1 if not tracked +| meanBodySize | Average message body size observed (bytes), -1 if not tracked +| minHeadersSize | Smallest total headers size observed (bytes), -1 if not tracked +| maxHeadersSize | Largest total headers size observed (bytes), -1 if not tracked +| meanHeadersSize | Average total headers size observed (bytes), -1 if not tracked |=== -These statistics are also included in `dumpStatsAsXml` and `dumpStatsAsJson` output -when message size tracking is active. - -The statistics are reset when the route statistics are reset. +The statistics are reset when the endpoint registry statistics are reset. == Exchange properties -The computed sizes are available as exchange properties during routing. +For the IN direction, the computed sizes are available as exchange properties during routing. You can use them for logging, content-based routing, or custom processing: [source,java] ---- -from("direct:start") +from("seda:start") .log("Body size: ${exchangeProperty.CamelMessageBodySize} bytes") .choice() .when(exchangeProperty("CamelMessageBodySize").isGreaterThan(1048576)) @@ -118,6 +136,9 @@ from("direct:start") | CamelMessageHeadersSize | Total headers size in bytes |=== +NOTE: Exchange properties are set when the consumer endpoint creates the exchange. +They reflect the size of the incoming message as received by the route. + == Custom strategy You can provide a custom `MessageSizeStrategy` implementation for specialized sizing logic
