This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 54c5f2c3242f0d4c7d9e61bc4e911d9fbad556f7 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 5 17:36:34 2021 +0200 CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP --- .../apache/camel/spi/ProcessorExchangeFactory.java | 9 ++++++++- .../engine/PooledProcessorExchangeFactory.java | 22 ++++++++++++++++++++++ .../engine/PrototypeProcessorExchangeFactory.java | 8 ++++++++ .../apache/camel/processor/WireTapProcessor.java | 13 ++++++------- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java index 77142d4..c48ad15 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java @@ -16,7 +16,9 @@ */ package org.apache.camel.spi; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.NonManagedService; import org.apache.camel.Processor; @@ -53,7 +55,7 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor); /** - * Creates a copy of the given {@link Exchange} + * Gets a copy of the given {@link Exchange} * * @param exchange original copy of the exchange * @param handover whether the on completion callbacks should be handed over to the new copy. @@ -61,6 +63,11 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, Exchange createCorrelatedCopy(Exchange exchange, boolean handover); /** + * Gets a new {@link Exchange} + */ + Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern); + + /** * Releases the exchange back into the pool * * @param exchange the exchange diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java index d3c37a6..697e02f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java @@ -16,7 +16,9 @@ */ package org.apache.camel.impl.engine; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.ExtendedExchange; import org.apache.camel.PooledExchange; @@ -90,6 +92,26 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa } @Override + public Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern) { + Exchange answer = pool.poll(); + if (answer == null) { + // create a new exchange as there was no free from the pool + answer = super.create(fromEndpoint, exchangePattern); + if (statisticsEnabled) { + statistics.created.increment(); + } + } else { + if (statisticsEnabled) { + statistics.acquired.increment(); + } + // reset exchange for reuse + PooledExchange ee = (PooledExchange) answer; + ee.reset(System.currentTimeMillis()); + } + return answer; + } + + @Override public boolean release(Exchange exchange) { try { // done exchange before returning back to pool diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java index 0b10211..e4f4d18 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java @@ -16,9 +16,12 @@ */ package org.apache.camel.impl.engine; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.spi.ProcessorExchangeFactory; +import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.PooledObjectFactorySupport; import org.slf4j.Logger; @@ -83,6 +86,11 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor } @Override + public Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern) { + return new DefaultExchange(fromEndpoint, exchangePattern); + } + + @Override public Exchange acquire() { throw new UnsupportedOperationException("Not in use"); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index fff7cb9..f3b90d4 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -287,8 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport } private Exchange configureNewExchange(Exchange exchange) { - // no copy so lets just create a new exchange always - return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); + return processorExchangeFactory.create(exchange.getFromEndpoint(), ExchangePattern.InOnly); } public List<Processor> getNewExchangeProcessors() { @@ -352,11 +351,11 @@ public class WireTapProcessor extends AsyncProcessorSupport @Override protected void doBuild() throws Exception { - if (copy) { - // create a per processor exchange factory - this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) - .getProcessorExchangeFactory().newProcessorExchangeFactory(this); - } + // create a per processor exchange factory + this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + this.processorExchangeFactory.setRouteId(getRouteId()); + this.processorExchangeFactory.setId(getId()); boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); if (pooled) {