This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a83cd1097581 CAMEL-23617: Add option to capture message payload size
for observation
a83cd1097581 is described below
commit a83cd1097581158ac13ae6e77a4e077055885dbf
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue May 26 21:07:02 2026 +0200
CAMEL-23617: Add option to capture message payload size for observation
New MessageSizeStrategy SPI with default implementation that computes
incoming message body and headers sizes during routing. Sizes are tracked
per endpoint via RuntimeEndpointRegistry (min/max/mean for body and headers,
IN and OUT directions). Exposed via JMX, dev console, and CLI
(camel get endpoint). Auto-enabled in dev profile with Extended statistics.
Includes bundled message-size example and documentation.
Closes #23532
---
.../main/camel-main-configuration-metadata.json | 1 +
.../org/apache/camel/ExchangeConstantProvider.java | 4 +-
.../main/java/org/apache/camel/CamelContext.java | 15 ++
.../src/main/java/org/apache/camel/Exchange.java | 2 +
.../java/org/apache/camel/ExchangePropertyKey.java | 6 +
.../org/apache/camel/RuntimeConfiguration.java | 16 ++
.../org/apache/camel/spi/MessageSizeStrategy.java | 58 ++++++++
.../apache/camel/spi/RuntimeEndpointRegistry.java | 54 +++++++
.../camel/impl/engine/AbstractCamelContext.java | 32 ++++
.../impl/engine/DefaultCamelContextExtension.java | 21 +++
.../impl/engine/DefaultMessageSizeStrategy.java | 125 ++++++++++++++++
.../org/apache/camel/impl/engine/DefaultRoute.java | 15 ++
.../engine/DefaultRuntimeEndpointRegistry.java | 119 ++++++++++++++-
.../camel/impl/engine/SimpleCamelContext.java | 6 +
.../camel/impl/console/EndpointDevConsole.java | 18 +++
.../apache/camel/impl/CamelContextConfigurer.java | 12 ++
.../MainConfigurationPropertiesConfigurer.java | 7 +
.../camel-main-configuration-metadata.json | 1 +
core/camel-main/src/main/docs/main.adoc | 3 +-
.../camel/main/DefaultConfigurationConfigurer.java | 3 +
.../camel/main/DefaultConfigurationProperties.java | 26 ++++
.../org/apache/camel/main/ProfileConfigurer.java | 1 +
.../api/management/mbean/CamelOpenMBeanTypes.java | 14 +-
.../mbean/ManagedRuntimeEndpointRegistry.java | 11 +-
.../ManagedMessageSizeStreamCachingTest.java | 69 +++++++++
.../camel/management/ManagedMessageSizeTest.java | 120 +++++++++++++++
.../camel/support/EndpointSizeStatistics.java | 132 +++++++++++++++++
docs/user-manual/modules/ROOT/nav.adoc | 1 +
.../jbang-commands/camel-jbang-get-endpoint.adoc | 3 +-
docs/user-manual/modules/ROOT/pages/jmx.adoc | 16 ++
.../modules/ROOT/pages/message-size.adoc | 161 +++++++++++++++++++++
.../META-INF/camel-jbang-commands-metadata.json | 2 +-
.../jbang/core/commands/process/ListEndpoint.java | 73 +++++++++-
.../examples/camel-jbang-example-catalog.json | 18 +++
.../main/resources/examples/message-size/README.md | 57 ++++++++
.../examples/message-size/message-size.camel.yaml | 102 +++++++++++++
36 files changed, 1305 insertions(+), 19 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json
index 9e39a3ecdeff..bdf518fd508a 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json
@@ -104,6 +104,7 @@
{ "name": "camel.main.mainListeners", "required": false, "description":
"Sets main listener objects that will be used for MainListener that makes it
possible to do custom logic during starting and stopping camel-main.",
"sourceType": "org.apache.camel.main.MainConfigurationProperties", "type":
"array", "javaType": "java.util.List", "secret": false },
{ "name": "camel.main.mdcLoggingKeysPattern", "required": false,
"description": "Sets the pattern used for determine which custom MDC keys to
propagate during message routing when the routing engine continues routing
asynchronously for the given message. Setting this pattern to will propagate
all custom keys. Or setting the pattern to foo,bar will propagate any keys
starting with either foo or bar. Notice that a set of standard Camel MDC keys
are always propagated which starts with c [...]
{ "name": "camel.main.messageHistory", "required": false, "description":
"Sets whether message history is enabled or not. Default is false.",
"sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type":
"boolean", "javaType": "boolean", "defaultValue": false, "secret": false },
+ { "name": "camel.main.messageSizeEnabled", "required": false,
"description": "Sets whether message size observation is enabled (default is
false). When enabled, Camel will compute the size of message body and headers
(in bytes) per endpoint (for both IN and OUT directions) and make this
available via JMX MBeans (min\/max\/mean body size and headers size).",
"sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type":
"boolean", "javaType": "boolean", "defaultValue": [...]
{ "name": "camel.main.modeline", "required": false, "description":
"Whether to support JBang style \/\/DEPS to specify additional dependencies
when running Camel JBang", "sourceType":
"org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean",
"javaType": "boolean", "defaultValue": false, "secret": false },
{ "name": "camel.main.name", "required": false, "description": "Sets the
name of the CamelContext.", "sourceType":
"org.apache.camel.main.DefaultConfigurationProperties", "type": "string",
"javaType": "java.lang.String", "secret": false },
{ "name": "camel.main.producerTemplateCacheSize", "required": false,
"description": "Producer template endpoints cache size.", "sourceType":
"org.apache.camel.main.DefaultConfigurationProperties", "type": "integer",
"javaType": "int", "defaultValue": 1000, "secret": false },
diff --git
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index b9f4b37dd355..f1dcc4d60bd6 100644
---
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -15,7 +15,7 @@ public class ExchangeConstantProvider {
private static final Map<String, String> MAP;
static {
- Map<String, String> map = new HashMap<>(149);
+ Map<String, String> map = new HashMap<>(151);
map.put("AGGREGATED_COLLECTION_GUARD",
"CamelAggregatedCollectionGuard");
map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
map.put("AGGREGATED_CORRELATION_KEY", "CamelAggregatedCorrelationKey");
@@ -113,6 +113,8 @@ public class ExchangeConstantProvider {
map.put("MAXIMUM_SIMPLE_CACHE_SIZE", "CamelMaximumSimpleCacheSize");
map.put("MAXIMUM_TRANSFORMER_CACHE_SIZE",
"CamelMaximumTransformerCacheSize");
map.put("MAXIMUM_VALIDATOR_CACHE_SIZE",
"CamelMaximumValidatorCacheSize");
+ map.put("MESSAGE_BODY_SIZE", "CamelMessageBodySize");
+ map.put("MESSAGE_HEADERS_SIZE", "CamelMessageHeadersSize");
map.put("MESSAGE_HISTORY", "CamelMessageHistory");
map.put("MESSAGE_HISTORY_HEADER_FORMAT",
"CamelMessageHistoryHeaderFormat");
map.put("MESSAGE_HISTORY_OUTPUT_FORMAT",
"CamelMessageHistoryOutputFormat");
diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index b032c92853b3..1411af29db56 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -39,6 +39,7 @@ import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.MessageSizeStrategy;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.Registry;
import org.apache.camel.spi.RestConfiguration;
@@ -1771,6 +1772,20 @@ public interface CamelContext extends
CamelContextLifecycle, RuntimeConfiguratio
*/
void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy);
+ /**
+ * Gets the {@link MessageSizeStrategy} to use.
+ *
+ * @since 4.21
+ */
+ MessageSizeStrategy getMessageSizeStrategy();
+
+ /**
+ * Sets a custom {@link MessageSizeStrategy} to use.
+ *
+ * @since 4.21
+ */
+ void setMessageSizeStrategy(MessageSizeStrategy messageSizeStrategy);
+
/**
* Gets the {@link org.apache.camel.spi.RuntimeEndpointRegistry} to use,
or <tt>null</tt> if none is in use.
*/
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 595cecdfd8e4..4d1ab3e3e1d3 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -223,6 +223,8 @@ public interface Exchange extends VariableAware {
String MESSAGE_HISTORY_HEADER_FORMAT = "CamelMessageHistoryHeaderFormat";
String MESSAGE_HISTORY_OUTPUT_FORMAT = "CamelMessageHistoryOutputFormat";
String MESSAGE_TIMESTAMP = "CamelMessageTimestamp";
+ String MESSAGE_BODY_SIZE = "CamelMessageBodySize";
+ String MESSAGE_HEADERS_SIZE = "CamelMessageHeadersSize";
@Metadata(label = "multicast",
description = "An index counter that increases for each Exchange
being multicasted. The counter starts from 0.",
javaType = "int")
diff --git
a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
index 650960c95b77..c106badb8783 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
@@ -72,6 +72,8 @@ public enum ExchangePropertyKey {
INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED),
LOOP_INDEX(Exchange.LOOP_INDEX),
LOOP_SIZE(Exchange.LOOP_SIZE),
+ MESSAGE_BODY_SIZE(Exchange.MESSAGE_BODY_SIZE),
+ MESSAGE_HEADERS_SIZE(Exchange.MESSAGE_HEADERS_SIZE),
MESSAGE_HISTORY(Exchange.MESSAGE_HISTORY),
MULTICAST_COMPLETE(Exchange.MULTICAST_COMPLETE),
MULTICAST_INDEX(Exchange.MULTICAST_INDEX),
@@ -190,6 +192,10 @@ public enum ExchangePropertyKey {
return LOOP_INDEX;
case Exchange.LOOP_SIZE:
return LOOP_SIZE;
+ case Exchange.MESSAGE_BODY_SIZE:
+ return MESSAGE_BODY_SIZE;
+ case Exchange.MESSAGE_HEADERS_SIZE:
+ return MESSAGE_HEADERS_SIZE;
case Exchange.MESSAGE_HISTORY:
return MESSAGE_HISTORY;
case Exchange.MULTICAST_COMPLETE:
diff --git
a/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java
b/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java
index 4aef5cda746b..e7296089d257 100644
--- a/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java
+++ b/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java
@@ -38,6 +38,22 @@ public interface RuntimeConfiguration {
*/
Boolean isStreamCaching();
+ /**
+ * Sets whether message size capturing is enabled or not (default is
disabled).
+ *
+ * @param messageSize whether message size is enabled or not
+ * @since 4.21
+ */
+ void setMessageSize(Boolean messageSize);
+
+ /**
+ * Returns whether message size capturing is enabled.
+ *
+ * @return <tt>true</tt> if message size is enabled
+ * @since 4.21
+ */
+ Boolean isMessageSize();
+
/**
* Returns whether tracing enabled
*
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/MessageSizeStrategy.java
b/core/camel-api/src/main/java/org/apache/camel/spi/MessageSizeStrategy.java
new file mode 100644
index 000000000000..c5f815d6f471
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/MessageSizeStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Message;
+import org.apache.camel.StaticService;
+
+/**
+ * Strategy for computing the size of message payloads (body and headers) for
observation purposes.
+ * <p/>
+ * Not all message body types can be sized efficiently. For example, Java
objects have no inherent "size", and streaming
+ * payloads would be destructive to read unless stream caching is enabled.
Implementations should return -1 when the
+ * size cannot be determined without side effects.
+ *
+ * @since 4.21
+ */
+public interface MessageSizeStrategy extends StaticService {
+
+ /**
+ * Whether message size computation is enabled.
+ */
+ void setEnabled(boolean enabled);
+
+ /**
+ * Whether message size computation is enabled.
+ */
+ boolean isEnabled();
+
+ /**
+ * Computes the size of the message body in bytes.
+ *
+ * @param message the message
+ * @return the body size in bytes, 0 if the body is null, or -1 if
the size cannot be determined
+ */
+ long computeBodySize(Message message);
+
+ /**
+ * Computes the total size of all message headers in bytes (keys and
values).
+ *
+ * @param message the message
+ * @return the total headers size in bytes, or -1 if the size
cannot be determined
+ */
+ long computeHeadersSize(Message message);
+}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
b/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
index afc86371e421..950b7280c24c 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
@@ -54,6 +54,60 @@ public interface RuntimeEndpointRegistry extends
StaticService {
* {@link org.apache.camel.ManagementStatisticsLevel#Extended}.
*/
long getHits();
+
+ /**
+ * Minimum message body size in bytes (-1 if not available)
+ *
+ * @since 4.21
+ */
+ default long getMinBodySize() {
+ return -1;
+ }
+
+ /**
+ * Maximum message body size in bytes (-1 if not available)
+ *
+ * @since 4.21
+ */
+ default long getMaxBodySize() {
+ return -1;
+ }
+
+ /**
+ * Mean message body size in bytes (-1 if not available)
+ *
+ * @since 4.21
+ */
+ default long getMeanBodySize() {
+ return -1;
+ }
+
+ /**
+ * Minimum message headers size in bytes (-1 if not available)
+ *
+ * @since 4.21
+ */
+ default long getMinHeadersSize() {
+ return -1;
+ }
+
+ /**
+ * Maximum message headers size in bytes (-1 if not available)
+ *
+ * @since 4.21
+ */
+ default long getMaxHeadersSize() {
+ return -1;
+ }
+
+ /**
+ * Mean message headers size in bytes (-1 if not available)
+ *
+ * @since 4.21
+ */
+ default long getMeanHeadersSize() {
+ return -1;
+ }
}
/**
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 9df88ddebe8e..ceb556bfae20 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -137,6 +137,7 @@ import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.MessageSizeStrategy;
import org.apache.camel.spi.ModelJAXBContextFactory;
import org.apache.camel.spi.ModelToStructureDumper;
import org.apache.camel.spi.ModelToXMLDumper;
@@ -278,6 +279,7 @@ public abstract class AbstractCamelContext extends
BaseService
private Boolean logMask = Boolean.FALSE;
private Boolean logExhaustedMessageBody = Boolean.FALSE;
private Boolean streamCache = Boolean.TRUE;
+ private Boolean messageSize = Boolean.FALSE;
private Boolean disableJMX = Boolean.FALSE;
private Boolean loadTypeConverters = Boolean.FALSE;
private Boolean loadHealthChecks = Boolean.FALSE;
@@ -2009,6 +2011,16 @@ public abstract class AbstractCamelContext extends
BaseService
return streamCache;
}
+ @Override
+ public void setMessageSize(Boolean messageSize) {
+ this.messageSize = messageSize;
+ }
+
+ @Override
+ public Boolean isMessageSize() {
+ return messageSize;
+ }
+
@Override
public void setTracing(Boolean tracing) {
this.trace = tracing;
@@ -3136,6 +3148,10 @@ public abstract class AbstractCamelContext extends
BaseService
+ " See more details at
https://camel.apache.org/stream-caching.html");
}
+ if (isMessageSizeInUse()) {
+ getMessageSizeStrategy().setEnabled(true);
+ }
+
if (isAllowUseOriginalMessage()) {
LOG.debug("AllowUseOriginalMessage enabled because
UseOriginalMessage is in use");
}
@@ -3562,6 +3578,10 @@ public abstract class AbstractCamelContext extends
BaseService
return isStreamCaching();
}
+ protected boolean isMessageSizeInUse() throws Exception {
+ return isMessageSize();
+ }
+
protected void bindDataFormats() throws Exception {
}
@@ -4336,6 +4356,16 @@ public abstract class AbstractCamelContext extends
BaseService
camelContextExtension.setStreamCachingStrategy(streamCachingStrategy);
}
+ @Override
+ public MessageSizeStrategy getMessageSizeStrategy() {
+ return camelContextExtension.getMessageSizeStrategy();
+ }
+
+ @Override
+ public void setMessageSizeStrategy(MessageSizeStrategy
messageSizeStrategy) {
+ camelContextExtension.setMessageSizeStrategy(messageSizeStrategy);
+ }
+
@Override
public RestRegistry getRestRegistry() {
return PluginHelper.getRestRegistry(this);
@@ -4455,6 +4485,8 @@ public abstract class AbstractCamelContext extends
BaseService
protected abstract StreamCachingStrategy createStreamCachingStrategy();
+ protected abstract MessageSizeStrategy createMessageSizeStrategy();
+
protected abstract TypeConverter createTypeConverter();
protected abstract TypeConverterRegistry createTypeConverterRegistry();
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
index 1ad1b4f4cba2..9bfa5d15d3f6 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
@@ -63,6 +63,7 @@ import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.ManagementStrategyFactory;
import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.MessageSizeStrategy;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PluginManager;
import org.apache.camel.spi.ProcessorExchangeFactory;
@@ -127,6 +128,7 @@ class DefaultCamelContextExtension implements
ExtendedCamelContext {
private volatile ClassResolver classResolver;
private volatile MessageHistoryFactory messageHistoryFactory;
private volatile StreamCachingStrategy streamCachingStrategy;
+ private volatile MessageSizeStrategy messageSizeStrategy;
private volatile InflightRepository inflightRepository;
private volatile ErrorRegistry errorRegistry;
private volatile UuidGenerator uuidGenerator;
@@ -865,6 +867,25 @@ class DefaultCamelContextExtension implements
ExtendedCamelContext {
=
camelContext.getInternalServiceManager().addService(camelContext,
streamCachingStrategy, true, false, true);
}
+ MessageSizeStrategy getMessageSizeStrategy() {
+ if (messageSizeStrategy == null) {
+ lock.lock();
+ try {
+ if (messageSizeStrategy == null) {
+
setMessageSizeStrategy(camelContext.createMessageSizeStrategy());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return messageSizeStrategy;
+ }
+
+ void setMessageSizeStrategy(MessageSizeStrategy messageSizeStrategy) {
+ this.messageSizeStrategy
+ =
camelContext.getInternalServiceManager().addService(camelContext,
messageSizeStrategy, true, false, true);
+ }
+
InflightRepository getInflightRepository() {
if (inflightRepository == null) {
lock.lock();
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java
new file mode 100644
index 000000000000..e909a29022b1
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.StreamCache;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.spi.MessageSizeStrategy;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Default implementation of {@link MessageSizeStrategy} that computes message
payload sizes for known body types.
+ */
+public class DefaultMessageSizeStrategy extends ServiceSupport implements
CamelContextAware, MessageSizeStrategy {
+
+ private CamelContext camelContext;
+ private boolean enabled;
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Override
+ public long computeBodySize(Message message) {
+ try {
+ Object body = message.getBody();
+ if (body instanceof byte[] bytes) {
+ return bytes.length;
+ }
+ if (body instanceof String str) {
+ return str.getBytes(StandardCharsets.UTF_8).length;
+ }
+ if (body instanceof StreamCache sc) {
+ long len = sc.length();
+ return len > 0 ? len : -1;
+ }
+ if (body instanceof WrappedFile<?> wf) {
+ long len = wf.getFileLength();
+ return len > 0 ? len : -1;
+ }
+ if (body instanceof File f) {
+ return f.length();
+ }
+ if (body instanceof Path p) {
+ return Files.size(p);
+ }
+ // fallback to Content-Length header (e.g. HTTP components where
body is a stream)
+ Long cl = message.getHeader(Exchange.CONTENT_LENGTH, Long.class);
+ if (cl != null && cl >= 0) {
+ return cl;
+ }
+ if (body == null) {
+ return 0;
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ return -1;
+ }
+
+ @Override
+ public long computeHeadersSize(Message message) {
+ try {
+ Map<String, Object> headers = message.getHeaders();
+ if (headers == null || headers.isEmpty()) {
+ return 0;
+ }
+ long total = 0;
+ for (Map.Entry<String, Object> entry : headers.entrySet()) {
+ String key = entry.getKey();
+ if (key != null) {
+ total += key.getBytes(StandardCharsets.UTF_8).length;
+ }
+ Object value = entry.getValue();
+ if (value != null) {
+ String s = value.toString();
+ total += s.getBytes(StandardCharsets.UTF_8).length;
+ }
+ }
+ return total;
+ } catch (Exception e) {
+ // ignore
+ }
+ return -1;
+ }
+}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 59ad429699d0..4d218f606174 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -90,6 +90,7 @@ public class DefaultRoute extends ServiceSupport implements
Route {
private Boolean logMask;
private Boolean logExhaustedMessageBody;
private Boolean streamCache;
+ private Boolean messageSize;
private Long delay;
private Boolean autoStartup = Boolean.TRUE;
private final List<RoutePolicy> routePolicyList = new ArrayList<>();
@@ -505,6 +506,20 @@ public class DefaultRoute extends ServiceSupport
implements Route {
}
}
+ @Override
+ public void setMessageSize(Boolean messageSize) {
+ this.messageSize = messageSize;
+ }
+
+ @Override
+ public Boolean isMessageSize() {
+ if (messageSize != null) {
+ return messageSize;
+ } else {
+ return camelContext.isMessageSize();
+ }
+ }
+
@Override
public void setDelayer(Long delay) {
this.delay = delay;
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java
index 88626bbd199a..8f77ff2ed4de 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java
@@ -26,6 +26,9 @@ import java.util.Set;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.Message;
+import org.apache.camel.StreamCache;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelEvent.ExchangeCompletedEvent;
import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent;
@@ -34,8 +37,10 @@ import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent;
import org.apache.camel.spi.CamelEvent.RouteAddedEvent;
import org.apache.camel.spi.CamelEvent.RouteRemovedEvent;
import org.apache.camel.spi.EndpointUtilizationStatistics;
+import org.apache.camel.spi.MessageSizeStrategy;
import org.apache.camel.spi.RuntimeEndpointRegistry;
import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
+import org.apache.camel.support.EndpointSizeStatistics;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LRUCacheFactory;
@@ -54,8 +59,11 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
private int limit = 1000;
private boolean enabled = true;
private volatile boolean extended;
+ private volatile boolean messageSizeEnabled;
private EndpointUtilizationStatistics inputUtilization;
private EndpointUtilizationStatistics outputUtilization;
+ private EndpointSizeStatistics inputSizeStats;
+ private EndpointSizeStatistics outputSizeStats;
@Override
public boolean isEnabled() {
@@ -106,7 +114,8 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
String routeId = entry.getKey();
for (String uri : entry.getValue()) {
Long hits = getHits(routeId, uri, inputUtilization);
- answer.add(new EndpointRuntimeStatistics(uri, routeId, "in",
hits));
+ EndpointSizeStatistics.SizeStats sizeStats =
getSizeStats(routeId, uri, inputSizeStats);
+ answer.add(new EndpointRuntimeStatistics(uri, routeId, "in",
hits, sizeStats));
}
}
@@ -115,7 +124,8 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
String routeId = entry.getKey();
for (String uri : entry.getValue().keySet()) {
Long hits = getHits(routeId, uri, outputUtilization);
- answer.add(new EndpointRuntimeStatistics(uri, routeId, "out",
hits));
+ EndpointSizeStatistics.SizeStats sizeStats =
getSizeStats(routeId, uri, outputSizeStats);
+ answer.add(new EndpointRuntimeStatistics(uri, routeId, "out",
hits, sizeStats));
}
}
@@ -162,6 +172,12 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
if (outputUtilization != null) {
outputUtilization.clear();
}
+ if (inputSizeStats != null) {
+ inputSizeStats.clear();
+ }
+ if (outputSizeStats != null) {
+ outputSizeStats.clear();
+ }
}
@Override
@@ -189,6 +205,11 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
inputUtilization = new DefaultEndpointUtilizationStatistics(limit);
outputUtilization = new
DefaultEndpointUtilizationStatistics(limit);
}
+ messageSizeEnabled = extended && getCamelContext().isMessageSize() !=
null && getCamelContext().isMessageSize();
+ if (messageSizeEnabled) {
+ inputSizeStats = new EndpointSizeStatistics(limit);
+ outputSizeStats = new EndpointSizeStatistics(limit);
+ }
if (extended) {
LOG.debug(
"Runtime endpoint registry is in extended mode gathering
usage statistics of all incoming and outgoing endpoints (cache limit: {})",
@@ -246,6 +267,9 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
String key = asUtilizationKey(routeId, uri);
if (key != null) {
inputUtilization.remove(key);
+ if (messageSizeEnabled) {
+ inputSizeStats.remove(key);
+ }
}
if (rse.getRoute().getConsumer() != null) {
String consumerUri =
rse.getRoute().getConsumer().getEndpoint().getEndpointUri();
@@ -253,24 +277,42 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
String consumerKey = asUtilizationKey(routeId,
consumerUri);
if (consumerKey != null) {
inputUtilization.remove(consumerKey);
+ if (messageSizeEnabled) {
+ inputSizeStats.remove(consumerKey);
+ }
}
}
}
}
} else if (extended && event instanceof ExchangeCreatedEvent ece) {
// we only capture details in extended mode
- Endpoint endpoint = ece.getExchange().getFromEndpoint();
+ Exchange exchange = ece.getExchange();
+ Endpoint endpoint = exchange.getFromEndpoint();
if (endpoint != null) {
- String routeId = ece.getExchange().getFromRouteId();
+ String routeId = exchange.getFromRouteId();
String uri = endpoint.getEndpointUri();
String key = asUtilizationKey(routeId, uri);
if (key != null) {
inputUtilization.onHit(key);
+ if (messageSizeEnabled) {
+ Message message = exchange.getIn();
+ MessageSizeStrategy strategy =
getCamelContext().getMessageSizeStrategy();
+ long bodySize = strategy.computeBodySize(message);
+ long headersSize =
strategy.computeHeadersSize(message);
+ inputSizeStats.onHit(key, bodySize, headersSize);
+
exchange.setProperty(ExchangePropertyKey.MESSAGE_BODY_SIZE, bodySize);
+
exchange.setProperty(ExchangePropertyKey.MESSAGE_HEADERS_SIZE, headersSize);
+ // reset stream cache so the body is re-readable
during routing
+ if (message.getBody() instanceof StreamCache sc) {
+ sc.reset();
+ }
+ }
}
}
} else if (event instanceof ExchangeSendingEvent ese) {
Endpoint endpoint = ese.getEndpoint();
- String routeId = ExchangeHelper.getRouteId(ese.getExchange());
+ Exchange exchange = ese.getExchange();
+ String routeId = ExchangeHelper.getRouteId(exchange);
String uri = endpoint.getEndpointUri();
Map<String, String> uris = outputs.get(routeId);
@@ -281,6 +323,17 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
String key = asUtilizationKey(routeId, uri);
if (key != null) {
outputUtilization.onHit(key);
+ if (messageSizeEnabled) {
+ Message message = exchange.getIn();
+ MessageSizeStrategy strategy =
getCamelContext().getMessageSizeStrategy();
+ long bodySize = strategy.computeBodySize(message);
+ long headersSize =
strategy.computeHeadersSize(message);
+ outputSizeStats.onHit(key, bodySize, headersSize);
+ // reset stream cache so the body is re-readable when
sending
+ if (message.getBody() instanceof StreamCache sc) {
+ sc.reset();
+ }
+ }
}
}
} else if (event instanceof ExchangeCompletedEvent || event instanceof
ExchangeFailedEvent) {
@@ -301,6 +354,17 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
String key = asUtilizationKey(routeId, uri);
if (key != null) {
outputUtilization.onHit(key);
+ if (messageSizeEnabled) {
+ Message message = exchange.getMessage();
+ MessageSizeStrategy strategy =
getCamelContext().getMessageSizeStrategy();
+ long bodySize =
strategy.computeBodySize(message);
+ long headersSize =
strategy.computeHeadersSize(message);
+ outputSizeStats.onHit(key, bodySize,
headersSize);
+ // reset stream cache so the body is
re-readable
+ if (message.getBody() instanceof StreamCache
sc) {
+ sc.reset();
+ }
+ }
}
}
}
@@ -323,6 +387,16 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
|| event instanceof RouteRemovedEvent;
}
+ private EndpointSizeStatistics.SizeStats getSizeStats(String routeId,
String uri, EndpointSizeStatistics statistics) {
+ if (messageSizeEnabled && statistics != null) {
+ String key = asUtilizationKey(routeId, uri);
+ if (key != null) {
+ return statistics.getStats(key);
+ }
+ }
+ return null;
+ }
+
private static String asUtilizationKey(String routeId, String uri) {
if (routeId == null || uri == null) {
return null;
@@ -337,12 +411,15 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
private final String routeId;
private final String direction;
private final long hits;
+ private final EndpointSizeStatistics.SizeStats sizeStats;
- private EndpointRuntimeStatistics(String uri, String routeId, String
direction, long hits) {
+ private EndpointRuntimeStatistics(String uri, String routeId, String
direction, long hits,
+ EndpointSizeStatistics.SizeStats
sizeStats) {
this.uri = uri;
this.routeId = routeId;
this.direction = direction;
this.hits = hits;
+ this.sizeStats = sizeStats;
}
@Override
@@ -364,5 +441,35 @@ public class DefaultRuntimeEndpointRegistry extends
EventNotifierSupport impleme
public long getHits() {
return hits;
}
+
+ @Override
+ public long getMinBodySize() {
+ return sizeStats != null ? sizeStats.getMinBodySize() : -1;
+ }
+
+ @Override
+ public long getMaxBodySize() {
+ return sizeStats != null ? sizeStats.getMaxBodySize() : -1;
+ }
+
+ @Override
+ public long getMeanBodySize() {
+ return sizeStats != null ? sizeStats.getMeanBodySize() : -1;
+ }
+
+ @Override
+ public long getMinHeadersSize() {
+ return sizeStats != null ? sizeStats.getMinHeadersSize() : -1;
+ }
+
+ @Override
+ public long getMaxHeadersSize() {
+ return sizeStats != null ? sizeStats.getMaxHeadersSize() : -1;
+ }
+
+ @Override
+ public long getMeanHeadersSize() {
+ return sizeStats != null ? sizeStats.getMeanHeadersSize() : -1;
+ }
}
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 089c8c20298d..bbef1f98be5a 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -69,6 +69,7 @@ import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.MessageSizeStrategy;
import org.apache.camel.spi.ModelJAXBContextFactory;
import org.apache.camel.spi.ModelToStructureDumper;
import org.apache.camel.spi.ModelToXMLDumper;
@@ -629,6 +630,11 @@ public class SimpleCamelContext extends
AbstractCamelContext {
return new DefaultStreamCachingStrategy();
}
+ @Override
+ protected MessageSizeStrategy createMessageSizeStrategy() {
+ return new DefaultMessageSizeStrategy();
+ }
+
@Override
protected ExchangeFactory createExchangeFactory() {
Optional<ExchangeFactory> result = ResolverHelper.resolveService(
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
index 1bf56f0c9f47..b58ba5702bfa 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
@@ -65,6 +65,14 @@ public class EndpointDevConsole extends AbstractDevConsole {
for (RuntimeEndpointRegistry.Statistic st : endpointStats)
{
sb.append(String.format("%n %s (remote: %s
direction: %s, usage: %s)", uri, remote,
st.getDirection(), st.getHits()));
+ if (st.getMinBodySize() >= 0) {
+ sb.append(String.format(" body:
min/max/mean=%s/%s/%s",
+ st.getMinBodySize(), st.getMaxBodySize(),
st.getMeanBodySize()));
+ }
+ if (st.getMinHeadersSize() >= 0) {
+ sb.append(String.format(" headers:
min/max/mean=%s/%s/%s",
+ st.getMinHeadersSize(),
st.getMaxHeadersSize(), st.getMeanHeadersSize()));
+ }
}
} else {
sb.append(String.format("%n %s (remote: %s)", uri,
remote));
@@ -110,6 +118,16 @@ public class EndpointDevConsole extends AbstractDevConsole
{
jo.put("direction", st.getDirection());
jo.put("hits", st.getHits());
jo.put("routeId", st.getRouteId());
+ if (st.getMinBodySize() >= 0) {
+ jo.put("minBodySize", st.getMinBodySize());
+ jo.put("maxBodySize", st.getMaxBodySize());
+ jo.put("meanBodySize", st.getMeanBodySize());
+ }
+ if (st.getMinHeadersSize() >= 0) {
+ jo.put("minHeadersSize", st.getMinHeadersSize());
+ jo.put("maxHeadersSize", st.getMaxHeadersSize());
+ jo.put("meanHeadersSize", st.getMeanHeadersSize());
+ }
list.add(jo);
}
} else {
diff --git
a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java
b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java
index cceb0c83c9c2..9c1204342527 100644
---
a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java
+++
b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java
@@ -83,6 +83,10 @@ public class CamelContextConfigurer extends
org.apache.camel.support.component.P
case "messageHistory": target.setMessageHistory(property(camelContext,
java.lang.Boolean.class, value)); return true;
case "messagehistoryfactory":
case "messageHistoryFactory":
target.setMessageHistoryFactory(property(camelContext,
org.apache.camel.spi.MessageHistoryFactory.class, value)); return true;
+ case "messagesize":
+ case "messageSize": target.setMessageSize(property(camelContext,
java.lang.Boolean.class, value)); return true;
+ case "messagesizestrategy":
+ case "messageSizeStrategy":
target.setMessageSizeStrategy(property(camelContext,
org.apache.camel.spi.MessageSizeStrategy.class, value)); return true;
case "modeline": target.setModeline(property(camelContext,
java.lang.Boolean.class, value)); return true;
case "namestrategy":
case "nameStrategy": target.setNameStrategy(property(camelContext,
org.apache.camel.spi.CamelContextNameStrategy.class, value)); return true;
@@ -205,6 +209,10 @@ public class CamelContextConfigurer extends
org.apache.camel.support.component.P
case "messageHistory": return java.lang.Boolean.class;
case "messagehistoryfactory":
case "messageHistoryFactory": return
org.apache.camel.spi.MessageHistoryFactory.class;
+ case "messagesize":
+ case "messageSize": return java.lang.Boolean.class;
+ case "messagesizestrategy":
+ case "messageSizeStrategy": return
org.apache.camel.spi.MessageSizeStrategy.class;
case "modeline": return java.lang.Boolean.class;
case "namestrategy":
case "nameStrategy": return
org.apache.camel.spi.CamelContextNameStrategy.class;
@@ -328,6 +336,10 @@ public class CamelContextConfigurer extends
org.apache.camel.support.component.P
case "messageHistory": return target.isMessageHistory();
case "messagehistoryfactory":
case "messageHistoryFactory": return target.getMessageHistoryFactory();
+ case "messagesize":
+ case "messageSize": return target.isMessageSize();
+ case "messagesizestrategy":
+ case "messageSizeStrategy": return target.getMessageSizeStrategy();
case "modeline": return target.isModeline();
case "namestrategy":
case "nameStrategy": return target.getNameStrategy();
diff --git
a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
index 1f0ddf7f7ff8..d21a1ab3029f 100644
---
a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
+++
b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
@@ -96,6 +96,7 @@ public class MainConfigurationPropertiesConfigurer extends
org.apache.camel.supp
map.put("MainListeners", java.util.List.class);
map.put("MdcLoggingKeysPattern", java.lang.String.class);
map.put("MessageHistory", boolean.class);
+ map.put("MessageSizeEnabled", boolean.class);
map.put("Modeline", boolean.class);
map.put("Name", java.lang.String.class);
map.put("ProducerTemplateCacheSize", int.class);
@@ -307,6 +308,8 @@ public class MainConfigurationPropertiesConfigurer extends
org.apache.camel.supp
case "mdcLoggingKeysPattern":
target.setMdcLoggingKeysPattern(property(camelContext, java.lang.String.class,
value)); return true;
case "messagehistory":
case "messageHistory": target.setMessageHistory(property(camelContext,
boolean.class, value)); return true;
+ case "messagesizeenabled":
+ case "messageSizeEnabled":
target.setMessageSizeEnabled(property(camelContext, boolean.class, value));
return true;
case "modeline": target.setModeline(property(camelContext,
boolean.class, value)); return true;
case "name": target.setName(property(camelContext,
java.lang.String.class, value)); return true;
case "producertemplatecachesize":
@@ -577,6 +580,8 @@ public class MainConfigurationPropertiesConfigurer extends
org.apache.camel.supp
case "mdcLoggingKeysPattern": return java.lang.String.class;
case "messagehistory":
case "messageHistory": return boolean.class;
+ case "messagesizeenabled":
+ case "messageSizeEnabled": return boolean.class;
case "modeline": return boolean.class;
case "name": return java.lang.String.class;
case "producertemplatecachesize":
@@ -843,6 +848,8 @@ public class MainConfigurationPropertiesConfigurer extends
org.apache.camel.supp
case "mdcLoggingKeysPattern": return target.getMdcLoggingKeysPattern();
case "messagehistory":
case "messageHistory": return target.isMessageHistory();
+ case "messagesizeenabled":
+ case "messageSizeEnabled": return target.isMessageSizeEnabled();
case "modeline": return target.isModeline();
case "name": return target.getName();
case "producertemplatecachesize":
diff --git
a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
index 9e39a3ecdeff..bdf518fd508a 100644
---
a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
+++
b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
@@ -104,6 +104,7 @@
{ "name": "camel.main.mainListeners", "required": false, "description":
"Sets main listener objects that will be used for MainListener that makes it
possible to do custom logic during starting and stopping camel-main.",
"sourceType": "org.apache.camel.main.MainConfigurationProperties", "type":
"array", "javaType": "java.util.List", "secret": false },
{ "name": "camel.main.mdcLoggingKeysPattern", "required": false,
"description": "Sets the pattern used for determine which custom MDC keys to
propagate during message routing when the routing engine continues routing
asynchronously for the given message. Setting this pattern to will propagate
all custom keys. Or setting the pattern to foo,bar will propagate any keys
starting with either foo or bar. Notice that a set of standard Camel MDC keys
are always propagated which starts with c [...]
{ "name": "camel.main.messageHistory", "required": false, "description":
"Sets whether message history is enabled or not. Default is false.",
"sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type":
"boolean", "javaType": "boolean", "defaultValue": false, "secret": false },
+ { "name": "camel.main.messageSizeEnabled", "required": false,
"description": "Sets whether message size observation is enabled (default is
false). When enabled, Camel will compute the size of message body and headers
(in bytes) per endpoint (for both IN and OUT directions) and make this
available via JMX MBeans (min\/max\/mean body size and headers size).",
"sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type":
"boolean", "javaType": "boolean", "defaultValue": [...]
{ "name": "camel.main.modeline", "required": false, "description":
"Whether to support JBang style \/\/DEPS to specify additional dependencies
when running Camel JBang", "sourceType":
"org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean",
"javaType": "boolean", "defaultValue": false, "secret": false },
{ "name": "camel.main.name", "required": false, "description": "Sets the
name of the CamelContext.", "sourceType":
"org.apache.camel.main.DefaultConfigurationProperties", "type": "string",
"javaType": "java.lang.String", "secret": false },
{ "name": "camel.main.producerTemplateCacheSize", "required": false,
"description": "Producer template endpoints cache size.", "sourceType":
"org.apache.camel.main.DefaultConfigurationProperties", "type": "integer",
"javaType": "int", "defaultValue": 1000, "secret": false },
diff --git a/core/camel-main/src/main/docs/main.adoc
b/core/camel-main/src/main/docs/main.adoc
index ba4bd0907e18..3e5f2d95c857 100644
--- a/core/camel-main/src/main/docs/main.adoc
+++ b/core/camel-main/src/main/docs/main.adoc
@@ -19,7 +19,7 @@ The following tables lists all the options:
// main options: START
=== Camel Main configurations
-The camel.main supports 132 options, which are listed below.
+The camel.main supports 133 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
@@ -98,6 +98,7 @@ The camel.main supports 132 options, which are listed below.
| *camel.main.mainListeners* | Sets main listener objects that will be used
for MainListener that makes it possible to do custom logic during starting and
stopping camel-main. | | List
| *camel.main.mdcLoggingKeysPattern* | Sets the pattern used for determine
which custom MDC keys to propagate during message routing when the routing
engine continues routing asynchronously for the given message. Setting this
pattern to will propagate all custom keys. Or setting the pattern to foo,bar
will propagate any keys starting with either foo or bar. Notice that a set of
standard Camel MDC keys are always propagated which starts with camel. as key
name. The match rules are applied [...]
| *camel.main.messageHistory* | Sets whether message history is enabled or
not. Default is false. | false | boolean
+| *camel.main.messageSizeEnabled* | Sets whether message size observation is
enabled (default is false). When enabled, Camel will compute the size of
message body and headers (in bytes) per endpoint (for both IN and OUT
directions) and make this available via JMX MBeans (min/max/mean body size and
headers size). | false | boolean
| *camel.main.modeline* | Whether to support JBang style //DEPS to specify
additional dependencies when running Camel JBang | false | boolean
| *camel.main.name* | Sets the name of the CamelContext. | | String
| *camel.main.producerTemplateCacheSize* | Producer template endpoints cache
size. | 1000 | int
diff --git
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 84793d6100f3..6ecd82204c1b 100644
---
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -254,6 +254,9 @@ public final class DefaultConfigurationConfigurer {
}
}
+ // message size
+ camelContext.setMessageSize(config.isMessageSizeEnabled());
+
if ("default".equals(config.getUuidGenerator())) {
camelContext.setUuidGenerator(new DefaultUuidGenerator());
} else if ("short".equals(config.getUuidGenerator())) {
diff --git
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
index b341db33d56f..38c3c3a62e0c 100644
---
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
+++
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
@@ -85,6 +85,7 @@ public abstract class DefaultConfigurationProperties<T> {
private int streamCachingBufferSize;
private boolean streamCachingRemoveSpoolDirectoryWhenStopping = true;
private boolean streamCachingStatisticsEnabled;
+ private boolean messageSizeEnabled;
private boolean typeConverterStatisticsEnabled;
private boolean tracing;
private boolean tracingStandby;
@@ -711,6 +712,20 @@ public abstract class DefaultConfigurationProperties<T> {
this.streamCachingStatisticsEnabled = streamCachingStatisticsEnabled;
}
+ public boolean isMessageSizeEnabled() {
+ return messageSizeEnabled;
+ }
+
+ /**
+ * Sets whether message size observation is enabled (default is false).
+ *
+ * When enabled, Camel will compute the size of message body and headers
(in bytes) per endpoint (for both IN and
+ * OUT directions) and make this available via JMX MBeans (min/max/mean
body size and headers size).
+ */
+ public void setMessageSizeEnabled(boolean messageSizeEnabled) {
+ this.messageSizeEnabled = messageSizeEnabled;
+ }
+
public boolean isTypeConverterStatisticsEnabled() {
return typeConverterStatisticsEnabled;
}
@@ -2186,6 +2201,17 @@ public abstract class DefaultConfigurationProperties<T> {
return (T) this;
}
+ /**
+ * Sets whether message size observation is enabled (default is false).
+ *
+ * When enabled, Camel will compute the size of message body and headers
(in bytes) per endpoint (for both IN and
+ * OUT directions) and make this available via JMX MBeans (min/max/mean
body size and headers size).
+ */
+ public T withMessageSizeEnabled(boolean messageSizeEnabled) {
+ this.messageSizeEnabled = messageSizeEnabled;
+ return (T) this;
+ }
+
/**
* Sets whether type converter statistics is enabled.
*
diff --git
a/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java
b/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java
index dd6b6b5643f5..27c753c6822b 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java
@@ -99,6 +99,7 @@ public class ProfileConfigurer {
config.setLoadStatisticsEnabled(true);
config.setMessageHistory(true);
config.setInflightRepositoryBrowseEnabled(true);
+ config.setMessageSizeEnabled(true);
config.setEndpointRuntimeStatisticsEnabled(true);
config.setJmxManagementStatisticsLevel(ManagementStatisticsLevel.Extended);
config.setJmxUpdateRouteEnabled(true);
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 00445f80aa20..74095faf878a 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -108,11 +108,19 @@ public final class CamelOpenMBeanTypes {
public static CompositeType listRuntimeEndpointsCompositeType() throws
OpenDataException {
return new CompositeType(
"endpoints", "Endpoints",
- new String[] { "index", "url", "routeId", "direction",
"static", "dynamic", "hits" },
- new String[] { "Index", "Url", "Route Id", "Direction",
"Static", "Dynamic", "Hits" },
+ new String[] {
+ "index", "url", "routeId", "direction", "static",
"dynamic", "hits",
+ "minBodySize", "maxBodySize", "meanBodySize",
+ "minHeadersSize", "maxHeadersSize", "meanHeadersSize"
},
+ new String[] {
+ "Index", "Url", "Route Id", "Direction", "Static",
"Dynamic", "Hits",
+ "Min Body Size", "Max Body Size", "Mean Body Size",
+ "Min Headers Size", "Max Headers Size", "Mean Headers
Size" },
new OpenType[] {
SimpleType.INTEGER, SimpleType.STRING,
SimpleType.STRING, SimpleType.STRING, SimpleType.BOOLEAN,
- SimpleType.BOOLEAN, SimpleType.LONG });
+ SimpleType.BOOLEAN, SimpleType.LONG,
+ SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
+ SimpleType.LONG, SimpleType.LONG, SimpleType.LONG });
}
public static TabularType listComponentsTabularType() throws
OpenDataException {
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java
index bcb1cce8cce5..68718e39c73f 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java
@@ -113,8 +113,15 @@ public class ManagedRuntimeEndpointRegistry extends
ManagedService implements Ma
long hits = stat.getHits();
CompositeData data = new CompositeDataSupport(
- ct, new String[] { "index", "url", "routeId",
"direction", "static", "dynamic", "hits" },
- new Object[] { index, url, routeId, direction,
isStatic, isDynamic, hits });
+ ct,
+ new String[] {
+ "index", "url", "routeId", "direction",
"static", "dynamic", "hits",
+ "minBodySize", "maxBodySize", "meanBodySize",
+ "minHeadersSize", "maxHeadersSize",
"meanHeadersSize" },
+ new Object[] {
+ index, url, routeId, direction, isStatic,
isDynamic, hits,
+ stat.getMinBodySize(), stat.getMaxBodySize(),
stat.getMeanBodySize(),
+ stat.getMinHeadersSize(),
stat.getMaxHeadersSize(), stat.getMeanHeadersSize() });
answer.put(data);
// use a counter as the single index in the TabularData as we
do not want a multi-value index
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeStreamCachingTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeStreamCachingTest.java
new file mode 100644
index 000000000000..4d2ef49dd94f
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeStreamCachingTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ManagementStatisticsLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DisabledOnOs(OS.AIX)
+public class ManagedMessageSizeStreamCachingTest extends ManagementTestSupport
{
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.setMessageSize(true);
+ context.setStreamCaching(true);
+
context.getManagementStrategy().getManagementAgent().setEndpointRuntimeStatisticsEnabled(true);
+
context.getManagementStrategy().getManagementAgent().setStatisticsLevel(ManagementStatisticsLevel.Extended);
+ return context;
+ }
+
+ @Test
+ public void testStreamCacheBodyReadableAfterSizeComputation() throws
Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ String body = "Hello Stream Cached World";
+ template.sendBody("seda:start", new
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+
+ assertMockEndpointsSatisfied();
+
+ // verify that the body was still readable after size computation
(stream cache was reset)
+ String receivedBody =
getMockEndpoint("mock:result").getExchanges().get(0)
+ .getIn().getBody(String.class);
+ assertEquals(body, receivedBody);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("seda:start").routeId("route1")
+ .to("mock:result");
+ }
+ };
+ }
+}
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java
new file mode 100644
index 000000000000..0b45ceabf220
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ManagementStatisticsLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.RuntimeEndpointRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledOnOs(OS.AIX)
+public class ManagedMessageSizeTest extends ManagementTestSupport {
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ context.setMessageSize(true);
+
context.getManagementStrategy().getManagementAgent().setEndpointRuntimeStatisticsEnabled(true);
+
context.getManagementStrategy().getManagementAgent().setStatisticsLevel(ManagementStatisticsLevel.Extended);
+ return context;
+ }
+
+ @Test
+ public void testMessageSizeOnEndpoints() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(2);
+
+ String body1 = "Hello World";
+ template.sendBodyAndHeader("seda:start", body1, "myHeader", "myValue");
+
+ String body2 = "Bye World";
+ template.sendBodyAndHeader("seda:start", body2, "myHeader", "myValue");
+
+ assertMockEndpointsSatisfied();
+
+ long body1Size = body1.getBytes(StandardCharsets.UTF_8).length;
+ long body2Size = body2.getBytes(StandardCharsets.UTF_8).length;
+
+ List<RuntimeEndpointRegistry.Statistic> stats =
context.getRuntimeEndpointRegistry().getEndpointStatistics();
+
+ // find the input endpoint (seda:start, direction "in")
+ RuntimeEndpointRegistry.Statistic inputStat = stats.stream()
+ .filter(s -> "in".equals(s.getDirection()) &&
s.getUri().contains("seda://start"))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(inputStat, "Should find input endpoint statistic");
+ assertEquals(2, inputStat.getHits());
+ assertEquals(Math.min(body1Size, body2Size),
inputStat.getMinBodySize());
+ assertEquals(Math.max(body1Size, body2Size),
inputStat.getMaxBodySize());
+ assertEquals((body1Size + body2Size) / 2, inputStat.getMeanBodySize());
+ assertTrue(inputStat.getMinHeadersSize() > 0, "MinHeadersSize should
be positive");
+ assertTrue(inputStat.getMaxHeadersSize() > 0, "MaxHeadersSize should
be positive");
+ assertTrue(inputStat.getMeanHeadersSize() > 0, "MeanHeadersSize should
be positive");
+
+ // find the output endpoint (mock:result, direction "out")
+ RuntimeEndpointRegistry.Statistic outputStat = stats.stream()
+ .filter(s -> "out".equals(s.getDirection()) &&
s.getUri().contains("mock://result"))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(outputStat, "Should find output endpoint statistic");
+ assertEquals(2, outputStat.getHits());
+ assertEquals(Math.min(body1Size, body2Size),
outputStat.getMinBodySize());
+ assertEquals(Math.max(body1Size, body2Size),
outputStat.getMaxBodySize());
+ }
+
+ @Test
+ public void testExchangePropertiesSet() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ String body = "Hello World";
+ template.sendBody("seda:start", body);
+
+ assertMockEndpointsSatisfied();
+
+ long expectedBodySize = body.getBytes(StandardCharsets.UTF_8).length;
+ Long actualBodySize =
getMockEndpoint("mock:result").getExchanges().get(0)
+ .getProperty("CamelMessageBodySize", Long.class);
+ assertEquals(expectedBodySize, actualBodySize);
+
+ Long actualHeadersSize =
getMockEndpoint("mock:result").getExchanges().get(0)
+ .getProperty("CamelMessageHeadersSize", Long.class);
+ assertNotNull(actualHeadersSize, "Headers size should be set");
+ assertTrue(actualHeadersSize >= 0, "Headers size should be
non-negative");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("seda:start").routeId("route1")
+ .to("mock:result");
+ }
+ };
+ }
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/EndpointSizeStatistics.java
b/core/camel-support/src/main/java/org/apache/camel/support/EndpointSizeStatistics.java
new file mode 100644
index 000000000000..211e4aa4c643
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/EndpointSizeStatistics.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Tracks message body and headers size statistics per endpoint key.
+ *
+ * @since 4.21
+ */
+public class EndpointSizeStatistics {
+
+ private final Map<String, SizeStats> map;
+ private final Lock lock = new ReentrantLock();
+
+ public EndpointSizeStatistics(int maxCapacity) {
+ this.map = LRUCacheFactory.newLRUCache(16, maxCapacity, false);
+ }
+
+ public void onHit(String key, long bodySize, long headersSize) {
+ lock.lock();
+ try {
+ map.compute(key, (k, current) -> {
+ if (current == null) {
+ current = new SizeStats();
+ }
+ if (bodySize >= 0) {
+ current.bodyCount++;
+ current.totalBodySize += bodySize;
+ if (bodySize < current.minBodySize || current.bodyCount ==
1) {
+ current.minBodySize = bodySize;
+ }
+ if (bodySize > current.maxBodySize) {
+ current.maxBodySize = bodySize;
+ }
+ }
+ if (headersSize >= 0) {
+ current.headersCount++;
+ current.totalHeadersSize += headersSize;
+ if (headersSize < current.minHeadersSize ||
current.headersCount == 1) {
+ current.minHeadersSize = headersSize;
+ }
+ if (headersSize > current.maxHeadersSize) {
+ current.maxHeadersSize = headersSize;
+ }
+ }
+ return current;
+ });
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public SizeStats getStats(String key) {
+ lock.lock();
+ try {
+ return map.get(key);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void remove(String key) {
+ lock.lock();
+ try {
+ map.remove(key);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void clear() {
+ lock.lock();
+ try {
+ map.clear();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public static class SizeStats {
+ long minBodySize;
+ long maxBodySize;
+ long totalBodySize;
+ long bodyCount;
+ long minHeadersSize;
+ long maxHeadersSize;
+ long totalHeadersSize;
+ long headersCount;
+
+ public long getMinBodySize() {
+ return bodyCount > 0 ? minBodySize : -1;
+ }
+
+ public long getMaxBodySize() {
+ return bodyCount > 0 ? maxBodySize : -1;
+ }
+
+ public long getMeanBodySize() {
+ return bodyCount > 0 ? totalBodySize / bodyCount : -1;
+ }
+
+ public long getMinHeadersSize() {
+ return headersCount > 0 ? minHeadersSize : -1;
+ }
+
+ public long getMaxHeadersSize() {
+ return headersCount > 0 ? maxHeadersSize : -1;
+ }
+
+ public long getMeanHeadersSize() {
+ return headersCount > 0 ? totalHeadersSize / headersCount : -1;
+ }
+ }
+}
diff --git a/docs/user-manual/modules/ROOT/nav.adoc
b/docs/user-manual/modules/ROOT/nav.adoc
index 1d8dd9deb0b0..83ae650487ec 100644
--- a/docs/user-manual/modules/ROOT/nav.adoc
+++ b/docs/user-manual/modules/ROOT/nav.adoc
@@ -79,6 +79,7 @@
** xref:components:eips:intercept.adoc[Intercept]
** xref:jmx.adoc[JMX]
** xref:lifecycle.adoc[Camel Lifecycle]
+** xref:message-size.adoc[Message Size]
** xref:oncompletion.adoc[OnCompletion]
** xref:pluggable-class-resolvers.adoc[Pluggable Class Resolvers]
** xref:predicate.adoc[Predicates]
diff --git
a/docs/user-manual/modules/ROOT/pages/jbang-commands/camel-jbang-get-endpoint.adoc
b/docs/user-manual/modules/ROOT/pages/jbang-commands/camel-jbang-get-endpoint.adoc
index f8d3f58e1600..5bf9bb5be3d0 100644
---
a/docs/user-manual/modules/ROOT/pages/jbang-commands/camel-jbang-get-endpoint.adoc
+++
b/docs/user-manual/modules/ROOT/pages/jbang-commands/camel-jbang-get-endpoint.adoc
@@ -25,7 +25,8 @@ camel get endpoint [options]
| `--json` | Output in JSON Format | | boolean
| `--limit` | Filter endpoints by limiting to the given number of rows | | int
| `--short-uri` | List endpoint URI without query parameters (short) | |
boolean
-| `--sort` | Sort by pid, name, age or total | pid | String
+| `--sort` | Sort by pid, name, age, total, or size | pid | String
+| `--verbose` | Show additional size statistics (min/max body and headers) |
| boolean
| `--watch` | Execute periodically and showing output fullscreen | | boolean
| `--wide-uri` | List endpoint URI in full details | | boolean
| `-h,--help` | Display the help and sub-commands | | boolean
diff --git a/docs/user-manual/modules/ROOT/pages/jmx.adoc
b/docs/user-manual/modules/ROOT/pages/jmx.adoc
index 9918110fdf56..b175e79ab374 100644
--- a/docs/user-manual/modules/ROOT/pages/jmx.adoc
+++ b/docs/user-manual/modules/ROOT/pages/jmx.adoc
@@ -324,6 +324,22 @@ You can enable load statistics such as from
`application.properties`:
camel.main.loadStatisticsEnabled = true
----
+=== Message size statistics
+
+It is possible to include message size statistics per endpoint.
+These track the min, max, and mean sizes of message bodies and headers for
both incoming (IN)
+and outgoing (OUT) endpoints.
+
+You can enable message size statistics from `application.properties`:
+
+[source,properties]
+----
+camel.main.messageSizeEnabled = true
+camel.main.jmxManagementStatisticsLevel = Extended
+----
+
+See xref:message-size.adoc[Message Size] for more details.
+
== Hiding sensitive information
By default, Camel enlists MBeans in JMX such as endpoints configured
diff --git a/docs/user-manual/modules/ROOT/pages/message-size.adoc
b/docs/user-manual/modules/ROOT/pages/message-size.adoc
new file mode 100644
index 000000000000..c059794c5ac8
--- /dev/null
+++ b/docs/user-manual/modules/ROOT/pages/message-size.adoc
@@ -0,0 +1,161 @@
+= Message Size
+
+Camel can track the size of message payloads (body and headers) at the
endpoint level.
+This is useful for observability — understanding how large messages are,
detecting payload bloat,
+and monitoring size trends over time. Sizes are tracked per endpoint for both
incoming (IN)
+and outgoing (OUT) directions.
+
+Message size tracking is *disabled by default* and must be explicitly enabled.
+It also requires extended management statistics level.
+
+NOTE: Message size tracking is a *global* feature. When enabled, it applies to
all endpoints
+in the CamelContext. There is no fine-grained control to enable or disable it
per route or per endpoint.
+
+IMPORTANT: Streaming message bodies (such as `InputStream`) are only supported
when
+xref:stream-caching.adoc[Stream Caching] is enabled. Without stream caching,
the body size
+of streaming payloads cannot be determined and will be reported as -1
(unknown).
+
+== How it works
+
+When enabled, Camel computes message body and headers sizes using the
`RuntimeEndpointRegistry`
+event mechanism:
+
+* *IN direction* — sizes are computed when an exchange is created by a
consumer endpoint
+ (e.g., an HTTP request arrives, a file is consumed, a JMS message is
received).
+ The computed sizes are also stored as exchange properties
+ (`CamelMessageBodySize` and `CamelMessageHeadersSize`) for use during
routing.
+* *OUT direction* — sizes are computed when a message is sent to a producer
endpoint
+ (e.g., sending to a Kafka topic, writing to a file, calling an HTTP service).
+
+Size statistics (min, max, mean) are collected per endpoint and exposed via
JMX as part
+of the runtime endpoint statistics.
+
+A pluggable `MessageSizeStrategy` computes the actual sizes.
+
+== Enabling Message Size
+
+Message size tracking requires both the `messageSizeEnabled` option and
`Extended` statistics level:
+
+[tabs]
+====
+
+Application Properties::
++
+[source,properties]
+----
+camel.main.messageSizeEnabled = true
+camel.main.jmxManagementStatisticsLevel = Extended
+----
+
+Java::
++
+[source,java]
+----
+CamelContext context = ...
+context.setMessageSize(true);
+context.getManagementStrategy().getManagementAgent()
+ .setStatisticsLevel(ManagementStatisticsLevel.Extended);
+----
+
+====
+
+TIP: Message size tracking is automatically enabled when running with the
`dev` profile.
+
+TIP: You can run Camel JBang: `camel doc main --filter=messageSize` from CLI
to see message size options.
+
+== Default size computation
+
+The default `MessageSizeStrategy` computes body size using type-specific logic.
+If the body type is not recognized, it falls back to the `Content-Length`
header (useful for HTTP components
+where the body is a stream). If size cannot be determined, it returns -1 and
the exchange is not included
+in the size statistics.
+
+=== Body size
+
+[width="100%",cols="1m,3",options="header"]
+|===
+| Body type | Size computation
+| null | 0
+| byte[] | Array length
+| String | UTF-8 encoded byte length
+| StreamCache | `length()` method
+| WrappedFile | `getFileLength()` method (covers file, FTP, and similar
components)
+| File | `length()` method
+| Path | `Files.size()`
+| Content-Length header | Fallback: uses the `Content-Length` message header
value
+| Other | -1 (unknown, not tracked)
+|===
+
+=== Headers size
+
+The total headers size is computed by iterating all message headers and
summing:
+
+* Each header key's UTF-8 byte length
+* Each header value's `toString()` UTF-8 byte length
+
+Returns 0 if there are no headers.
+
+== Endpoint statistics via JMX
+
+When JMX is enabled with `Extended` statistics level, size statistics are
available
+in the runtime endpoint registry MBean alongside the existing endpoint hit
counts.
+
+The statistics are part of the `endpointStatistics` tabular data on the
+`RuntimeEndpointRegistry` MBean (under `org.apache.camel` domain, type
`services`):
+
+[width="100%",cols="1m,3",options="header"]
+|===
+| Column | Description
+| url | Endpoint URI
+| routeId | Associated route ID
+| direction | `in` for consumer endpoints, `out` for producer endpoints
+| hits | Number of messages processed
+| minBodySize | Smallest message body size observed (bytes), -1 if not tracked
+| maxBodySize | Largest message body size observed (bytes), -1 if not tracked
+| meanBodySize | Average message body size observed (bytes), -1 if not tracked
+| minHeadersSize | Smallest total headers size observed (bytes), -1 if not
tracked
+| maxHeadersSize | Largest total headers size observed (bytes), -1 if not
tracked
+| meanHeadersSize | Average total headers size observed (bytes), -1 if not
tracked
+|===
+
+The statistics are reset when the endpoint registry statistics are reset.
+
+== Exchange properties
+
+For the IN direction, the computed sizes are available as exchange properties
during routing.
+You can use them for logging, content-based routing, or custom processing:
+
+[source,java]
+----
+from("seda:start")
+ .log("Body size: ${exchangeProperty.CamelMessageBodySize} bytes")
+ .choice()
+ .when(exchangeProperty("CamelMessageBodySize").isGreaterThan(1048576))
+ .to("direct:largeMessage")
+ .otherwise()
+ .to("direct:normalMessage");
+----
+
+[width="100%",cols="1m,3",options="header"]
+|===
+| Property | Description
+| CamelMessageBodySize | Body size in bytes, or -1 if unknown
+| CamelMessageHeadersSize | Total headers size in bytes
+|===
+
+NOTE: Exchange properties are set when the consumer endpoint creates the
exchange.
+They reflect the size of the incoming message as received by the route.
+
+== Custom strategy
+
+You can provide a custom `MessageSizeStrategy` implementation for specialized
sizing logic
+(for example, to handle application-specific body types):
+
+[source,java]
+----
+CamelContext context = ...
+context.setMessageSizeStrategy(new MyCustomMessageSizeStrategy());
+context.setMessageSize(true);
+----
+
+The strategy must implement `org.apache.camel.spi.MessageSizeStrategy`.
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/generated/resources/META-INF/camel-jbang-commands-metadata.json
b/dsl/camel-jbang/camel-jbang-core/src/generated/resources/META-INF/camel-jbang-commands-metadata.json
index fdc22c91dc6c..194ec512ccdc 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/generated/resources/META-INF/camel-jbang-commands-metadata.json
+++
b/dsl/camel-jbang/camel-jbang-core/src/generated/resources/META-INF/camel-jbang-commands-metadata.json
@@ -14,7 +14,7 @@
{ "name": "eval", "fullName": "eval", "description": "Evaluate Camel
expressions and scripts", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.EvalCommand", "options": [ { "names":
"-h,--help", "description": "Display the help and sub-commands", "javaType":
"boolean", "type": "boolean" } ], "subcommands": [ { "name": "expression",
"fullName": "eval expression", "description": "Evaluates Camel expression",
"sourceClass": "org.apache.camel.dsl.jbang.core.commands.action.EvalEx [...]
{ "name": "explain", "fullName": "explain", "description": "Explain what a
Camel route does using AI\/LLM", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.Explain", "options": [ { "names":
"--api-key", "description": "API key for authentication. Also reads
ANTHROPIC_API_KEY, OPENAI_API_KEY, or LLM_API_KEY env vars", "javaType":
"java.lang.String", "type": "string" }, { "names": "--api-type", "description":
"API type: 'ollama', 'openai' (OpenAI-compatible), or 'anthropic' (A [...]
{ "name": "export", "fullName": "export", "description": "Export to other
runtimes (Camel Main, Spring Boot, or Quarkus)", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.Export", "options": [ { "names":
"--build-property", "description": "Maven build properties, ex.
--build-property=prop1=foo", "javaType": "java.util.List", "type": "array" }, {
"names": "--camel-spring-boot-version", "description": "Camel version to use
with Spring Boot", "javaType": "java.lang.String", "ty [...]
- { "name": "get", "fullName": "get", "description": "Get status of Camel
integrations", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.process.CamelStatus", "options": [ {
"names": "--watch", "description": "Execute periodically and showing output
fullscreen", "javaType": "boolean", "type": "boolean" }, { "names":
"-h,--help", "description": "Display the help and sub-commands", "javaType":
"boolean", "type": "boolean" } ], "subcommands": [ { "name": "bean",
"fullName": "get [...]
+ { "name": "get", "fullName": "get", "description": "Get status of Camel
integrations", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.process.CamelStatus", "options": [ {
"names": "--watch", "description": "Execute periodically and showing output
fullscreen", "javaType": "boolean", "type": "boolean" }, { "names":
"-h,--help", "description": "Display the help and sub-commands", "javaType":
"boolean", "type": "boolean" } ], "subcommands": [ { "name": "bean",
"fullName": "get [...]
{ "name": "harden", "fullName": "harden", "description": "Suggest security
hardening for Camel routes using AI\/LLM", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.Harden", "options": [ { "names":
"--api-key", "description": "API key for authentication. Also reads
OPENAI_API_KEY or LLM_API_KEY env vars", "javaType": "java.lang.String",
"type": "string" }, { "names": "--api-type", "description": "API type: 'ollama'
or 'openai' (OpenAI-compatible)", "defaultValue": "ollama", [...]
{ "name": "hawtio", "fullName": "hawtio", "description": "Launch Hawtio
web console", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.process.Hawtio", "options": [ {
"names": "--host", "description": "Hostname to bind the Hawtio web console to",
"defaultValue": "127.0.0.1", "javaType": "java.lang.String", "type": "string"
}, { "names": "--openUrl", "description": "To automatic open Hawtio web console
in the web browser", "defaultValue": "true", "javaType": "boolean", "type":
[...]
{ "name": "infra", "fullName": "infra", "description": "List and Run
external services for testing and prototyping", "sourceClass":
"org.apache.camel.dsl.jbang.core.commands.infra.InfraCommand", "options": [ {
"names": "--json", "description": "Output in JSON Format", "javaType":
"boolean", "type": "boolean" }, { "names": "-h,--help", "description": "Display
the help and sub-commands", "javaType": "boolean", "type": "boolean" } ],
"subcommands": [ { "name": "get", "fullName": "infra [...]
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListEndpoint.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListEndpoint.java
index 3ea7020087c8..5059593809af 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListEndpoint.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListEndpoint.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.stream.Collectors;
import com.github.freva.asciitable.AsciiTable;
@@ -47,7 +48,7 @@ public class ListEndpoint extends ProcessWatchCommand {
@Override
public Iterator<String> iterator() {
- return List.of("pid", "name", "age", "total").iterator();
+ return List.of("pid", "name", "age", "total", "size").iterator();
}
}
@@ -56,7 +57,7 @@ public class ListEndpoint extends ProcessWatchCommand {
String name = "*";
@CommandLine.Option(names = { "--sort" }, completionCandidates =
PidNameAgeTotalCompletionCandidates.class,
- description = "Sort by pid, name, age or total",
defaultValue = "pid")
+ description = "Sort by pid, name, age, total, or
size", defaultValue = "pid")
String sort;
@CommandLine.Option(names = { "--limit" },
@@ -75,6 +76,10 @@ public class ListEndpoint extends ProcessWatchCommand {
description = "Filter endpoints that must be higher
than the given usage")
long filterTotal;
+ @CommandLine.Option(names = { "--verbose" },
+ description = "Show additional size statistics
(min/max body and headers)")
+ boolean verbose;
+
@CommandLine.Option(names = { "--short-uri" },
description = "List endpoint URI without query
parameters (short)")
boolean shortUri;
@@ -119,6 +124,12 @@ public class ListEndpoint extends ProcessWatchCommand {
row.remote = o.getBooleanOrDefault("remote",
true);
row.direction = o.getString("direction");
row.total = o.getString("hits");
+ row.meanBodySize =
o.getLongOrDefault("meanBodySize", -1L);
+ row.meanHeadersSize =
o.getLongOrDefault("meanHeadersSize", -1L);
+ row.minBodySize =
o.getLongOrDefault("minBodySize", -1L);
+ row.maxBodySize =
o.getLongOrDefault("maxBodySize", -1L);
+ row.minHeadersSize =
o.getLongOrDefault("minHeadersSize", -1L);
+ row.maxHeadersSize =
o.getLongOrDefault("maxHeadersSize", -1L);
row.uptime = extractSince(ph);
row.age = TimeUtils.printSince(row.uptime);
boolean add = true;
@@ -175,15 +186,36 @@ public class ListEndpoint extends ProcessWatchCommand {
jo.put("stub", r.stub);
jo.put("remote", r.remote);
jo.put("uri", r.endpoint);
+ if (r.meanBodySize >= 0) {
+ jo.put("minBodySize", r.minBodySize);
+ jo.put("maxBodySize", r.maxBodySize);
+ jo.put("meanBodySize", r.meanBodySize);
+ }
+ if (r.meanHeadersSize >= 0) {
+ jo.put("minHeadersSize", r.minHeadersSize);
+ jo.put("maxHeadersSize", r.maxHeadersSize);
+ jo.put("meanHeadersSize", r.meanHeadersSize);
+ }
return jo;
}).collect(Collectors.toList())));
return;
}
+ boolean hasSize = rows.stream().anyMatch(r -> r.meanBodySize >= 0 ||
r.meanHeadersSize >= 0);
// Flexible column: URI (90/140)
// Fixed columns:
PID(8)+NAME(30)+AGE(8)+DIR(3)+TOTAL(5)+STUB(4)+REMOTE(6) ~= 64
+ int fixedW = 64;
+ if (hasSize) {
+ // BODY-SIZE(9)+HDR-SIZE(8) = 17
+ fixedW += 17;
+ }
+ if (hasSize && verbose) {
+ // MIN-BODY(8)+MAX-BODY(8)+MIN-HDR(7)+MAX-HDR(7) = 30
+ fixedW += 30;
+ }
+ int numCols = 9 + (hasSize ? 2 : 0) + (hasSize && verbose ? 4 : 0);
int tw = terminalWidth();
- int uriW = TerminalWidthHelper.flexWidth(tw, 64,
TerminalWidthHelper.noBorderOverhead(9), 20, 90);
- int uriWideW = TerminalWidthHelper.flexWidth(tw, 64,
TerminalWidthHelper.noBorderOverhead(9), 20, 140);
+ int uriW = TerminalWidthHelper.flexWidth(tw, fixedW,
TerminalWidthHelper.noBorderOverhead(numCols), 20, 90);
+ int uriWideW = TerminalWidthHelper.flexWidth(tw, fixedW,
TerminalWidthHelper.noBorderOverhead(numCols), 20, 140);
printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows,
Arrays.asList(
new
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
new
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30,
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -193,6 +225,18 @@ public class ListEndpoint extends ProcessWatchCommand {
new Column().header("TOTAL").with(r -> r.total),
new
Column().header("STUB").dataAlign(HorizontalAlign.CENTER).with(r -> r.stub ?
"x" : ""),
new
Column().header("REMOTE").dataAlign(HorizontalAlign.CENTER).with(r -> r.remote
? "x" : ""),
+ new
Column().header("BODY-SIZE").visible(hasSize).dataAlign(HorizontalAlign.RIGHT)
+ .with(r -> sizeToString(r.meanBodySize)),
+ new
Column().header("HDR-SIZE").visible(hasSize).dataAlign(HorizontalAlign.RIGHT)
+ .with(r -> sizeToString(r.meanHeadersSize)),
+ new Column().header("MIN-BODY").visible(hasSize &&
verbose).dataAlign(HorizontalAlign.RIGHT)
+ .with(r -> sizeToString(r.minBodySize)),
+ new Column().header("MAX-BODY").visible(hasSize &&
verbose).dataAlign(HorizontalAlign.RIGHT)
+ .with(r -> sizeToString(r.maxBodySize)),
+ new Column().header("MIN-HDR").visible(hasSize &&
verbose).dataAlign(HorizontalAlign.RIGHT)
+ .with(r -> sizeToString(r.minHeadersSize)),
+ new Column().header("MAX-HDR").visible(hasSize &&
verbose).dataAlign(HorizontalAlign.RIGHT)
+ .with(r -> sizeToString(r.maxHeadersSize)),
new
Column().header("URI").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
.maxWidth(uriW, OverflowBehaviour.ELLIPSIS_RIGHT)
.with(this::getUri),
@@ -212,6 +256,19 @@ public class ListEndpoint extends ProcessWatchCommand {
return u;
}
+ private static String sizeToString(long size) {
+ if (size < 0) {
+ return "-";
+ }
+ if (size < 1024) {
+ return size + " B";
+ } else if (size < 1024 * 1024) {
+ return String.format(Locale.US, "%.1f KB", size / 1024.0);
+ } else {
+ return String.format(Locale.US, "%.1f MB", size / (1024.0 *
1024.0));
+ }
+ }
+
protected int sortRow(Row o1, Row o2) {
String s = sort;
int negate = 1;
@@ -228,6 +285,8 @@ public class ListEndpoint extends ProcessWatchCommand {
return Long.compare(o1.uptime, o2.uptime) * negate;
case "total":
return Long.compare(Long.parseLong(o1.total),
Long.parseLong(o2.total)) * negate;
+ case "size":
+ return Long.compare(o1.meanBodySize, o2.meanBodySize) * negate;
default:
return 0;
}
@@ -243,6 +302,12 @@ public class ListEndpoint extends ProcessWatchCommand {
String total;
boolean stub;
boolean remote;
+ long meanBodySize = -1;
+ long meanHeadersSize = -1;
+ long minBodySize = -1;
+ long maxBodySize = -1;
+ long minHeadersSize = -1;
+ long maxHeadersSize = -1;
}
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/camel-jbang-example-catalog.json
b/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/camel-jbang-example-catalog.json
index 4e42f22e97d8..fca58c3f0788 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/camel-jbang-example-catalog.json
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/camel-jbang-example-catalog.json
@@ -226,6 +226,24 @@
"rest-api.camel.yaml"
]
},
+ {
+ "name": "message-size",
+ "title": "Message Size",
+ "description": "Track message body and header sizes per endpoint",
+ "level": "beginner",
+ "tags": [
+ "observability",
+ "monitoring",
+ "seda"
+ ],
+ "bundled": true,
+ "requiresDocker": false,
+ "hasCitrusTests": false,
+ "files": [
+ "README.md",
+ "message-size.camel.yaml"
+ ]
+ },
{
"name": "mqtt",
"title": "MQTT",
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/message-size/README.md
b/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/message-size/README.md
new file mode 100644
index 000000000000..5332bdf435c9
--- /dev/null
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/message-size/README.md
@@ -0,0 +1,57 @@
+## Message Size
+
+This example demonstrates Camel's message size tracking feature, which captures
+body and header sizes per endpoint for both incoming (IN) and outgoing (OUT)
directions.
+
+Three timer-driven producers simulate messages of different sizes (small,
medium, large)
+using the `Content-Length` header and send them to separate SEDA endpoints.
+The size statistics (min, max, mean) are tracked per endpoint and can be
viewed via the CLI.
+
+### How to run
+
+```sh
+$ camel run *
+```
+
+### Viewing message size statistics
+
+While the integration is running, open another terminal and use the `camel` CLI
+to view endpoint statistics including message sizes:
+
+```sh
+$ camel get endpoint
+```
+
+To see detailed min/max statistics:
+
+```sh
+$ camel get endpoint --verbose
+```
+
+To sort endpoints by body size (largest first):
+
+```sh
+$ camel get endpoint --sort -size
+```
+
+### How it works
+
+Message size tracking is automatically enabled when running with `camel run`
+which uses the dev profile. This sets:
+
+- `camel.main.messageSizeEnabled = true`
+- `camel.main.jmxManagementStatisticsLevel = Extended`
+
+Sizes are tracked per endpoint in the runtime endpoint registry. For incoming
messages,
+the body and headers sizes are also available as exchange properties
+(`CamelMessageBodySize` and `CamelMessageHeadersSize`) during routing.
+
+### Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+[let us know](https://camel.apache.org/community/support/).
+
+We also love contributors, so
+[get involved](https://camel.apache.org/community/contributing/) :-)
+
+The Camel riders!
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/message-size/message-size.camel.yaml
b/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/message-size/message-size.camel.yaml
new file mode 100644
index 000000000000..9b2e0361c3a4
--- /dev/null
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/resources/examples/message-size/message-size.camel.yaml
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Producers: simulate different payload sizes using Content-Length header
+- route:
+ id: small-producer
+ from:
+ uri: timer:small
+ parameters:
+ period: 2000
+ steps:
+ - setHeader:
+ name: Content-Length
+ simple: "${random(100,1023)}"
+ - setHeader:
+ name: source
+ simple: small-producer
+ - to:
+ uri: seda:small
+
+- route:
+ id: medium-producer
+ from:
+ uri: timer:medium
+ parameters:
+ period: 3000
+ steps:
+ - setHeader:
+ name: Content-Length
+ simple: "${random(8192,18432)}"
+ - setHeader:
+ name: source
+ simple: medium-producer
+ - setHeader:
+ name: tracking-id
+ simple: "TRK-${random(10000,99999)}"
+ - to:
+ uri: seda:medium
+
+- route:
+ id: large-producer
+ from:
+ uri: timer:large
+ parameters:
+ period: 5000
+ steps:
+ - setHeader:
+ name: Content-Length
+ simple: "${random(10485760,20971520)}"
+ - setHeader:
+ name: source
+ simple: large-producer
+ - setHeader:
+ name: tracking-id
+ simple: "TRK-${random(10000,99999)}"
+ - setHeader:
+ name: batch-id
+ simple: "BATCH-${random(100,999)}"
+ - setHeader:
+ name: priority
+ simple: "${random(1,5)}"
+ - to:
+ uri: seda:large
+
+# Consumers: process messages from each seda endpoint
+- route:
+ id: process-small
+ from:
+ uri: seda:small
+ steps:
+ - log:
+ message: "Small: Content-Length=${header.Content-Length}"
+
+- route:
+ id: process-medium
+ from:
+ uri: seda:medium
+ steps:
+ - log:
+ message: "Medium: Content-Length=${header.Content-Length}"
+
+- route:
+ id: process-large
+ from:
+ uri: seda:large
+ steps:
+ - log:
+ message: "Large: Content-Length=${header.Content-Length}"