This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 95790be3d45867af4c64f42fb9ac518580e150a4 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Feb 24 16:08:21 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/spi/ExchangeFactoryManager.java | 12 +- .../camel/impl/engine/AbstractCamelContext.java | 2 +- .../impl/engine/DefaultExchangeFactoryManager.java | 84 ++++++++++++- .../api/management/mbean/CamelOpenMBeanTypes.java | 8 +- .../mbean/ManagedExchangeFactoryManagerMBean.java | 22 +++- .../mbean/ManagedExchangeFactoryManager.java | 40 ++++++- .../management/ManagedNonManagedServiceTest.java | 2 +- .../management/ManagedPooledExchangeTest.java | 132 +++++++++++++++++++++ ...edProducerRouteAddRemoveRegisterAlwaysTest.java | 2 +- .../management/ManagedRouteAddRemoveTest.java | 2 +- .../src/test/resources/log4j2.properties | 2 +- 11 files changed, 292 insertions(+), 16 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java index a46884a..1145129 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java @@ -48,7 +48,7 @@ public interface ExchangeFactoryManager extends StaticService { /** * Number of consumers currently being managed */ - int getSize(); + int getConsumerCounter(); /** * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. @@ -56,6 +56,11 @@ public interface ExchangeFactoryManager extends StaticService { int getCapacity(); /** + * Number of currently exchanges being pooled (if pooled is in use) + */ + int getPooledCounter(); + + /** * Whether statistics is enabled. */ boolean isStatisticsEnabled(); @@ -75,4 +80,9 @@ public interface ExchangeFactoryManager extends StaticService { */ void purge(); + /** + * Aggregated statistics for all the managed exchange factories + */ + ExchangeFactory.Statistics getStatistics(); + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 305bfa9..c706c88 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -266,7 +266,7 @@ public abstract class AbstractCamelContext extends BaseService private volatile String version; private volatile PropertiesComponent propertiesComponent; private volatile CamelContextNameStrategy nameStrategy; - private volatile ExchangeFactoryManager exchangeFactoryManager = new DefaultExchangeFactoryManager(); + private volatile ExchangeFactoryManager exchangeFactoryManager; private volatile ExchangeFactory exchangeFactory; private volatile ReactiveExecutor reactiveExecutor; private volatile ManagementNameStrategy managementNameStrategy; diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java index 7b5ffc1..5d1b0c1 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java @@ -30,8 +30,9 @@ import org.apache.camel.support.service.ServiceSupport; public class DefaultExchangeFactoryManager extends ServiceSupport implements ExchangeFactoryManager, CamelContextAware { - private CamelContext camelContext; private final Map<Consumer, ExchangeFactory> factories = new ConcurrentHashMap<>(); + private final UtilizationStatistics statistics = new UtilizationStatistics(); + private CamelContext camelContext; private int capacity; private boolean statisticsEnabled; @@ -62,7 +63,7 @@ public class DefaultExchangeFactoryManager extends ServiceSupport implements Exc } @Override - public int getSize() { + public int getConsumerCounter() { return factories.size(); } @@ -72,6 +73,15 @@ public class DefaultExchangeFactoryManager extends ServiceSupport implements Exc } @Override + public int getPooledCounter() { + int counter = 0; + for (ExchangeFactory ef : factories.values()) { + counter += ef.getSize(); + } + return counter; + } + + @Override public boolean isStatisticsEnabled() { return statisticsEnabled; } @@ -95,6 +105,76 @@ public class DefaultExchangeFactoryManager extends ServiceSupport implements Exc } @Override + public ExchangeFactory.Statistics getStatistics() { + return statistics; + } + + /** + * Represents utilization statistics + */ + final class UtilizationStatistics implements ExchangeFactory.Statistics { + + @Override + public void reset() { + resetStatistics(); + } + + @Override + public long getCreatedCounter() { + long answer = 0; + if (statisticsEnabled) { + for (ExchangeFactory ef : factories.values()) { + answer += ef.getStatistics().getCreatedCounter(); + } + } + return answer; + } + + @Override + public long getAcquiredCounter() { + long answer = 0; + if (statisticsEnabled) { + for (ExchangeFactory ef : factories.values()) { + answer += ef.getStatistics().getAcquiredCounter(); + } + } + return answer; + } + + @Override + public long getReleasedCounter() { + long answer = 0; + if (statisticsEnabled) { + for (ExchangeFactory ef : factories.values()) { + answer += ef.getStatistics().getReleasedCounter(); + } + } + return answer; + } + + @Override + public long getDiscardedCounter() { + long answer = 0; + if (statisticsEnabled) { + for (ExchangeFactory ef : factories.values()) { + answer += ef.getStatistics().getDiscardedCounter(); + } + } + return answer; + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + throw new UnsupportedOperationException(); + } + } + + @Override protected void doShutdown() throws Exception { factories.clear(); } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index 96fbcd8..e332344 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -73,9 +73,11 @@ public final class CamelOpenMBeanTypes { public static CompositeType listExchangeFactoryCompositeType() throws OpenDataException { return new CompositeType( "factories", "Factories", - new String[] { "url", "capacity", "pooled", "created", "released" }, - new String[] { "Url", "Capacity", "Pooled", "Created", "Released" }, - new OpenType[] { SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG }); + new String[] { "url", "capacity", "pooled", "created", "acquired", "released", "discarded" }, + new String[] { "Url", "Capacity", "Pooled", "Created", "Acquired", "Released", "Discarded" }, + new OpenType[] { + SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG }); } public static TabularType listRuntimeEndpointsTabularType() throws OpenDataException { diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java index bedebc9..f2d57fd 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java @@ -23,6 +23,9 @@ import org.apache.camel.api.management.ManagedOperation; public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean { + @ManagedAttribute(description = "Number of consumers managed") + Integer getConsumerCounter(); + @ManagedAttribute(description = "Max capacity per consumer for exchange pooling") Integer getCapacity(); @@ -38,7 +41,22 @@ public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean @ManagedOperation(description = "Purges the pool") void purge(); - @ManagedOperation(description = "Lists all the consumers and their pooling statistics") - TabularData listPools(); + @ManagedAttribute(description = "Total number of currently pooled exchanges (if pooling is in use)") + Integer getTotalPooled(); + + @ManagedAttribute(description = "Total number of new exchanges created") + Long getTotalCreated(); + + @ManagedAttribute(description = "Total number of exchanges reused (if pooling is in use)") + Long getTotalAcquired(); + + @ManagedAttribute(description = "Total number of exchanges released back to the pool") + Long getTotalReleased(); + + @ManagedAttribute(description = "Total number of exchanges discarded (such as when capacity is full)") + Long getTotalDiscarded(); + + @ManagedOperation(description = "Lists all the statistics in tabular form") + TabularData listStatistics(); } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java index bab7a0d..dea0a7c 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java @@ -52,6 +52,16 @@ public class ManagedExchangeFactoryManager extends ManagedService implements Man } @Override + public Integer getConsumerCounter() { + return exchangeFactoryManager.getConsumerCounter(); + } + + @Override + public Integer getTotalPooled() { + return exchangeFactoryManager.getPooledCounter(); + } + + @Override public Integer getCapacity() { return exchangeFactoryManager.getCapacity(); } @@ -77,7 +87,27 @@ public class ManagedExchangeFactoryManager extends ManagedService implements Man } @Override - public TabularData listPools() { + public Long getTotalCreated() { + return exchangeFactoryManager.getStatistics().getCreatedCounter(); + } + + @Override + public Long getTotalAcquired() { + return exchangeFactoryManager.getStatistics().getAcquiredCounter(); + } + + @Override + public Long getTotalReleased() { + return exchangeFactoryManager.getStatistics().getReleasedCounter(); + } + + @Override + public Long getTotalDiscarded() { + return exchangeFactoryManager.getStatistics().getDiscardedCounter(); + } + + @Override + public TabularData listStatistics() { try { TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listExchangeFactoryTabularType()); Collection<ExchangeFactory> factories = exchangeFactoryManager.getExchangeFactories(); @@ -91,15 +121,19 @@ public class ManagedExchangeFactoryManager extends ManagedService implements Man int capacity = ef.getCapacity(); int size = ef.getSize(); long created = 0; + long acquired = 0; long released = 0; + long discarded = 0; if (ef.isStatisticsEnabled()) { created = ef.getStatistics().getCreatedCounter(); + acquired = ef.getStatistics().getAcquiredCounter(); released = ef.getStatistics().getReleasedCounter(); + discarded = ef.getStatistics().getDiscardedCounter(); } CompositeData data = new CompositeDataSupport( - ct, new String[] { "url", "capacity", "pooled", "created", "released" }, - new Object[] { url, capacity, size, created, released }); + ct, new String[] { "url", "capacity", "pooled", "created", "acquired", "released", "discarded" }, + new Object[] { url, capacity, size, created, acquired, released, discarded }); answer.put(data); } return answer; diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java index 0779481..87948df 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java @@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class ManagedNonManagedServiceTest extends ManagementTestSupport { - private static final int SERVICES = 11; + private static final int SERVICES = 12; @Test public void testService() throws Exception { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java new file mode 100644 index 0000000..163c7bc --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.Processor; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.engine.PooledExchangeFactory; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; + +public class ManagedPooledExchangeTest extends ManagementTestSupport { + + private final AtomicInteger counter = new AtomicInteger(); + private final AtomicReference<Exchange> ref = new AtomicReference<>(); + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + PooledExchangeFactory pef = new PooledExchangeFactory(); + pef.setStatisticsEnabled(true); + pef.setCapacity(123); + context.adapt(ExtendedCamelContext.class).setExchangeFactory(pef); + + return context; + } + + @Test + public void testSameExchange() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(3); + mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5); + mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6); + mock.message(0).header("first").isEqualTo(true); + mock.message(1).header("first").isNull(); + mock.message(2).header("first").isNull(); + + context.getRouteController().startAllRoutes(); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on + = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultExchangeFactoryManager"); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + Integer con = (Integer) mbeanServer.getAttribute(on, "ConsumerCounter"); + assertEquals(1, con.intValue()); + + Integer cap = (Integer) mbeanServer.getAttribute(on, "Capacity"); + assertEquals(123, cap.intValue()); + + // also only 1 exchange pooled + con = (Integer) mbeanServer.getAttribute(on, "TotalPooled"); + assertEquals(1, con.intValue()); + + Long num = (Long) mbeanServer.getAttribute(on, "TotalCreated"); + assertEquals(1, num.intValue()); + + num = (Long) mbeanServer.getAttribute(on, "TotalAcquired"); + assertEquals(2, num.intValue()); + + num = (Long) mbeanServer.getAttribute(on, "TotalReleased"); + assertEquals(3, num.intValue()); + + num = (Long) mbeanServer.getAttribute(on, "TotalDiscarded"); + assertEquals(0, num.intValue()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:foo?period=1&delay=1&repeatCount=3").noAutoStartup() + .setProperty("myprop", counter::incrementAndGet) + .setHeader("myheader", counter::incrementAndGet) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // should be same exchange instance as its pooled + Exchange old = ref.get(); + if (old == null) { + ref.set(exchange); + exchange.getMessage().setHeader("first", true); + } else { + assertSame(old, exchange); + } + } + }) + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java index c88836f..faa9bc7 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java @@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementTestSupport { - private static final int SERVICES = 11; + private static final int SERVICES = 12; @Override protected CamelContext createCamelContext() throws Exception { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java index bfef164..b589ecd 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java @@ -36,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class ManagedRouteAddRemoveTest extends ManagementTestSupport { - private static final int SERVICES = 11; + private static final int SERVICES = 12; @Override protected RouteBuilder createRouteBuilder() throws Exception { diff --git a/core/camel-management/src/test/resources/log4j2.properties b/core/camel-management/src/test/resources/log4j2.properties index 3f1dcba..2e68815 100644 --- a/core/camel-management/src/test/resources/log4j2.properties +++ b/core/camel-management/src/test/resources/log4j2.properties @@ -46,4 +46,4 @@ rootLogger.appenderRef.file.ref = file #rootLogger.appenderRef.console.ref = console #logger.camel-core.name = org.apache.camel -#logger.camel-core.level = TRACE +#logger.camel-core.level = DEBUG