This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4e945fa44153ca970d4f3d0782bfee3b764c5973 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jan 23 08:02:00 2020 +0100 CAMEL-14426: camel-core - Optimize inflight repository --- .../camel/blueprint/CamelContextFactoryBean.java | 11 +++++++ .../camel/cdi/xml/CamelContextFactoryBean.java | 12 ++++++++ .../camel/spring/CamelContextFactoryBean.java | 16 ++++++++++ .../org/apache/camel/spi/InflightRepository.java | 16 ++++++++++ .../impl/engine/DefaultInflightRepository.java | 34 ++++++++++++++++++++-- .../core/xml/AbstractCamelContextFactoryBean.java | 5 ++++ .../InflightRepositoryBrowseFromRouteTest.java | 8 +++++ .../camel/impl/InflightRepositoryBrowseTest.java | 8 +++++ .../camel/main/DefaultConfigurationConfigurer.java | 2 ++ .../camel/main/DefaultConfigurationProperties.java | 28 ++++++++++++++++++ .../camel-main-configuration-metadata.json | 8 ++++- .../mbean/ManagedInflightRepositoryMBean.java | 3 ++ .../mbean/ManagedInflightRepository.java | 5 ++++ .../management/ManagedInflightRepositoryTest.java | 3 ++ .../management/ManagedInflightStatisticsTest.java | 8 +++++ 15 files changed, 163 insertions(+), 4 deletions(-) diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java index 12f04c3..4db65ae 100644 --- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java +++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java @@ -140,6 +140,8 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Blu @XmlAttribute private Boolean typeConverterStatisticsEnabled; @XmlAttribute + private Boolean inflightRepositoryExchangeEnabled; + @XmlAttribute private TypeConverterExists typeConverterExists; @XmlAttribute private LoggingLevel typeConverterExistsLoggingLevel; @@ -494,6 +496,15 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Blu } @Override + public Boolean getInflightRepositoryExchangeEnabled() { + return inflightRepositoryExchangeEnabled; + } + + public void setInflightRepositoryExchangeEnabled(Boolean inflightRepositoryExchangeEnabled) { + this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled; + } + + @Override public TypeConverterExists getTypeConverterExists() { return typeConverterExists; } diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java index f271c76..797bc8b 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/xml/CamelContextFactoryBean.java @@ -147,6 +147,9 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Def private Boolean typeConverterStatisticsEnabled; @XmlAttribute + private Boolean inflightRepositoryExchangeEnabled; + + @XmlAttribute private TypeConverterExists typeConverterExists; @XmlAttribute @@ -757,6 +760,15 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Def } @Override + public Boolean getInflightRepositoryExchangeEnabled() { + return inflightRepositoryExchangeEnabled; + } + + public void setInflightRepositoryExchangeEnabled(Boolean inflightRepositoryExchangeEnabled) { + this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled; + } + + @Override public TypeConverterExists getTypeConverterExists() { return typeConverterExists; } diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java index f82068f..7423b80 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java @@ -150,6 +150,8 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr private Boolean loadTypeConverters; @XmlAttribute private Boolean typeConverterStatisticsEnabled; + @XmlAttribute + private Boolean inflightRepositoryExchangeEnabled; @XmlAttribute @Metadata(defaultValue = "Override") private TypeConverterExists typeConverterExists; @XmlAttribute @Metadata(defaultValue = "WARN") @@ -920,6 +922,20 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr } @Override + public Boolean getInflightRepositoryExchangeEnabled() { + return inflightRepositoryExchangeEnabled; + } + + /** + * Sets whether the inflight repository should track each inflight exchange. + * + * This is by default disabled as there is a very slight performance overhead when enabled. + */ + public void setInflightRepositoryExchangeEnabled(Boolean inflightRepositoryExchangeEnabled) { + this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled; + } + + @Override public String getManagementNamePattern() { return managementNamePattern; } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java index 2c0f5ae..eb678ea 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java @@ -135,6 +135,22 @@ public interface InflightRepository extends StaticService { int size(String routeId); /** + * Whether tracking inflight exchanges is enabled. This is required to be enabled for the browse and oldest APIs to function. + * + * This is by default disabled as there is a very slight performance overhead when enabled. + */ + boolean isInflightExchangeEnabled(); + + /** + * Whether tracking inflight exchanges is enabled. This is required to be enabled for the browse and oldest APIs to function. + * + * This is by default disabled as there is a very slight performance overhead when enabled. + * + * @param inflightExchangeEnabled whether tracking inflight exchanges is enabled + */ + void setInflightExchangeEnabled(boolean inflightExchangeEnabled); + + /** * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight. */ Collection<InflightExchange> browse(); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java index 4215e64..3ab887f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java @@ -44,17 +44,27 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); + private final AtomicInteger size = new AtomicInteger(); private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<>(); private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<>(); + private boolean inflightExchangeEnabled; @Override public void add(Exchange exchange) { - inflight.put(exchange.getExchangeId(), exchange); + size.incrementAndGet(); + + if (inflightExchangeEnabled) { + inflight.put(exchange.getExchangeId(), exchange); + } } @Override public void remove(Exchange exchange) { - inflight.remove(exchange.getExchangeId()); + size.decrementAndGet(); + + if (inflightExchangeEnabled) { + inflight.remove(exchange.getExchangeId()); + } } @Override @@ -75,7 +85,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override public int size() { - return inflight.size(); + return size.get(); } @Override @@ -95,6 +105,16 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh } @Override + public boolean isInflightExchangeEnabled() { + return inflightExchangeEnabled; + } + + @Override + public void setInflightExchangeEnabled(boolean inflightExchangeEnabled) { + this.inflightExchangeEnabled = inflightExchangeEnabled; + } + + @Override public Collection<InflightExchange> browse() { return browse(null, -1, false); } @@ -111,6 +131,10 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override public Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration) { + if (!inflightExchangeEnabled) { + return Collections.emptyList(); + } + Stream<Exchange> values; if (fromRouteId == null) { // all values @@ -144,6 +168,10 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override public InflightExchange oldest(String fromRouteId) { + if (!inflightExchangeEnabled) { + return null; + } + Stream<Exchange> values; if (fromRouteId == null) { diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 314ce92..e8165f5 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -824,6 +824,8 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex public abstract Boolean getLoadTypeConverters(); + public abstract Boolean getInflightRepositoryExchangeEnabled(); + public abstract Boolean getTypeConverterStatisticsEnabled(); public abstract LoggingLevel getTypeConverterExistsLoggingLevel(); @@ -961,6 +963,9 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex if (getTypeConverterStatisticsEnabled() != null) { context.setTypeConverterStatisticsEnabled(getTypeConverterStatisticsEnabled()); } + if (getInflightRepositoryExchangeEnabled() != null) { + context.getInflightRepository().setInflightExchangeEnabled(getInflightRepositoryExchangeEnabled()); + } if (getTypeConverterExists() != null) { context.getTypeConverterRegistry().setTypeConverterExists(getTypeConverterExists()); } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java index 7af2178..2772200 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseFromRouteTest.java @@ -18,6 +18,7 @@ package org.apache.camel.impl; import java.util.Collection; +import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -27,6 +28,13 @@ import org.junit.Test; public class InflightRepositoryBrowseFromRouteTest extends ContextTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getInflightRepository().setInflightExchangeEnabled(true); + return context; + } + @Test public void testInflight() throws Exception { assertEquals(0, context.getInflightRepository().browse().size()); diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java index 13f15cc..7807fbe 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryBrowseTest.java @@ -18,6 +18,7 @@ package org.apache.camel.impl; import java.util.Collection; +import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -27,6 +28,13 @@ import org.junit.Test; public class InflightRepositoryBrowseTest extends ContextTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getInflightRepository().setInflightExchangeEnabled(true); + return context; + } + @Test public void testInflight() throws Exception { assertEquals(0, context.getInflightRepository().browse().size()); diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 9b21acc..6b25e36 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -107,6 +107,8 @@ public final class DefaultConfigurationConfigurer { camelContext.getShutdownStrategy().setShutdownRoutesInReverseOrder(config.isShutdownRoutesInReverseOrder()); camelContext.getShutdownStrategy().setLogInflightExchangesOnTimeout(config.isShutdownLogInflightExchangesOnTimeout()); + camelContext.getInflightRepository().setInflightExchangeEnabled(config.isInflightRepositoryExchangeEnabled()); + if (config.getLogDebugMaxChars() != 0) { camelContext.getGlobalOptions().put(Exchange.LOG_DEBUG_BODY_MAX_CHARS, "" + config.getLogDebugMaxChars()); } diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java index a91b6c5..e938314 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java @@ -34,6 +34,7 @@ public abstract class DefaultConfigurationProperties<T> { private boolean shutdownNowOnTimeout = true; private boolean shutdownRoutesInReverseOrder = true; private boolean shutdownLogInflightExchangesOnTimeout = true; + private boolean inflightRepositoryExchangeEnabled; private String fileConfigurations; private boolean jmxEnabled = true; private int producerTemplateCacheSize = 1000; @@ -190,11 +191,26 @@ public abstract class DefaultConfigurationProperties<T> { /** * Sets whether to log information about the inflight Exchanges which are still running * during a shutdown which didn't complete without the given timeout. + * + * This requires to enable the option inflightRepositoryExchangeEnabled. */ public void setShutdownLogInflightExchangesOnTimeout(boolean shutdownLogInflightExchangesOnTimeout) { this.shutdownLogInflightExchangesOnTimeout = shutdownLogInflightExchangesOnTimeout; } + public boolean isInflightRepositoryExchangeEnabled() { + return inflightRepositoryExchangeEnabled; + } + + /** + * Sets whether the inflight repository should track each inflight exchange. + * + * This is by default disabled as there is a very slight performance overhead when enabled. + */ + public void setInflightRepositoryExchangeEnabled(boolean inflightRepositoryExchangeEnabled) { + this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled; + } + public String getFileConfigurations() { return fileConfigurations; } @@ -937,6 +953,8 @@ public abstract class DefaultConfigurationProperties<T> { /** * Sets whether to log information about the inflight Exchanges which are still running * during a shutdown which didn't complete without the given timeout. + * + * This requires to enable the option inflightRepositoryExchangeEnabled. */ public T withShutdownLogInflightExchangesOnTimeout(boolean shutdownLogInflightExchangesOnTimeout) { this.shutdownLogInflightExchangesOnTimeout = shutdownLogInflightExchangesOnTimeout; @@ -944,6 +962,16 @@ public abstract class DefaultConfigurationProperties<T> { } /** + * Sets whether the inflight repository should track each inflight exchange. + * + * This is by default disabled as there is a very slight performance overhead when enabled. + */ + public T withInflightRepositoryExchangeEnabled(boolean inflightRepositoryExchangeEnabled) { + this.inflightRepositoryExchangeEnabled = inflightRepositoryExchangeEnabled; + return (T) this; + } + + /** * Directory to load additional configuration files that contains * configuration values that takes precedence over any other configuration. * This can be used to refer to files that may have secret configuration that diff --git a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json index 29d8e30..3637664 100644 --- a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json +++ b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json @@ -176,6 +176,12 @@ "defaultValue":true }, { + "name":"camel.main.inflight-repository-exchange-enabled", + "type":"boolean", + "sourceType":"org.apache.camel.main.DefaultConfigurationProperties", + "description":"Sets whether the inflight repository should track each inflight exchange. This is by default disabled as there is a very slight performance overhead when enabled." + }, + { "name":"camel.main.java-routes-exclude-pattern", "type":"java.lang.String", "sourceType":"org.apache.camel.main.DefaultConfigurationProperties", @@ -293,7 +299,7 @@ "name":"camel.main.shutdown-log-inflight-exchanges-on-timeout", "type":"boolean", "sourceType":"org.apache.camel.main.DefaultConfigurationProperties", - "description":"Sets whether to log information about the inflight Exchanges which are still running during a shutdown which didn't complete without the given timeout.", + "description":"Sets whether to log information about the inflight Exchanges which are still running during a shutdown which didn't complete without the given timeout. This requires to enable the option inflightRepositoryExchangeEnabled.", "defaultValue":true }, { diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java index f3845d7..260cec1 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java @@ -26,6 +26,9 @@ public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean { @ManagedAttribute(description = "Current size of inflight exchanges.") int getSize(); + @ManagedAttribute(description = "Whether tracking inflight exchanges is enabled. This is required to be enabled for the browse operations to function.") + boolean isInflightExchangeEnabled(); + @ManagedOperation(description = "Current size of inflight exchanges which are from the given route.") int size(String routeId); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java index 7afa7c8..2891cd3 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java @@ -54,6 +54,11 @@ public class ManagedInflightRepository extends ManagedService implements Managed } @Override + public boolean isInflightExchangeEnabled() { + return inflightRepository.isInflightExchangeEnabled(); + } + + @Override public int size(String routeId) { return inflightRepository.size(routeId); } diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java index cfaa7f8..b0166b8 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightRepositoryTest.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.TabularData; +import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.junit.Test; @@ -44,6 +45,8 @@ public class ManagedInflightRepositoryTest extends ManagementTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { + context.getInflightRepository().setInflightExchangeEnabled(true); + from("direct:start").routeId("foo") .to("mock:a") .process(exchange -> { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java index 927742b..bae32e9 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; @@ -32,6 +33,13 @@ import static org.awaitility.Awaitility.await; public class ManagedInflightStatisticsTest extends ManagementTestSupport { + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getInflightRepository().setInflightExchangeEnabled(true); + return context; + } + @Test public void testOldestInflight() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them)