This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 76617ba  CAMEL-16222: PooledExchangeFactory experiment
76617ba is described below

commit 76617ba3e8496afeb469195d547f1fbf03d2d81c
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 17 15:37:52 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/ExtendedExchange.java    |  5 ++
 .../camel/impl/engine/PooledExchangeFactory.java   | 84 ++++++++++++++++++++++
 .../org/apache/camel/support/DefaultExchange.java  | 29 +++++++-
 3 files changed, 117 insertions(+), 1 deletion(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index bb0b523..c2a1ffd 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -29,6 +29,11 @@ import org.apache.camel.spi.UnitOfWork;
 public interface ExtendedExchange extends Exchange {
 
     /**
+     * Clears the exchange from user data so it may be reused.
+     */
+    void reset();
+
+    /**
      * Sets the endpoint which originated this message exchange. This method 
should typically only be called by
      * {@link Endpoint} implementations
      */
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
new file mode 100644
index 0000000..ced186e
--- /dev/null
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.camel.*;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a 
pool.
+ */
+@Experimental
+public class PooledExchangeFactory extends ServiceSupport
+        implements ExchangeFactory, CamelContextAware, StaticService, 
NonManagedService {
+
+    private final ConcurrentLinkedQueue<Exchange> pool = new 
ConcurrentLinkedQueue<>();
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public Exchange create() {
+        Exchange exchange = pool.poll();
+        if (exchange == null) {
+            // create a new exchange as there was no free from the pool
+            exchange = new DefaultExchange(camelContext);
+        } else {
+            // reset exchange before we use it
+            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            ee.reset();
+        }
+        return exchange;
+    }
+
+    @Override
+    public Exchange create(Endpoint fromEndpoint) {
+        Exchange exchange = pool.poll();
+        if (exchange == null) {
+            // create a new exchange as there was no free from the pool
+            exchange = new DefaultExchange(fromEndpoint);
+        } else {
+            // need to mark this exchange from the given endpoint
+            
exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
+        }
+        return exchange;
+    }
+
+    @Override
+    public void release(Exchange exchange) {
+        pool.offer(exchange);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        pool.clear();
+    }
+
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 986312b..fca25fa 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -45,7 +45,7 @@ import org.apache.camel.util.ObjectHelper;
 public final class DefaultExchange implements ExtendedExchange {
 
     private final CamelContext context;
-    private final long created;
+    private long created;
     // optimize to create properties always and with a reasonable small size
     private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
     private Message in;
@@ -115,6 +115,33 @@ public final class DefaultExchange implements 
ExtendedExchange {
         }
     }
 
+    public void reset() {
+        this.properties.clear();
+        this.exchangeId = null;
+        this.created = System.currentTimeMillis();
+        this.out = null;
+        this.exception = null;
+        this.unitOfWork = null;
+        this.pattern = null;
+        this.fromEndpoint = null;
+        this.fromRouteId = null;
+        if (this.onCompletions != null) {
+            this.onCompletions.clear();
+        }
+        this.externalRedelivered = null;
+        this.historyNodeId = null;
+        this.historyNodeLabel = null;
+        this.transacted = false;
+        this.routeStop = false;
+        this.rollbackOnly = false;
+        this.rollbackOnlyLast = false;
+        this.notifyEvent = false;
+        this.interrupted = false;
+        this.interruptable = true;
+        this.redeliveryExhausted = false;
+        this.errorHandlerHandled = null;
+    }
+
     @Override
     public long getCreated() {
         return created;

Reply via email to