This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 95790be3d45867af4c64f42fb9ac518580e150a4
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 24 16:08:21 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/spi/ExchangeFactoryManager.java   |  12 +-
 .../camel/impl/engine/AbstractCamelContext.java    |   2 +-
 .../impl/engine/DefaultExchangeFactoryManager.java |  84 ++++++++++++-
 .../api/management/mbean/CamelOpenMBeanTypes.java  |   8 +-
 .../mbean/ManagedExchangeFactoryManagerMBean.java  |  22 +++-
 .../mbean/ManagedExchangeFactoryManager.java       |  40 ++++++-
 .../management/ManagedNonManagedServiceTest.java   |   2 +-
 .../management/ManagedPooledExchangeTest.java      | 132 +++++++++++++++++++++
 ...edProducerRouteAddRemoveRegisterAlwaysTest.java |   2 +-
 .../management/ManagedRouteAddRemoveTest.java      |   2 +-
 .../src/test/resources/log4j2.properties           |   2 +-
 11 files changed, 292 insertions(+), 16 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
index a46884a..1145129 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
@@ -48,7 +48,7 @@ public interface ExchangeFactoryManager extends StaticService 
{
     /**
      * Number of consumers currently being managed
      */
-    int getSize();
+    int getConsumerCounter();
 
     /**
      * The capacity the pool (for each consumer) uses for storing exchanges. 
The default capacity is 100.
@@ -56,6 +56,11 @@ public interface ExchangeFactoryManager extends 
StaticService {
     int getCapacity();
 
     /**
+     * Number of exchanges currently being pooled (if pooling is in use)
+     */
+    int getPooledCounter();
+
+    /**
      * Whether statistics is enabled.
      */
     boolean isStatisticsEnabled();
@@ -75,4 +80,9 @@ public interface ExchangeFactoryManager extends StaticService 
{
      */
     void purge();
 
+    /**
+     * Aggregated statistics for all the managed exchange factories
+     */
+    ExchangeFactory.Statistics getStatistics();
+
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 305bfa9..c706c88 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -266,7 +266,7 @@ public abstract class AbstractCamelContext extends 
BaseService
     private volatile String version;
     private volatile PropertiesComponent propertiesComponent;
     private volatile CamelContextNameStrategy nameStrategy;
-    private volatile ExchangeFactoryManager exchangeFactoryManager = new 
DefaultExchangeFactoryManager();
+    private volatile ExchangeFactoryManager exchangeFactoryManager;
     private volatile ExchangeFactory exchangeFactory;
     private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
index 7b5ffc1..5d1b0c1 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
@@ -30,8 +30,9 @@ import org.apache.camel.support.service.ServiceSupport;
 
 public class DefaultExchangeFactoryManager extends ServiceSupport implements 
ExchangeFactoryManager, CamelContextAware {
 
-    private CamelContext camelContext;
     private final Map<Consumer, ExchangeFactory> factories = new 
ConcurrentHashMap<>();
+    private final UtilizationStatistics statistics = new 
UtilizationStatistics();
+    private CamelContext camelContext;
     private int capacity;
     private boolean statisticsEnabled;
 
@@ -62,7 +63,7 @@ public class DefaultExchangeFactoryManager extends 
ServiceSupport implements Exc
     }
 
     @Override
-    public int getSize() {
+    public int getConsumerCounter() {
         return factories.size();
     }
 
@@ -72,6 +73,15 @@ public class DefaultExchangeFactoryManager extends 
ServiceSupport implements Exc
     }
 
     @Override
+    public int getPooledCounter() {
+        int counter = 0;
+        for (ExchangeFactory ef : factories.values()) {
+            counter += ef.getSize();
+        }
+        return counter;
+    }
+
+    @Override
     public boolean isStatisticsEnabled() {
         return statisticsEnabled;
     }
@@ -95,6 +105,76 @@ public class DefaultExchangeFactoryManager extends 
ServiceSupport implements Exc
     }
 
     @Override
+    public ExchangeFactory.Statistics getStatistics() {
+        return statistics;
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    final class UtilizationStatistics implements ExchangeFactory.Statistics {
+
+        @Override
+        public void reset() {
+            resetStatistics();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getCreatedCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getAcquiredCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getReleasedCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            long answer = 0;
+            if (statisticsEnabled) {
+                for (ExchangeFactory ef : factories.values()) {
+                    answer += ef.getStatistics().getDiscardedCounter();
+                }
+            }
+            return answer;
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
     protected void doShutdown() throws Exception {
         factories.clear();
     }
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 96fbcd8..e332344 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -73,9 +73,11 @@ public final class CamelOpenMBeanTypes {
     public static CompositeType listExchangeFactoryCompositeType() throws 
OpenDataException {
         return new CompositeType(
                 "factories", "Factories",
-                new String[] { "url", "capacity", "pooled", "created", 
"released" },
-                new String[] { "Url", "Capacity", "Pooled", "Created", 
"Released" },
-                new OpenType[] { SimpleType.STRING, SimpleType.INTEGER, 
SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG });
+                new String[] { "url", "capacity", "pooled", "created", 
"acquired", "released", "discarded" },
+                new String[] { "Url", "Capacity", "Pooled", "Created", 
"Acquired", "Released", "Discarded" },
+                new OpenType[] {
+                        SimpleType.STRING, SimpleType.INTEGER, 
SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG,
+                        SimpleType.LONG, SimpleType.LONG });
     }
 
     public static TabularType listRuntimeEndpointsTabularType() throws 
OpenDataException {
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
index bedebc9..f2d57fd 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
@@ -23,6 +23,9 @@ import org.apache.camel.api.management.ManagedOperation;
 
 public interface ManagedExchangeFactoryManagerMBean extends 
ManagedServiceMBean {
 
+    @ManagedAttribute(description = "Number of consumers managed")
+    Integer getConsumerCounter();
+
     @ManagedAttribute(description = "Max capacity per consumer for exchange 
pooling")
     Integer getCapacity();
 
@@ -38,7 +41,22 @@ public interface ManagedExchangeFactoryManagerMBean extends 
ManagedServiceMBean
     @ManagedOperation(description = "Purges the pool")
     void purge();
 
-    @ManagedOperation(description = "Lists all the consumers and their pooling 
statistics")
-    TabularData listPools();
+    @ManagedAttribute(description = "Total number of currently pooled 
exchanges (if pooling is in use)")
+    Integer getTotalPooled();
+
+    @ManagedAttribute(description = "Total number of new exchanges created")
+    Long getTotalCreated();
+
+    @ManagedAttribute(description = "Total number of exchanges reused (if 
pooling is in use)")
+    Long getTotalAcquired();
+
+    @ManagedAttribute(description = "Total number of exchanges released back 
to the pool")
+    Long getTotalReleased();
+
+    @ManagedAttribute(description = "Total number of exchanges discarded (such 
as when capacity is full)")
+    Long getTotalDiscarded();
+
+    @ManagedOperation(description = "Lists all the statistics in tabular form")
+    TabularData listStatistics();
 
 }
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
index bab7a0d..dea0a7c 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
@@ -52,6 +52,16 @@ public class ManagedExchangeFactoryManager extends 
ManagedService implements Man
     }
 
     @Override
+    public Integer getConsumerCounter() {
+        return exchangeFactoryManager.getConsumerCounter();
+    }
+
+    @Override
+    public Integer getTotalPooled() {
+        return exchangeFactoryManager.getPooledCounter();
+    }
+
+    @Override
     public Integer getCapacity() {
         return exchangeFactoryManager.getCapacity();
     }
@@ -77,7 +87,27 @@ public class ManagedExchangeFactoryManager extends 
ManagedService implements Man
     }
 
     @Override
-    public TabularData listPools() {
+    public Long getTotalCreated() {
+        return exchangeFactoryManager.getStatistics().getCreatedCounter();
+    }
+
+    @Override
+    public Long getTotalAcquired() {
+        return exchangeFactoryManager.getStatistics().getAcquiredCounter();
+    }
+
+    @Override
+    public Long getTotalReleased() {
+        return exchangeFactoryManager.getStatistics().getReleasedCounter();
+    }
+
+    @Override
+    public Long getTotalDiscarded() {
+        return exchangeFactoryManager.getStatistics().getDiscardedCounter();
+    }
+
+    @Override
+    public TabularData listStatistics() {
         try {
             TabularData answer = new 
TabularDataSupport(CamelOpenMBeanTypes.listExchangeFactoryTabularType());
             Collection<ExchangeFactory> factories = 
exchangeFactoryManager.getExchangeFactories();
@@ -91,15 +121,19 @@ public class ManagedExchangeFactoryManager extends 
ManagedService implements Man
                 int capacity = ef.getCapacity();
                 int size = ef.getSize();
                 long created = 0;
+                long acquired = 0;
                 long released = 0;
+                long discarded = 0;
                 if (ef.isStatisticsEnabled()) {
                     created = ef.getStatistics().getCreatedCounter();
+                    acquired = ef.getStatistics().getAcquiredCounter();
                     released = ef.getStatistics().getReleasedCounter();
+                    discarded = ef.getStatistics().getDiscardedCounter();
                 }
 
                 CompositeData data = new CompositeDataSupport(
-                        ct, new String[] { "url", "capacity", "pooled", 
"created", "released" },
-                        new Object[] { url, capacity, size, created, released 
});
+                        ct, new String[] { "url", "capacity", "pooled", 
"created", "acquired", "released", "discarded" },
+                        new Object[] { url, capacity, size, created, acquired, 
released, discarded });
                 answer.put(data);
             }
             return answer;
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
index 0779481..87948df 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ManagedNonManagedServiceTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Test
     public void testService() throws Exception {
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java
new file mode 100644
index 0000000..163c7bc
--- /dev/null
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedPooledExchangeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class ManagedPooledExchangeTest extends ManagementTestSupport {
+
+    private final AtomicInteger counter = new AtomicInteger();
+    private final AtomicReference<Exchange> ref = new AtomicReference<>();
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        PooledExchangeFactory pef = new PooledExchangeFactory();
+        pef.setStatisticsEnabled(true);
+        pef.setCapacity(123);
+        context.adapt(ExtendedCamelContext.class).setExchangeFactory(pef);
+
+        return context;
+    }
+
+    @Test
+    public void testSameExchange() throws Exception {
+        // JMX tests don't work well on AIX CI servers (they hang them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+        mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5);
+        mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6);
+        mock.message(0).header("first").isEqualTo(true);
+        mock.message(1).header("first").isNull();
+        mock.message(2).header("first").isNull();
+
+        context.getRouteController().startAllRoutes();
+
+        assertMockEndpointsSatisfied();
+
+        // get the MBean server used to read the exchange factory manager stats
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the exchange factory manager
+        ObjectName on
+                = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,name=DefaultExchangeFactoryManager");
+
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals(ServiceStatus.Started.name(), state);
+
+        Integer con = (Integer) mbeanServer.getAttribute(on, 
"ConsumerCounter");
+        assertEquals(1, con.intValue());
+
+        Integer cap = (Integer) mbeanServer.getAttribute(on, "Capacity");
+        assertEquals(123, cap.intValue());
+
+        // also only 1 exchange pooled
+        con = (Integer) mbeanServer.getAttribute(on, "TotalPooled");
+        assertEquals(1, con.intValue());
+
+        Long num = (Long) mbeanServer.getAttribute(on, "TotalCreated");
+        assertEquals(1, num.intValue());
+
+        num = (Long) mbeanServer.getAttribute(on, "TotalAcquired");
+        assertEquals(2, num.intValue());
+
+        num = (Long) mbeanServer.getAttribute(on, "TotalReleased");
+        assertEquals(3, num.intValue());
+
+        num = (Long) mbeanServer.getAttribute(on, "TotalDiscarded");
+        assertEquals(0, num.intValue());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("timer:foo?period=1&delay=1&repeatCount=3").noAutoStartup()
+                        .setProperty("myprop", counter::incrementAndGet)
+                        .setHeader("myheader", counter::incrementAndGet)
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                // should be same exchange instance as it's 
pooled
+                                Exchange old = ref.get();
+                                if (old == null) {
+                                    ref.set(exchange);
+                                    exchange.getMessage().setHeader("first", 
true);
+                                } else {
+                                    assertSame(old, exchange);
+                                }
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index c88836f..faa9bc7 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends 
ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index bfef164..b589ecd 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -36,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
-    private static final int SERVICES = 11;
+    private static final int SERVICES = 12;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
diff --git a/core/camel-management/src/test/resources/log4j2.properties 
b/core/camel-management/src/test/resources/log4j2.properties
index 3f1dcba..2e68815 100644
--- a/core/camel-management/src/test/resources/log4j2.properties
+++ b/core/camel-management/src/test/resources/log4j2.properties
@@ -46,4 +46,4 @@ rootLogger.appenderRef.file.ref = file
 #rootLogger.appenderRef.console.ref = console
 
 #logger.camel-core.name = org.apache.camel
-#logger.camel-core.level = TRACE
+#logger.camel-core.level = DEBUG

Reply via email to