This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-23617 in repository https://gitbox.apache.org/repos/asf/camel.git
commit aaa4588e62ee6e1b7d86f59cfb103aeb0c5367ae Author: Claus Ibsen <[email protected]> AuthorDate: Tue May 26 14:20:54 2026 +0200 CAMEL-23617: Add option to capture message payload size for observation New MessageSizeStrategy SPI with default implementation that computes incoming message body and headers sizes during routing. Sizes are tracked per route via MBean statistics (min/max/mean) and exposed as exchange properties for use in routes. Enabled via camel.main.messageSizeEnabled=true (auto-enabled in dev profile). Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../org/apache/camel/ExchangeConstantProvider.java | 4 +- .../main/java/org/apache/camel/CamelContext.java | 15 +++ .../src/main/java/org/apache/camel/Exchange.java | 2 + .../java/org/apache/camel/ExchangePropertyKey.java | 6 + .../org/apache/camel/RuntimeConfiguration.java | 16 +++ .../org/apache/camel/spi/MessageSizeStrategy.java | 58 +++++++++ .../camel/impl/engine/AbstractCamelContext.java | 32 +++++ .../camel/impl/engine/CamelInternalProcessor.java | 38 ++++++ .../impl/engine/DefaultCamelContextExtension.java | 21 ++++ .../apache/camel/impl/engine/DefaultChannel.java | 4 + .../impl/engine/DefaultMessageSizeStrategy.java | 120 +++++++++++++++++++ .../org/apache/camel/impl/engine/DefaultRoute.java | 15 +++ .../camel/impl/engine/SimpleCamelContext.java | 6 + .../apache/camel/impl/CamelContextConfigurer.java | 12 ++ .../MainConfigurationPropertiesConfigurer.java | 7 ++ .../camel-main-configuration-metadata.json | 1 + core/camel-main/src/main/docs/main.adoc | 3 +- .../camel/main/DefaultConfigurationConfigurer.java | 3 + .../camel/main/DefaultConfigurationProperties.java | 26 ++++ .../org/apache/camel/main/ProfileConfigurer.java | 1 + .../mbean/ManagedPerformanceCounterMBean.java | 18 +++ .../mbean/ManagedPerformanceCounter.java | 102 ++++++++++++++++ .../camel/management/ManagedMessageSizeTest.java | 94 +++++++++++++++ docs/user-manual/modules/ROOT/nav.adoc | 1 + docs/user-manual/modules/ROOT/pages/jmx.adoc | 14 +++ .../modules/ROOT/pages/message-size.adoc | 133 +++++++++++++++++++++ 26 files changed, 750 insertions(+), 2 deletions(-) diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java index b9f4b37dd355..f1dcc4d60bd6 100644 --- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java +++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java @@ -15,7 +15,7 @@ public class ExchangeConstantProvider { private static final Map<String, String> MAP; static { - Map<String, String> map = new HashMap<>(149); + Map<String, String> map = new HashMap<>(151); map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard"); map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy"); map.put("AGGREGATED_CORRELATION_KEY", "CamelAggregatedCorrelationKey"); @@ -113,6 +113,8 @@ public class ExchangeConstantProvider { map.put("MAXIMUM_SIMPLE_CACHE_SIZE", "CamelMaximumSimpleCacheSize"); map.put("MAXIMUM_TRANSFORMER_CACHE_SIZE", "CamelMaximumTransformerCacheSize"); map.put("MAXIMUM_VALIDATOR_CACHE_SIZE", "CamelMaximumValidatorCacheSize"); + map.put("MESSAGE_BODY_SIZE", "CamelMessageBodySize"); + map.put("MESSAGE_HEADERS_SIZE", "CamelMessageHeadersSize"); map.put("MESSAGE_HISTORY", "CamelMessageHistory"); map.put("MESSAGE_HISTORY_HEADER_FORMAT", "CamelMessageHistoryHeaderFormat"); map.put("MESSAGE_HISTORY_OUTPUT_FORMAT", "CamelMessageHistoryOutputFormat"); diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index b032c92853b3..1411af29db56 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -39,6 +39,7 @@ import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RestConfiguration; @@ -1771,6 +1772,20 @@ public interface CamelContext extends CamelContextLifecycle, RuntimeConfiguratio */ void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy); + /** + * Gets the {@link MessageSizeStrategy} to use. + * + * @since 4.21 + */ + MessageSizeStrategy getMessageSizeStrategy(); + + /** + * Sets a custom {@link MessageSizeStrategy} to use. + * + * @since 4.21 + */ + void setMessageSizeStrategy(MessageSizeStrategy messageSizeStrategy); + /** * Gets the {@link org.apache.camel.spi.RuntimeEndpointRegistry} to use, or <tt>null</tt> if none is in use. */ diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 595cecdfd8e4..4d1ab3e3e1d3 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -223,6 +223,8 @@ public interface Exchange extends VariableAware { String MESSAGE_HISTORY_HEADER_FORMAT = "CamelMessageHistoryHeaderFormat"; String MESSAGE_HISTORY_OUTPUT_FORMAT = "CamelMessageHistoryOutputFormat"; String MESSAGE_TIMESTAMP = "CamelMessageTimestamp"; + String MESSAGE_BODY_SIZE = "CamelMessageBodySize"; + String MESSAGE_HEADERS_SIZE = "CamelMessageHeadersSize"; @Metadata(label = "multicast", description = "An index counter that increases for each Exchange being multicasted. The counter starts from 0.", javaType = "int") diff --git a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java index 650960c95b77..c106badb8783 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java @@ -72,6 +72,8 @@ public enum ExchangePropertyKey { INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED), LOOP_INDEX(Exchange.LOOP_INDEX), LOOP_SIZE(Exchange.LOOP_SIZE), + MESSAGE_BODY_SIZE(Exchange.MESSAGE_BODY_SIZE), + MESSAGE_HEADERS_SIZE(Exchange.MESSAGE_HEADERS_SIZE), MESSAGE_HISTORY(Exchange.MESSAGE_HISTORY), MULTICAST_COMPLETE(Exchange.MULTICAST_COMPLETE), MULTICAST_INDEX(Exchange.MULTICAST_INDEX), @@ -190,6 +192,10 @@ public enum ExchangePropertyKey { return LOOP_INDEX; case Exchange.LOOP_SIZE: return LOOP_SIZE; + case Exchange.MESSAGE_BODY_SIZE: + return MESSAGE_BODY_SIZE; + case Exchange.MESSAGE_HEADERS_SIZE: + return MESSAGE_HEADERS_SIZE; case Exchange.MESSAGE_HISTORY: return MESSAGE_HISTORY; case Exchange.MULTICAST_COMPLETE: diff --git a/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java index 4aef5cda746b..e7296089d257 100644 --- a/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java +++ b/core/camel-api/src/main/java/org/apache/camel/RuntimeConfiguration.java @@ -38,6 +38,22 @@ public interface RuntimeConfiguration { */ Boolean isStreamCaching(); + /** + * Sets whether message size capturing is enabled or not (default is disabled). + * + * @param messageSize whether message size is enabled or not + * @since 4.21 + */ + void setMessageSize(Boolean messageSize); + + /** + * Returns whether message size capturing is enabled. + * + * @return <tt>true</tt> if message size is enabled + * @since 4.21 + */ + Boolean isMessageSize(); + /** * Returns whether tracing enabled * diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/MessageSizeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/MessageSizeStrategy.java new file mode 100644 index 000000000000..c5f815d6f471 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/MessageSizeStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Message; +import org.apache.camel.StaticService; + +/** + * Strategy for computing the size of message payloads (body and headers) for observation purposes. + * <p/> + * Not all message body types can be sized efficiently. For example, Java objects have no inherent "size", and streaming + * payloads would be destructive to read unless stream caching is enabled. Implementations should return -1 when the + * size cannot be determined without side effects. + * + * @since 4.21 + */ +public interface MessageSizeStrategy extends StaticService { + + /** + * Whether message size computation is enabled. + */ + void setEnabled(boolean enabled); + + /** + * Whether message size computation is enabled. + */ + boolean isEnabled(); + + /** + * Computes the size of the message body in bytes. + * + * @param message the message + * @return the body size in bytes, 0 if the body is null, or -1 if the size cannot be determined + */ + long computeBodySize(Message message); + + /** + * Computes the total size of all message headers in bytes (keys and values). + * + * @param message the message + * @return the total headers size in bytes, or -1 if the size cannot be determined + */ + long computeHeadersSize(Message message); +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 9df88ddebe8e..ceb556bfae20 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -137,6 +137,7 @@ import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.ModelToStructureDumper; import org.apache.camel.spi.ModelToXMLDumper; @@ -278,6 +279,7 @@ public abstract class AbstractCamelContext extends BaseService private Boolean logMask = Boolean.FALSE; private Boolean logExhaustedMessageBody = Boolean.FALSE; private Boolean streamCache = Boolean.TRUE; + private Boolean messageSize = Boolean.FALSE; private Boolean disableJMX = Boolean.FALSE; private Boolean loadTypeConverters = Boolean.FALSE; private Boolean loadHealthChecks = Boolean.FALSE; @@ -2009,6 +2011,16 @@ public abstract class AbstractCamelContext extends BaseService return streamCache; } + @Override + public void setMessageSize(Boolean messageSize) { + this.messageSize = messageSize; + } + + @Override + public Boolean isMessageSize() { + return messageSize; + } + @Override public void setTracing(Boolean tracing) { this.trace = tracing; @@ -3136,6 +3148,10 @@ public abstract class AbstractCamelContext extends BaseService + " See more details at https://camel.apache.org/stream-caching.html"); } + if (isMessageSizeInUse()) { + getMessageSizeStrategy().setEnabled(true); + } + if (isAllowUseOriginalMessage()) { LOG.debug("AllowUseOriginalMessage enabled because UseOriginalMessage is in use"); } @@ -3562,6 +3578,10 @@ public abstract class AbstractCamelContext extends BaseService return isStreamCaching(); } + protected boolean isMessageSizeInUse() throws Exception { + return isMessageSize(); + } + protected void bindDataFormats() throws Exception { } @@ -4336,6 +4356,16 @@ public abstract class AbstractCamelContext extends BaseService camelContextExtension.setStreamCachingStrategy(streamCachingStrategy); } + @Override + public MessageSizeStrategy getMessageSizeStrategy() { + return camelContextExtension.getMessageSizeStrategy(); + } + + @Override + public void setMessageSizeStrategy(MessageSizeStrategy messageSizeStrategy) { + camelContextExtension.setMessageSizeStrategy(messageSizeStrategy); + } + @Override public RestRegistry getRestRegistry() { return PluginHelper.getRestRegistry(this); @@ -4455,6 +4485,8 @@ public abstract class AbstractCamelContext extends BaseService protected abstract StreamCachingStrategy createStreamCachingStrategy(); + protected abstract MessageSizeStrategy createMessageSizeStrategy(); + protected abstract TypeConverter createTypeConverter(); protected abstract TypeConverterRegistry createTypeConverterRegistry(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 66c56142faf0..c0d276bc148a 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -52,6 +52,7 @@ import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.PooledObjectFactory; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RoutePolicy; @@ -1316,6 +1317,43 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } } + /** + * Advice for {@link org.apache.camel.spi.MessageSizeStrategy} + */ + public static class MessageSizeAdvice implements CamelInternalProcessorAdvice, Ordered { + + private final MessageSizeStrategy strategy; + + public MessageSizeAdvice(MessageSizeStrategy strategy) { + this.strategy = strategy; + } + + @Override + public Object before(Exchange exchange) throws Exception { + long bodySize = strategy.computeBodySize(exchange.getIn()); + long headersSize = strategy.computeHeadersSize(exchange.getIn()); + exchange.setProperty(ExchangePropertyKey.MESSAGE_BODY_SIZE, bodySize); + exchange.setProperty(ExchangePropertyKey.MESSAGE_HEADERS_SIZE, headersSize); + return null; + } + + @Override + public void after(Exchange exchange, Object data) throws Exception { + // noop + } + + @Override + public boolean hasState() { + return false; + } + + @Override + public int getOrder() { + // after stream caching (HIGHEST) but before instrumentation (LOWEST - 2) + return Ordered.HIGHEST + 1; + } + } + /** * Advice for delaying */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java index 1ad1b4f4cba2..9bfa5d15d3f6 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java @@ -63,6 +63,7 @@ import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.ManagementStrategyFactory; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PluginManager; import org.apache.camel.spi.ProcessorExchangeFactory; @@ -127,6 +128,7 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { private volatile ClassResolver classResolver; private volatile MessageHistoryFactory messageHistoryFactory; private volatile StreamCachingStrategy streamCachingStrategy; + private volatile MessageSizeStrategy messageSizeStrategy; private volatile InflightRepository inflightRepository; private volatile ErrorRegistry errorRegistry; private volatile UuidGenerator uuidGenerator; @@ -865,6 +867,25 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { = camelContext.getInternalServiceManager().addService(camelContext, streamCachingStrategy, true, false, true); } + MessageSizeStrategy getMessageSizeStrategy() { + if (messageSizeStrategy == null) { + lock.lock(); + try { + if (messageSizeStrategy == null) { + setMessageSizeStrategy(camelContext.createMessageSizeStrategy()); + } + } finally { + lock.unlock(); + } + } + return messageSizeStrategy; + } + + void setMessageSizeStrategy(MessageSizeStrategy messageSizeStrategy) { + this.messageSizeStrategy + = camelContext.getInternalServiceManager().addService(camelContext, messageSizeStrategy, true, false, true); + } + InflightRepository getInflightRepository() { if (inflightRepository == null) { lock.lock(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java index 34c4acb82499..69e53ac0c3b1 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java @@ -262,6 +262,10 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy())); } + if (route.isMessageSize()) { + addAdvice(new MessageSizeAdvice(camelContext.getMessageSizeStrategy())); + } + if (route.getDelayer() != null && route.getDelayer() > 0) { addAdvice(new DelayerAdvice(route.getDelayer())); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java new file mode 100644 index 000000000000..cf06d9929556 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultMessageSizeStrategy.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.WrappedFile; +import org.apache.camel.StreamCache; +import org.apache.camel.spi.MessageSizeStrategy; +import org.apache.camel.support.service.ServiceSupport; + +/** + * Default implementation of {@link MessageSizeStrategy} that computes message payload sizes for known body types. + */ +public class DefaultMessageSizeStrategy extends ServiceSupport implements CamelContextAware, MessageSizeStrategy { + + private CamelContext camelContext; + private boolean enabled; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public long computeBodySize(Message message) { + Object body = message.getBody(); + if (body == null) { + return 0; + } + if (body instanceof byte[] bytes) { + return bytes.length; + } + if (body instanceof String str) { + return str.getBytes(StandardCharsets.UTF_8).length; + } + if (body instanceof StreamCache sc) { + long len = sc.length(); + return len > 0 ? len : -1; + } + if (body instanceof WrappedFile<?> wf) { + long len = wf.getFileLength(); + return len > 0 ? len : -1; + } + if (body instanceof File f) { + return f.length(); + } + if (body instanceof Path p) { + try { + return Files.size(p); + } catch (Exception e) { + return -1; + } + } + // fallback to Content-Length header (e.g. HTTP components where body is a stream) + Long cl = message.getHeader(Exchange.CONTENT_LENGTH, Long.class); + if (cl != null && cl >= 0) { + return cl; + } + return -1; + } + + @Override + public long computeHeadersSize(Message message) { + Map<String, Object> headers = message.getHeaders(); + if (headers == null || headers.isEmpty()) { + return 0; + } + long total = 0; + for (Map.Entry<String, Object> entry : headers.entrySet()) { + String key = entry.getKey(); + if (key != null) { + total += key.getBytes(StandardCharsets.UTF_8).length; + } + Object value = entry.getValue(); + if (value != null) { + String s = value.toString(); + total += s.getBytes(StandardCharsets.UTF_8).length; + } + } + return total; + } +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java index 59ad429699d0..4d218f606174 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java @@ -90,6 +90,7 @@ public class DefaultRoute extends ServiceSupport implements Route { private Boolean logMask; private Boolean logExhaustedMessageBody; private Boolean streamCache; + private Boolean messageSize; private Long delay; private Boolean autoStartup = Boolean.TRUE; private final List<RoutePolicy> routePolicyList = new ArrayList<>(); @@ -505,6 +506,20 @@ public class DefaultRoute extends ServiceSupport implements Route { } } + @Override + public void setMessageSize(Boolean messageSize) { + this.messageSize = messageSize; + } + + @Override + public Boolean isMessageSize() { + if (messageSize != null) { + return messageSize; + } else { + return camelContext.isMessageSize(); + } + } + @Override public void setDelayer(Long delay) { this.delay = delay; diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index 089c8c20298d..bbef1f98be5a 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -69,6 +69,7 @@ import org.apache.camel.spi.InternalProcessorFactory; import org.apache.camel.spi.LanguageResolver; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.MessageSizeStrategy; import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.ModelToStructureDumper; import org.apache.camel.spi.ModelToXMLDumper; @@ -629,6 +630,11 @@ public class SimpleCamelContext extends AbstractCamelContext { return new DefaultStreamCachingStrategy(); } + @Override + protected MessageSizeStrategy createMessageSizeStrategy() { + return new DefaultMessageSizeStrategy(); + } + @Override protected ExchangeFactory createExchangeFactory() { Optional<ExchangeFactory> result = ResolverHelper.resolveService( diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java index cceb0c83c9c2..9c1204342527 100644 --- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java +++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/CamelContextConfigurer.java @@ -83,6 +83,10 @@ public class CamelContextConfigurer extends org.apache.camel.support.component.P case "messageHistory": target.setMessageHistory(property(camelContext, java.lang.Boolean.class, value)); return true; case "messagehistoryfactory": case "messageHistoryFactory": target.setMessageHistoryFactory(property(camelContext, org.apache.camel.spi.MessageHistoryFactory.class, value)); return true; + case "messagesize": + case "messageSize": target.setMessageSize(property(camelContext, java.lang.Boolean.class, value)); return true; + case "messagesizestrategy": + case "messageSizeStrategy": target.setMessageSizeStrategy(property(camelContext, org.apache.camel.spi.MessageSizeStrategy.class, value)); return true; case "modeline": target.setModeline(property(camelContext, java.lang.Boolean.class, value)); return true; case "namestrategy": case "nameStrategy": target.setNameStrategy(property(camelContext, org.apache.camel.spi.CamelContextNameStrategy.class, value)); return true; @@ -205,6 +209,10 @@ public class CamelContextConfigurer extends org.apache.camel.support.component.P case "messageHistory": return java.lang.Boolean.class; case "messagehistoryfactory": case "messageHistoryFactory": return org.apache.camel.spi.MessageHistoryFactory.class; + case "messagesize": + case "messageSize": return java.lang.Boolean.class; + case "messagesizestrategy": + case "messageSizeStrategy": return org.apache.camel.spi.MessageSizeStrategy.class; case "modeline": return java.lang.Boolean.class; case "namestrategy": case "nameStrategy": return org.apache.camel.spi.CamelContextNameStrategy.class; @@ -328,6 +336,10 @@ public class CamelContextConfigurer extends org.apache.camel.support.component.P case "messageHistory": return target.isMessageHistory(); case "messagehistoryfactory": case "messageHistoryFactory": return target.getMessageHistoryFactory(); + case "messagesize": + case "messageSize": return target.isMessageSize(); + case "messagesizestrategy": + case "messageSizeStrategy": return target.getMessageSizeStrategy(); case "modeline": return target.isModeline(); case "namestrategy": case "nameStrategy": return target.getNameStrategy(); diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java index 1f0ddf7f7ff8..d21a1ab3029f 100644 --- a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java +++ b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java @@ -96,6 +96,7 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp map.put("MainListeners", java.util.List.class); map.put("MdcLoggingKeysPattern", java.lang.String.class); map.put("MessageHistory", boolean.class); + map.put("MessageSizeEnabled", boolean.class); map.put("Modeline", boolean.class); map.put("Name", java.lang.String.class); map.put("ProducerTemplateCacheSize", int.class); @@ -307,6 +308,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "mdcLoggingKeysPattern": target.setMdcLoggingKeysPattern(property(camelContext, java.lang.String.class, value)); return true; case "messagehistory": case "messageHistory": target.setMessageHistory(property(camelContext, boolean.class, value)); return true; + case "messagesizeenabled": + case "messageSizeEnabled": target.setMessageSizeEnabled(property(camelContext, boolean.class, value)); return true; case "modeline": target.setModeline(property(camelContext, boolean.class, value)); return true; case "name": target.setName(property(camelContext, java.lang.String.class, value)); return true; case "producertemplatecachesize": @@ -577,6 +580,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "mdcLoggingKeysPattern": return java.lang.String.class; case "messagehistory": case "messageHistory": return boolean.class; + case "messagesizeenabled": + case "messageSizeEnabled": return boolean.class; case "modeline": return boolean.class; case "name": return java.lang.String.class; case "producertemplatecachesize": @@ -843,6 +848,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "mdcLoggingKeysPattern": return target.getMdcLoggingKeysPattern(); case "messagehistory": case "messageHistory": return target.isMessageHistory(); + case "messagesizeenabled": + case "messageSizeEnabled": return target.isMessageSizeEnabled(); case "modeline": return target.isModeline(); case "name": return target.getName(); case "producertemplatecachesize": diff --git a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json index 9e39a3ecdeff..a29587c0c21e 100644 --- a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json +++ b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json @@ -104,6 +104,7 @@ { "name": "camel.main.mainListeners", "required": false, "description": "Sets main listener objects that will be used for MainListener that makes it possible to do custom logic during starting and stopping camel-main.", "sourceType": "org.apache.camel.main.MainConfigurationProperties", "type": "array", "javaType": "java.util.List", "secret": false }, { "name": "camel.main.mdcLoggingKeysPattern", "required": false, "description": "Sets the pattern used for determine which custom MDC keys to propagate during message routing when the routing engine continues routing asynchronously for the given message. Setting this pattern to will propagate all custom keys. Or setting the pattern to foo,bar will propagate any keys starting with either foo or bar. Notice that a set of standard Camel MDC keys are always propagated which starts with c [...] { "name": "camel.main.messageHistory", "required": false, "description": "Sets whether message history is enabled or not. Default is false.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": false, "secret": false }, + { "name": "camel.main.messageSizeEnabled", "required": false, "description": "Sets whether message size observation is enabled (default is false). When enabled, Camel will compute the size of incoming message body and headers (in bytes) per route and make this available via JMX MBeans (min\/max\/mean body size and headers size).", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": false, "secret": false }, { "name": "camel.main.modeline", "required": false, "description": "Whether to support JBang style \/\/DEPS to specify additional dependencies when running Camel JBang", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean", "defaultValue": false, "secret": false }, { "name": "camel.main.name", "required": false, "description": "Sets the name of the CamelContext.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String", "secret": false }, { "name": "camel.main.producerTemplateCacheSize", "required": false, "description": "Producer template endpoints cache size.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "integer", "javaType": "int", "defaultValue": 1000, "secret": false }, diff --git a/core/camel-main/src/main/docs/main.adoc b/core/camel-main/src/main/docs/main.adoc index ba4bd0907e18..4a82ef095fbe 100644 --- a/core/camel-main/src/main/docs/main.adoc +++ b/core/camel-main/src/main/docs/main.adoc @@ -19,7 +19,7 @@ The following tables lists all the options: // main options: START === Camel Main configurations -The camel.main supports 132 options, which are listed below. +The camel.main supports 133 options, which are listed below. [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -98,6 +98,7 @@ The camel.main supports 132 options, which are listed below. | *camel.main.mainListeners* | Sets main listener objects that will be used for MainListener that makes it possible to do custom logic during starting and stopping camel-main. | | List | *camel.main.mdcLoggingKeysPattern* | Sets the pattern used for determine which custom MDC keys to propagate during message routing when the routing engine continues routing asynchronously for the given message. Setting this pattern to will propagate all custom keys. Or setting the pattern to foo,bar will propagate any keys starting with either foo or bar. Notice that a set of standard Camel MDC keys are always propagated which starts with camel. as key name. The match rules are applied [...] | *camel.main.messageHistory* | Sets whether message history is enabled or not. Default is false. | false | boolean +| *camel.main.messageSizeEnabled* | Sets whether message size observation is enabled (default is false). When enabled, Camel will compute the size of incoming message body and headers (in bytes) per route and make this available via JMX MBeans (min/max/mean body size and headers size). | false | boolean | *camel.main.modeline* | Whether to support JBang style //DEPS to specify additional dependencies when running Camel JBang | false | boolean | *camel.main.name* | Sets the name of the CamelContext. | | String | *camel.main.producerTemplateCacheSize* | Producer template endpoints cache size. | 1000 | int diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 84793d6100f3..6ecd82204c1b 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -254,6 +254,9 @@ public final class DefaultConfigurationConfigurer { } } + // message size + camelContext.setMessageSize(config.isMessageSizeEnabled()); + if ("default".equals(config.getUuidGenerator())) { camelContext.setUuidGenerator(new DefaultUuidGenerator()); } else if ("short".equals(config.getUuidGenerator())) { diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java index b341db33d56f..897106b33263 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java @@ -85,6 +85,7 @@ public abstract class DefaultConfigurationProperties<T> { private int streamCachingBufferSize; private boolean streamCachingRemoveSpoolDirectoryWhenStopping = true; private boolean streamCachingStatisticsEnabled; + private boolean messageSizeEnabled; private boolean typeConverterStatisticsEnabled; private boolean tracing; private boolean tracingStandby; @@ -711,6 +712,20 @@ public abstract class DefaultConfigurationProperties<T> { this.streamCachingStatisticsEnabled = streamCachingStatisticsEnabled; } + public boolean isMessageSizeEnabled() { + return messageSizeEnabled; + } + + /** + * Sets whether message size observation is enabled (default is false). + * + * When enabled, Camel will compute the size of incoming message body and headers (in bytes) per route and make this + * available via JMX MBeans (min/max/mean body size and headers size). + */ + public void setMessageSizeEnabled(boolean messageSizeEnabled) { + this.messageSizeEnabled = messageSizeEnabled; + } + public boolean isTypeConverterStatisticsEnabled() { return typeConverterStatisticsEnabled; } @@ -2186,6 +2201,17 @@ public abstract class DefaultConfigurationProperties<T> { return (T) this; } + /** + * Sets whether message size observation is enabled (default is false). + * + * When enabled, Camel will compute the size of incoming message body and headers (in bytes) per route and make this + * available via JMX MBeans (min/max/mean body size and headers size). + */ + public T withMessageSizeEnabled(boolean messageSizeEnabled) { + this.messageSizeEnabled = messageSizeEnabled; + return (T) this; + } + /** * Sets whether type converter statistics is enabled. * diff --git a/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java index dd6b6b5643f5..27c753c6822b 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/ProfileConfigurer.java @@ -99,6 +99,7 @@ public class ProfileConfigurer { config.setLoadStatisticsEnabled(true); config.setMessageHistory(true); config.setInflightRepositoryBrowseEnabled(true); + config.setMessageSizeEnabled(true); config.setEndpointRuntimeStatisticsEnabled(true); config.setJmxManagementStatisticsLevel(ManagementStatisticsLevel.Extended); config.setJmxUpdateRouteEnabled(true); diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java index b7b1f83d72ad..a76fcad296b4 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java @@ -90,6 +90,24 @@ public interface ManagedPerformanceCounterMBean extends ManagedCounterMBean { @ManagedAttribute(description = "First Exchange Failed ExchangeId") String getFirstExchangeFailureExchangeId(); + @ManagedAttribute(description = "Min Body Size [bytes]") + long getMinBodySize(); + + @ManagedAttribute(description = "Max Body Size [bytes]") + long getMaxBodySize(); + + @ManagedAttribute(description = "Mean Body Size [bytes]") + long getMeanBodySize(); + + @ManagedAttribute(description = "Min Headers Size [bytes]") + long getMinHeadersSize(); + + @ManagedAttribute(description = "Max Headers Size [bytes]") + long getMaxHeadersSize(); + + @ManagedAttribute(description = "Mean Headers Size [bytes]") + long getMeanHeadersSize(); + @ManagedAttribute(description = "Statistics enabled") boolean isStatisticsEnabled(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java index f60378d8a5f6..262b5404a73c 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java @@ -21,6 +21,7 @@ import java.util.Date; import java.util.Map; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedPerformanceCounterMBean; import org.apache.camel.management.PerformanceCounter; @@ -55,6 +56,16 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter private String lastExchangeCompletedExchangeId; private Statistic lastExchangeFailureTimestamp; private String lastExchangeFailureExchangeId; + private Statistic minBodySize; + private Statistic maxBodySize; + private Statistic totalBodySize; + private Statistic meanBodySize; + private Statistic minHeadersSize; + private Statistic maxHeadersSize; + private Statistic totalHeadersSize; + private Statistic meanHeadersSize; + private long exchangesWithBodySize; + private long exchangesWithHeadersSize; private boolean statisticsEnabled = true; @Override @@ -80,6 +91,15 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter this.lastExchangeCreatedTimestamp = new StatisticValue(); this.lastExchangeCompletedTimestamp = new StatisticValue(); this.lastExchangeFailureTimestamp = new StatisticValue(); + + this.minBodySize = new StatisticMinimum(); + this.maxBodySize = new StatisticMaximum(); + this.totalBodySize = new StatisticCounter(); + this.meanBodySize = new StatisticValue(); + this.minHeadersSize = new StatisticMinimum(); + this.maxHeadersSize = new StatisticMaximum(); + this.totalHeadersSize = new StatisticCounter(); + this.meanHeadersSize = new StatisticValue(); } @Override @@ -106,6 +126,16 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter lastExchangeCompletedExchangeId = null; lastExchangeFailureTimestamp.reset(); lastExchangeFailureExchangeId = null; + minBodySize.reset(); + maxBodySize.reset(); + totalBodySize.reset(); + meanBodySize.reset(); + minHeadersSize.reset(); + maxHeadersSize.reset(); + totalHeadersSize.reset(); + meanHeadersSize.reset(); + exchangesWithBodySize = 0; + exchangesWithHeadersSize = 0; } @Override @@ -184,6 +214,36 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter return -1; } + @Override + public long getMinBodySize() { + return minBodySize.getValue(); + } + + @Override + public long getMaxBodySize() { + return maxBodySize.getValue(); + } + + @Override + public long getMeanBodySize() { + return meanBodySize.getValue(); + } + + @Override + public long getMinHeadersSize() { + return minHeadersSize.getValue(); + } + + @Override + public long getMaxHeadersSize() { + return maxHeadersSize.getValue(); + } + + @Override + public long getMeanHeadersSize() { + return meanHeadersSize.getValue(); + } + @Override public Date getLastExchangeCreatedTimestamp() { long value = lastExchangeCreatedTimestamp.getValue(); @@ -290,6 +350,28 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter mean = totalProcessingTime.getValue() / completed; } meanProcessingTime.updateValue(mean); + + // update body/headers size stats (values represent the incoming message, set by MessageSizeAdvice before processing) + Long bodySize = exchange.getProperty(ExchangePropertyKey.MESSAGE_BODY_SIZE, Long.class); + if (bodySize != null && bodySize >= 0) { + minBodySize.updateValue(bodySize); + maxBodySize.updateValue(bodySize); + totalBodySize.updateValue(bodySize); + exchangesWithBodySize++; + if (exchangesWithBodySize > 0) { + meanBodySize.updateValue(totalBodySize.getValue() / exchangesWithBodySize); + } + } + Long headersSize = exchange.getProperty(ExchangePropertyKey.MESSAGE_HEADERS_SIZE, Long.class); + if (headersSize != null && headersSize >= 0) { + minHeadersSize.updateValue(headersSize); + maxHeadersSize.updateValue(headersSize); + totalHeadersSize.updateValue(headersSize); + exchangesWithHeadersSize++; + if (exchangesWithHeadersSize > 0) { + meanHeadersSize.updateValue(totalHeadersSize.getValue() / exchangesWithHeadersSize); + } + } } @Override @@ -334,6 +416,16 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter sb.append(String.format(" deltaProcessingTime=\"%s\"", deltaProcessingTime.getValue())); sb.append(String.format(" meanProcessingTime=\"%s\"", meanProcessingTime.getValue())); sb.append(String.format(" idleSince=\"%s\"", getIdleSince())); + if (minBodySize.isUpdated()) { + sb.append(String.format(" minBodySize=\"%s\"", minBodySize.getValue())); + sb.append(String.format(" maxBodySize=\"%s\"", maxBodySize.getValue())); + sb.append(String.format(" meanBodySize=\"%s\"", meanBodySize.getValue())); + } + if (minHeadersSize.isUpdated()) { + sb.append(String.format(" minHeadersSize=\"%s\"", minHeadersSize.getValue())); + sb.append(String.format(" maxHeadersSize=\"%s\"", maxHeadersSize.getValue())); + sb.append(String.format(" meanHeadersSize=\"%s\"", meanHeadersSize.getValue())); + } if (fullStats) { sb.append(String.format(" startTimestamp=\"%s\"", dateAsString(startTimestamp.getTime()))); @@ -379,6 +471,16 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter jo.put("deltaProcessingTime", deltaProcessingTime.getValue()); jo.put("meanProcessingTime", meanProcessingTime.getValue()); jo.put("idleSince", getIdleSince()); + if (minBodySize.isUpdated()) { + jo.put("minBodySize", minBodySize.getValue()); + jo.put("maxBodySize", maxBodySize.getValue()); + jo.put("meanBodySize", meanBodySize.getValue()); + } + if (minHeadersSize.isUpdated()) { + jo.put("minHeadersSize", minHeadersSize.getValue()); + jo.put("maxHeadersSize", maxHeadersSize.getValue()); + jo.put("meanHeadersSize", meanHeadersSize.getValue()); + } if (fullStats) { jo.put("startTimestamp", startTimestamp.getTime()); jo.put("resetTimestamp", resetTimestamp.getTime()); diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java new file mode 100644 index 000000000000..47d599c13d6a --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedMessageSizeTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.nio.charset.StandardCharsets; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_ROUTE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisabledOnOs(OS.AIX) +public class ManagedMessageSizeTest extends ManagementTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.setMessageSize(true); + return context; + } + + @Test + public void testMessageSize() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(2); + + String body1 = "Hello World"; + template.sendBodyAndHeader("direct:start", body1, "myHeader", "myValue"); + + String body2 = "Bye World"; + template.sendBodyAndHeader("direct:start", body2, "myHeader", "myValue"); + + assertMockEndpointsSatisfied(); + + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = getCamelObjectName(TYPE_ROUTE, "route1"); + + long minBody = (Long) mbeanServer.getAttribute(on, "MinBodySize"); + long maxBody = (Long) mbeanServer.getAttribute(on, "MaxBodySize"); + long meanBody = (Long) mbeanServer.getAttribute(on, "MeanBodySize"); + + long body1Size = body1.getBytes(StandardCharsets.UTF_8).length; + long body2Size = body2.getBytes(StandardCharsets.UTF_8).length; + long expectedMin = Math.min(body1Size, body2Size); + long expectedMax = Math.max(body1Size, body2Size); + long expectedMean = (body1Size + body2Size) / 2; + + assertEquals(expectedMin, minBody); + assertEquals(expectedMax, maxBody); + assertEquals(expectedMean, meanBody); + + long minHeaders = (Long) mbeanServer.getAttribute(on, "MinHeadersSize"); + long maxHeaders = (Long) mbeanServer.getAttribute(on, "MaxHeadersSize"); + long meanHeaders = (Long) mbeanServer.getAttribute(on, "MeanHeadersSize"); + + assertTrue(minHeaders > 0, "MinHeadersSize should be positive"); + assertTrue(maxHeaders > 0, "MaxHeadersSize should be positive"); + assertTrue(meanHeaders > 0, "MeanHeadersSize should be positive"); + // both messages have the same header, so min == max + assertEquals(minHeaders, maxHeaders); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").routeId("route1") + .to("mock:result"); + } + }; + } +} diff --git a/docs/user-manual/modules/ROOT/nav.adoc b/docs/user-manual/modules/ROOT/nav.adoc index 1d8dd9deb0b0..83ae650487ec 100644 --- a/docs/user-manual/modules/ROOT/nav.adoc +++ b/docs/user-manual/modules/ROOT/nav.adoc @@ -79,6 +79,7 @@ ** xref:components:eips:intercept.adoc[Intercept] ** xref:jmx.adoc[JMX] ** xref:lifecycle.adoc[Camel Lifecycle] +** xref:message-size.adoc[Message Size] ** xref:oncompletion.adoc[OnCompletion] ** xref:pluggable-class-resolvers.adoc[Pluggable Class Resolvers] ** xref:predicate.adoc[Predicates] diff --git a/docs/user-manual/modules/ROOT/pages/jmx.adoc b/docs/user-manual/modules/ROOT/pages/jmx.adoc index 9918110fdf56..4286af4fd78d 100644 --- a/docs/user-manual/modules/ROOT/pages/jmx.adoc +++ b/docs/user-manual/modules/ROOT/pages/jmx.adoc @@ -324,6 +324,20 @@ You can enable load statistics such as from `application.properties`: camel.main.loadStatisticsEnabled = true ---- +=== Message size statistics + +It is possible to include message size statistics for Route MBeans. +These track the min, max, and mean sizes of incoming message bodies and headers. + +You can enable message size statistics from `application.properties`: + +[source,properties] +---- +camel.main.messageSizeEnabled = true +---- + +See xref:message-size.adoc[Message Size] for more details. + == Hiding sensitive information By default, Camel enlists MBeans in JMX such as endpoints configured diff --git a/docs/user-manual/modules/ROOT/pages/message-size.adoc b/docs/user-manual/modules/ROOT/pages/message-size.adoc new file mode 100644 index 000000000000..93e10c2eaaed --- /dev/null +++ b/docs/user-manual/modules/ROOT/pages/message-size.adoc @@ -0,0 +1,133 @@ += Message Size + +Camel can track the size of incoming message payloads (body and headers) during routing. +This is useful for observability — understanding how large messages are, detecting payload bloat, +and monitoring size trends over time. + +Message size tracking is *disabled by default* and must be explicitly enabled. + +== How it works + +When enabled, a `MessageSizeAdvice` runs in the routing pipeline for each processing step. +It runs _after_ xref:stream-caching.adoc[Stream Caching] so that streamed payloads are safely cached +before their size is computed. + +The advice computes the body size and the total headers size of the incoming message using a +pluggable `MessageSizeStrategy`. The computed sizes are stored as exchange properties +(`CamelMessageBodySize` and `CamelMessageHeadersSize`) and are available for the remainder of the route. + +Size statistics (min, max, mean) are collected per route and exposed via JMX. + +== Enabling Message Size + +[tabs] +==== + +Application Properties:: ++ +[source,properties] +---- +camel.main.messageSizeEnabled = true +---- + +Java:: ++ +[source,java] +---- +CamelContext context = ... +context.setMessageSize(true); +---- + +==== + +TIP: You can run Camel JBang: `camel doc main --filter=messageSize` from CLI to see message size options. + +== Default size computation + +The default `MessageSizeStrategy` computes body size using type-specific logic. +If the body type is not recognized, it falls back to the `Content-Length` header (useful for HTTP components +where the body is a stream). If size cannot be determined, it returns -1 and the exchange is not included +in the size statistics. + +=== Body size + +[width="100%",cols="1m,3",options="header"] +|=== +| Body type | Size computation +| null | 0 +| byte[] | Array length +| String | UTF-8 encoded byte length +| StreamCache | `length()` method +| WrappedFile | `getFileLength()` method (covers file, FTP, and similar components) +| File | `length()` method +| Path | `Files.size()` +| Content-Length header | Fallback: uses the `Content-Length` message header value +| Other | -1 (unknown, not tracked) +|=== + +=== Headers size + +The total headers size is computed by iterating all message headers and summing: + +* Each header key's UTF-8 byte length +* Each header value's `toString()` UTF-8 byte length + +Returns 0 if there are no headers. + +== JMX statistics + +When JMX is enabled, the following attributes are available on route MBeans +(under `org.apache.camel` domain, type `routes`): + +[width="100%",cols="1m,3",options="header"] +|=== +| Attribute | Description +| MinBodySize | Smallest body size observed (bytes) +| MaxBodySize | Largest body size observed (bytes) +| MeanBodySize | Average body size observed (bytes) +| MinHeadersSize | Smallest total headers size observed (bytes) +| MaxHeadersSize | Largest total headers size observed (bytes) +| MeanHeadersSize | Average total headers size observed (bytes) +|=== + +These statistics are also included in `dumpStatsAsXml` and `dumpStatsAsJson` output +when message size tracking is active. + +The statistics are reset when the route statistics are reset. + +== Exchange properties + +The computed sizes are available as exchange properties during routing. +You can use them for logging, content-based routing, or custom processing: + +[source,java] +---- +from("direct:start") + .log("Body size: ${exchangeProperty.CamelMessageBodySize} bytes") + .choice() + .when(exchangeProperty("CamelMessageBodySize").isGreaterThan(1048576)) + .to("direct:largeMessage") + .otherwise() + .to("direct:normalMessage"); +---- + +[width="100%",cols="1m,3",options="header"] +|=== +| Property | Description +| CamelMessageBodySize | Body size in bytes, or -1 if unknown +| CamelMessageHeadersSize | Total headers size in bytes +|=== + +== Custom strategy + +You can provide a custom `MessageSizeStrategy` implementation for specialized sizing logic +(for example, to handle application-specific body types): + +[source,java] +---- +CamelContext context = ... +context.setMessageSizeStrategy(new MyCustomMessageSizeStrategy()); +context.setMessageSize(true); +---- + +The strategy must implement `org.apache.camel.spi.MessageSizeStrategy`.
