This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 59c8d801a254879e9b72745bc16933023a5682c6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 5 16:03:59 2021 +0200 CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP --- .../org/apache/camel/ExtendedCamelContext.java | 11 ++ .../java/org/apache/camel/spi/ExchangeFactory.java | 3 + .../apache/camel/spi/ProcessorExchangeFactory.java | 71 +++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 24 ++++ .../engine/PrototypeProcessorExchangeFactory.java | 136 +++++++++++++++++++++ .../camel/impl/engine/SimpleCamelContext.java | 12 ++ .../camel/impl/ExtendedCamelContextConfigurer.java | 6 + .../camel/impl/lw/LightweightCamelContext.java | 11 ++ .../impl/lw/LightweightRuntimeCamelContext.java | 13 ++ .../apache/camel/processor/WireTapProcessor.java | 34 ++++-- .../org/apache/camel/reifier/WireTapReifier.java | 3 +- 11 files changed, 309 insertions(+), 15 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 654cc52..b105f47 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -54,6 +54,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.Registry; @@ -237,6 +238,16 @@ public interface ExtendedCamelContext extends CamelContext { void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager); /** + * Gets the processor exchange factory to use. + */ + ProcessorExchangeFactory getProcessorExchangeFactory(); + + /** + * Sets a custom processor exchange factory to use. + */ + void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory); + + /** * Returns the bean post processor used to do any bean customization. * * @return the bean post processor. diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index 75b9d21..83a5e04 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -33,6 +33,9 @@ import org.apache.camel.NonManagedService; * <p/> * The factory is pluggable which allows to use different strategies. The default factory will create a new * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges. + * + * @see ProcessorExchangeFactory + * @see org.apache.camel.PooledExchange */ public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware { diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java new file mode 100644 index 0000000..77142d4 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Exchange; +import org.apache.camel.NonManagedService; +import org.apache.camel.Processor; + +/** + * Factory used by {@link org.apache.camel.Processor} (EIPs) when they create copies of the processed {@link Exchange}. + * <p/> + * Some EIPs like WireTap, Multicast, Split etc creates copies of the processed exchange which they use as sub + * exchanges. This factory allows to use exchange pooling. + * + * The factory is pluggable which allows to use different strategies. The default factory will create a new + * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges. + * + * @see ExchangeFactory + * @see org.apache.camel.PooledExchange + */ +public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware, IdAware { + + /** + * Service factory key. + */ + String FACTORY = "processor-exchange-factory"; + + /** + * The processor using this factory. + */ + Processor getProcessor(); + + /** + * Creates a new {@link ProcessorExchangeFactory} that is private for the given consumer. + * + * @param processor the processor that will use the created {@link ProcessorExchangeFactory} + * @return the created factory. + */ + ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor); + + /** + * Creates a copy of the given {@link Exchange} + * + * @param exchange original copy of the exchange + * @param handover whether the on completion callbacks should be handed over to the new copy. + */ + Exchange createCorrelatedCopy(Exchange exchange, boolean handover); + + /** + * Releases the exchange back into the pool + * + * @param exchange the exchange + * @return true if released into the pool, or false if something went wrong and the exchange was discarded + */ + boolean release(Exchange exchange); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index afa7832..c1d8fd1 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -130,6 +130,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.ReactiveExecutor; @@ -271,6 +272,7 @@ public abstract class AbstractCamelContext extends BaseService private volatile CamelContextNameStrategy nameStrategy; private volatile ExchangeFactoryManager exchangeFactoryManager; private volatile ExchangeFactory exchangeFactory; + private volatile ProcessorExchangeFactory processorExchangeFactory; private volatile ReactiveExecutor reactiveExecutor; private volatile ManagementNameStrategy managementNameStrategy; private volatile Registry registry; @@ -3701,6 +3703,7 @@ public abstract class AbstractCamelContext extends BaseService asyncProcessorAwaitManager = null; exchangeFactory = null; exchangeFactoryManager = null; + processorExchangeFactory = null; registry = null; } @@ -4715,6 +4718,25 @@ public abstract class AbstractCamelContext extends BaseService } @Override + public ProcessorExchangeFactory getProcessorExchangeFactory() { + if (processorExchangeFactory == null) { + synchronized (lock) { + if (processorExchangeFactory == null) { + setProcessorExchangeFactory(createProcessorExchangeFactory()); + } + } + } + return processorExchangeFactory; + } + + @Override + public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) { + // automatic inject camel context + processorExchangeFactory.setCamelContext(this); + this.processorExchangeFactory = processorExchangeFactory; + } + + @Override public ReactiveExecutor getReactiveExecutor() { if (reactiveExecutor == null) { synchronized (lock) { @@ -4810,6 +4832,8 @@ public abstract class AbstractCamelContext extends BaseService protected abstract ExchangeFactoryManager createExchangeFactoryManager(); + protected abstract ProcessorExchangeFactory createProcessorExchangeFactory(); + protected abstract HealthCheckRegistry createHealthCheckRegistry(); protected abstract ReactiveExecutor createReactiveExecutor(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java new file mode 100644 index 0000000..0b10211 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.spi.ProcessorExchangeFactory; +import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.PooledObjectFactorySupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link org.apache.camel.spi.ProcessorExchangeFactory} that creates a new {@link Exchange} instance. + */ +public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySupport<Exchange> + implements ProcessorExchangeFactory { + + private static final Logger LOG = LoggerFactory.getLogger(PrototypeProcessorExchangeFactory.class); + + final Processor processor; + String routeId; + String id; + + public PrototypeProcessorExchangeFactory() { + this.processor = null; + } + + public PrototypeProcessorExchangeFactory(Processor processor) { + this.processor = processor; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + public Processor getProcessor() { + return processor; + } + + @Override + public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) { + PrototypeProcessorExchangeFactory answer = new PrototypeProcessorExchangeFactory(processor); + answer.setStatisticsEnabled(statisticsEnabled); + answer.setCapacity(capacity); + answer.setCamelContext(camelContext); + return answer; + } + + @Override + public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { + return ExchangeHelper.createCorrelatedCopy(exchange, handover); + } + + @Override + public Exchange acquire() { + throw new UnsupportedOperationException("Not in use"); + } + + @Override + public boolean release(Exchange exchange) { + if (statisticsEnabled) { + statistics.released.increment(); + } + return true; + } + + @Override + public boolean isPooled() { + return false; + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + logUsageSummary(LOG, "PrototypeProcessorExchangeFactory", 0); + } + + void logUsageSummary(Logger log, String name, int pooled) { + if (statisticsEnabled && processor != null) { + // only log if there is any usage + long created = statistics.getCreatedCounter(); + long acquired = statistics.getAcquiredCounter(); + long released = statistics.getReleasedCounter(); + long discarded = statistics.getDiscardedCounter(); + boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || released > 0 || discarded > 0; + if (shouldLog) { + String rid = getRouteId(); + String pid = getId(); + + // are there any leaks? + boolean leak = created + acquired > released + discarded; + if (leak) { + long leaks = (created + acquired) - (released + discarded); + log.info( + "{} {} ({}) usage (leaks detected: {}) [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]", + name, rid, pid, leaks, pooled, created, acquired, released, discarded); + } else { + log.info("{} {} ({}) usage [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]", + name, rid, pid, pooled, created, acquired, released, discarded); + } + } + } + } + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index 3c46c48..254fc2d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -59,6 +59,7 @@ import org.apache.camel.spi.ModelToXMLDumper; import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.ReactiveExecutor; @@ -574,6 +575,17 @@ public class SimpleCamelContext extends AbstractCamelContext { } @Override + protected ProcessorExchangeFactory createProcessorExchangeFactory() { + Optional<ProcessorExchangeFactory> result = ResolverHelper.resolveService( + getCamelContextReference(), + getBootstrapFactoryFinder(), + ProcessorExchangeFactory.FACTORY, + ProcessorExchangeFactory.class); + + return result.orElseGet(PrototypeProcessorExchangeFactory::new); + } + + @Override protected ReactiveExecutor createReactiveExecutor() { Optional<ReactiveExecutor> result = ResolverHelper.resolveService( getCamelContextReference(), diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java index d06c43d..9d5d73a 100644 --- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java +++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java @@ -121,6 +121,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "PackageScanClassResolver": target.setPackageScanClassResolver(property(camelContext, org.apache.camel.spi.PackageScanClassResolver.class, value)); return true; case "packagescanresourceresolver": case "PackageScanResourceResolver": target.setPackageScanResourceResolver(property(camelContext, org.apache.camel.spi.PackageScanResourceResolver.class, value)); return true; + case "processorexchangefactory": + case "ProcessorExchangeFactory": target.setProcessorExchangeFactory(property(camelContext, org.apache.camel.spi.ProcessorExchangeFactory.class, value)); return true; case "processorfactory": case "ProcessorFactory": target.setProcessorFactory(property(camelContext, org.apache.camel.spi.ProcessorFactory.class, value)); return true; case "propertiescomponent": @@ -294,6 +296,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "PackageScanClassResolver": return org.apache.camel.spi.PackageScanClassResolver.class; case "packagescanresourceresolver": case "PackageScanResourceResolver": return org.apache.camel.spi.PackageScanResourceResolver.class; + case "processorexchangefactory": + case "ProcessorExchangeFactory": return org.apache.camel.spi.ProcessorExchangeFactory.class; case "processorfactory": case "ProcessorFactory": return org.apache.camel.spi.ProcessorFactory.class; case "propertiescomponent": @@ -468,6 +472,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "PackageScanClassResolver": return target.getPackageScanClassResolver(); case "packagescanresourceresolver": case "PackageScanResourceResolver": return target.getPackageScanResourceResolver(); + case "processorexchangefactory": + case "ProcessorExchangeFactory": return target.getProcessorExchangeFactory(); case "processorfactory": case "ProcessorFactory": return target.getProcessorFactory(); case "propertiescomponent": diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java index 6101ee5..626d4cf 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java @@ -114,6 +114,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.ReactiveExecutor; @@ -1469,6 +1470,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam } @Override + public ProcessorExchangeFactory getProcessorExchangeFactory() { + return getExtendedCamelContext().getProcessorExchangeFactory(); + } + + @Override + public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) { + getExtendedCamelContext().setProcessorExchangeFactory(processorExchangeFactory); + } + + @Override public ReactiveExecutor getReactiveExecutor() { return getExtendedCamelContext().getReactiveExecutor(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java index af98c56..d70e23d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java @@ -110,6 +110,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanResourceResolver; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.ReactiveExecutor; @@ -171,6 +172,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat private final HeadersMapFactory headersMapFactory; private final ExchangeFactory exchangeFactory; private final ExchangeFactoryManager exchangeFactoryManager; + private final ProcessorExchangeFactory processorExchangeFactory; private final ReactiveExecutor reactiveExecutor; private final AsyncProcessorAwaitManager asyncProcessorAwaitManager; private final ExecutorServiceManager executorServiceManager; @@ -217,6 +219,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory(); exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory(); exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager(); + processorExchangeFactory = context.adapt(ExtendedCamelContext.class).getProcessorExchangeFactory(); reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor(); asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); executorServiceManager = context.getExecutorServiceManager(); @@ -1578,6 +1581,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat } @Override + public ProcessorExchangeFactory getProcessorExchangeFactory() { + return processorExchangeFactory; + } + + @Override + public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) { + throw new UnsupportedOperationException(); + } + + @Override public ReactiveExecutor getReactiveExecutor() { return reactiveExecutor; } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index b6c7822..8cf889b 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -39,12 +39,12 @@ import org.apache.camel.StreamCache; import org.apache.camel.Traceable; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultExchange; -import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -67,26 +67,28 @@ public class WireTapProcessor extends AsyncProcessorSupport private final Processor processor; private final AsyncProcessor asyncProcessor; private final ExchangePattern exchangePattern; + private final boolean copy; private final ExecutorService executorService; private volatile boolean shutdownExecutorService; private final LongAdder taskCount = new LongAdder(); + private ProcessorExchangeFactory processorExchangeFactory; private PooledExchangeTaskFactory taskFactory; // expression or processor used for populating a new exchange to send // as opposed to traditional wiretap that sends a copy of the original exchange private Expression newExchangeExpression; private List<Processor> newExchangeProcessors; - private boolean copy; private Processor onPrepare; public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri, - ExchangePattern exchangePattern, + ExchangePattern exchangePattern, boolean copy, ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) { this.dynamicSendProcessor = dynamicSendProcessor; this.uri = uri; this.processor = processor; this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor); this.exchangePattern = exchangePattern; + this.copy = copy; ObjectHelper.notNull(executorService, "executorService"); this.executorService = executorService; this.shutdownExecutorService = shutdownExecutorService; @@ -101,6 +103,9 @@ public class WireTapProcessor extends AsyncProcessorSupport public void done(boolean doneSync) { taskCount.decrement(); taskFactory.release(WireTapTask.this); + if (processorExchangeFactory != null) { + processorExchangeFactory.release(exchange); + } } }; @@ -272,7 +277,7 @@ public class WireTapProcessor extends AsyncProcessorSupport private Exchange configureCopyExchange(Exchange exchange) { // must use a copy as we dont want it to cause side effects of the original exchange - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); // set MEP to InOnly as this wire tap is a fire and forget copy.setPattern(ExchangePattern.InOnly); // remove STREAM_CACHE_UNIT_OF_WORK property because this wire tap will @@ -282,6 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport } private Exchange configureNewExchange(Exchange exchange) { + // no copy so lets just create a new exchange always return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); } @@ -312,10 +318,6 @@ public class WireTapProcessor extends AsyncProcessorSupport return copy; } - public void setCopy(boolean copy) { - this.copy = copy; - } - public Processor getOnPrepare() { return onPrepare; } @@ -350,6 +352,12 @@ public class WireTapProcessor extends AsyncProcessorSupport @Override protected void doBuild() throws Exception { + if (copy) { + // create a per processor exchange factory + this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + } + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); if (pooled) { taskFactory = new PooledTaskFactory(getId()) { @@ -370,27 +378,27 @@ public class WireTapProcessor extends AsyncProcessorSupport } LOG.trace("Using TaskFactory: {}", taskFactory); - ServiceHelper.buildService(taskFactory, processor); + ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor); } @Override protected void doInit() throws Exception { - ServiceHelper.initService(taskFactory, processor); + ServiceHelper.initService(processorExchangeFactory, taskFactory, processor); } @Override protected void doStart() throws Exception { - ServiceHelper.startService(taskFactory, processor); + ServiceHelper.startService(processorExchangeFactory, taskFactory, processor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(taskFactory, processor); + ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(taskFactory, processor); + ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor); if (shutdownExecutorService) { getCamelContext().getExecutorServiceManager().shutdownNow(executorService); } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 3d0c214..23a7508 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -84,9 +84,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { WireTapProcessor answer = new WireTapProcessor( dynamicSendProcessor, target, uri, - parse(ExchangePattern.class, definition.getPattern()), + parse(ExchangePattern.class, definition.getPattern()), isCopy, threadPool, shutdownThreadPool, dynamic); - answer.setCopy(isCopy); Processor newExchangeProcessor = definition.getNewExchangeProcessor(); String ref = parseString(definition.getNewExchangeProcessorRef()); if (ref != null) {