This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 98fceb97004c115dcfbccee339c2c4742f3d1dd6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Apr 10 10:53:08 2021 +0200

    CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.
---
 .../java/org/apache/camel/processor/Enricher.java  | 223 ++-------------------
 1 file changed, 14 insertions(+), 209 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 847fe72..c465c0c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -18,35 +18,24 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.AsyncProducer;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.ProcessorExchangeFactory;
-import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteIdAware;
-import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.cache.DefaultProducerCache;
-import org.apache.camel.support.cache.EmptyProducerCache;
 import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +58,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private CamelContext camelContext;
     private String id;
     private String routeId;
-    private ProducerCache producerCache;
     private final Expression expression;
     private AggregationStrategy aggregationStrategy;
     private boolean aggregateOnException;
@@ -77,6 +65,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private int cacheSize;
     private boolean ignoreInvalidEndpoint;
     private ProcessorExchangeFactory processorExchangeFactory;
+    private SendDynamicProcessor sendDynamicProcessor;
 
     public Enricher(Expression expression) {
         this.expression = expression;
@@ -117,7 +106,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
-        return producerCache.getEndpointUtilizationStatistics();
+        return sendDynamicProcessor.getEndpointUtilizationStatistics();
     }
 
     public void setAggregationStrategy(AggregationStrategy 
aggregationStrategy) {
@@ -160,72 +149,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
     }
 
-    /**
-     * Enriches the input data (<code>exchange</code>) by first obtaining 
additional data from an endpoint represented
-     * by an endpoint <code>producer</code> and second by aggregating input 
data and additional data. Aggregation of
-     * input data and additional data is delegated to an {@link 
AggregationStrategy} object set at construction time. If
-     * the message exchange with the resource endpoint fails then no 
aggregation will be done and the failed exchange
-     * content is copied over to the original message exchange.
-     *
-     * @param exchange input data.
-     */
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-        // which producer to use
-        final AsyncProducer producer;
-        final Endpoint endpoint;
-
-        // use dynamic endpoint so calculate the endpoint to use
-        Object recipient = null;
-        boolean prototype = cacheSize < 0;
-        try {
-            recipient = expression.evaluate(exchange, Object.class);
-            recipient = prepareRecipient(exchange, recipient);
-            Endpoint existing = getExistingEndpoint(exchange, recipient);
-            if (existing == null) {
-                endpoint = resolveEndpoint(exchange, recipient, prototype);
-            } else {
-                endpoint = existing;
-                // we have an existing endpoint then its not a prototype scope
-                prototype = false;
-            }
-            // acquire the producer from the cache
-            producer = producerCache.acquireProducer(endpoint);
-        } catch (Throwable e) {
-            if (isIgnoreInvalidEndpoint()) {
-                LOG.debug("Endpoint uri is invalid: {}. This exception will be 
ignored.", recipient, e);
-            } else {
-                exchange.setException(e);
-            }
-            callback.done(true);
-            return true;
-        }
-
         final Exchange resourceExchange = createResourceExchange(exchange, 
ExchangePattern.InOut);
-        final Endpoint destination = producer.getEndpoint();
-
-        StopWatch sw = null;
-        boolean sending = 
EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, 
destination);
-        if (sending) {
-            sw = new StopWatch();
-        }
-        // record timing for sending the exchange using the producer
-        final StopWatch watch = sw;
-        final boolean prototypeEndpoint = prototype;
-        AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
-        boolean sync = ap.process(resourceExchange, new AsyncCallback() {
+        return sendDynamicProcessor.process(resourceExchange, new 
AsyncCallback() {
+            @Override
             public void done(boolean doneSync) {
-                // we only have to handle async completion
-                if (doneSync) {
-                    return;
-                }
-
-                // emit event that the exchange was sent to the endpoint
-                if (watch != null) {
-                    long timeTaken = watch.taken();
-                    
EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, 
destination, timeTaken);
-                }
-
                 if (!isAggregateOnException() && resourceExchange.isFailed()) {
                     // copy resource exchange onto original exchange 
(preserving pattern)
                     copyResultsPreservePattern(exchange, resourceExchange);
@@ -251,132 +180,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                     }
                 }
 
-                // set property with the uri of the endpoint enriched so we 
can use that for tracing etc
-                exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, 
producer.getEndpoint().getEndpointUri());
-
-                // return the producer back to the cache
-                try {
-                    producerCache.releaseProducer(endpoint, producer);
-                } catch (Exception e) {
-                    // ignore
-                }
-                // and stop prototype endpoints
-                if (prototypeEndpoint) {
-                    ServiceHelper.stopAndShutdownService(endpoint);
-                }
-
                 // and release resource exchange back in pool
                 processorExchangeFactory.release(resourceExchange);
 
-                callback.done(false);
+                callback.done(doneSync);
             }
         });
-
-        if (!sync) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing exchangeId: {} is continued being 
processed asynchronously", exchange.getExchangeId());
-            }
-            // the remainder of the routing slip will be completed async
-            // so we break out now, then the callback will be invoked which 
then continue routing from where we left here
-            return false;
-        }
-
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Processing exchangeId: {} is continued being processed 
synchronously", exchange.getExchangeId());
-        }
-
-        if (watch != null) {
-            // emit event that the exchange was sent to the endpoint
-            long timeTaken = watch.taken();
-            EventHelper.notifyExchangeSent(resourceExchange.getContext(), 
resourceExchange, destination, timeTaken);
-        }
-
-        if (!isAggregateOnException() && resourceExchange.isFailed()) {
-            // copy resource exchange onto original exchange (preserving 
pattern)
-            copyResultsPreservePattern(exchange, resourceExchange);
-        } else {
-            prepareResult(exchange);
-
-            try {
-                // prepare the exchanges for aggregation
-                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-                MessageHelper.resetStreamCache(exchange.getIn());
-
-                Exchange aggregatedExchange = 
aggregationStrategy.aggregate(exchange, resourceExchange);
-                if (aggregatedExchange != null) {
-                    // copy aggregation result onto original exchange 
(preserving pattern)
-                    copyResultsPreservePattern(exchange, aggregatedExchange);
-                }
-            } catch (Throwable e) {
-                // if the aggregationStrategy threw an exception, set it on 
the original exchange
-                exchange.setException(new CamelExchangeException("Error 
occurred during aggregation", exchange, e));
-            }
-        }
-
-        // set property with the uri of the endpoint enriched so we can use 
that for tracing etc
-        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, 
producer.getEndpoint().getEndpointUri());
-
-        // return the producer back to the cache
-        try {
-            producerCache.releaseProducer(endpoint, producer);
-        } catch (Exception e) {
-            // ignore
-        }
-        // and stop prototype endpoints
-        if (prototypeEndpoint) {
-            ServiceHelper.stopAndShutdownService(endpoint);
-        }
-
-        // and release resource exchange back in pool
-        processorExchangeFactory.release(resourceExchange);
-
-        callback.done(true);
-        return true;
-    }
-
-    protected static Object prepareRecipient(Exchange exchange, Object 
recipient) throws NoTypeConversionAvailableException {
-        if (recipient instanceof Endpoint || recipient instanceof 
NormalizedEndpointUri) {
-            return recipient;
-        } else if (recipient instanceof String) {
-            // trim strings as end users might have added spaces between 
separators
-            recipient = ((String) recipient).trim();
-        }
-        if (recipient != null) {
-            ExtendedCamelContext ecc = (ExtendedCamelContext) 
exchange.getContext();
-            String uri;
-            if (recipient instanceof String) {
-                uri = (String) recipient;
-            } else {
-                // convert to a string type we can work with
-                uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, 
exchange, recipient);
-            }
-            // optimize and normalize endpoint
-            return ecc.normalizeUri(uri);
-        }
-        return null;
-    }
-
-    protected static Endpoint getExistingEndpoint(Exchange exchange, Object 
recipient) {
-        if (recipient instanceof Endpoint) {
-            return (Endpoint) recipient;
-        }
-        if (recipient != null) {
-            if (recipient instanceof NormalizedEndpointUri) {
-                NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
-                ExtendedCamelContext ecc = 
exchange.getContext().adapt(ExtendedCamelContext.class);
-                return ecc.hasEndpoint(nu);
-            } else {
-                String uri = recipient.toString();
-                return exchange.getContext().hasEndpoint(uri);
-            }
-        }
-        return null;
-    }
-
-    protected static Endpoint resolveEndpoint(Exchange exchange, Object 
recipient, boolean prototype) {
-        return prototype
-                ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient)
-                : ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
@@ -418,6 +227,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
 
     @Override
     protected void doBuild() throws Exception {
+        // use send dynamic to send to endpoint
+        this.sendDynamicProcessor = new SendDynamicProcessor(null, 
getExpression());
+        this.sendDynamicProcessor.setCamelContext(camelContext);
+        this.sendDynamicProcessor.setCacheSize(cacheSize);
+        
this.sendDynamicProcessor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+
         // create a per processor exchange factory
         this.processorExchangeFactory = 
getCamelContext().adapt(ExtendedCamelContext.class)
                 
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
@@ -430,27 +245,17 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         if (aggregationStrategy instanceof CamelContextAware) {
             ((CamelContextAware) 
aggregationStrategy).setCamelContext(camelContext);
         }
-        ServiceHelper.buildService(processorExchangeFactory);
+        ServiceHelper.buildService(processorExchangeFactory, 
sendDynamicProcessor);
     }
 
     @Override
     protected void doStart() throws Exception {
-        if (producerCache == null) {
-            if (cacheSize < 0) {
-                producerCache = new EmptyProducerCache(this, camelContext);
-                LOG.debug("Enricher {} is not using ProducerCache", this);
-            } else {
-                producerCache = new DefaultProducerCache(this, camelContext, 
cacheSize);
-                LOG.debug("Enricher {} using ProducerCache with cacheSize={}", 
this, cacheSize);
-            }
-        }
-
-        ServiceHelper.startService(processorExchangeFactory, producerCache, 
aggregationStrategy);
+        ServiceHelper.startService(processorExchangeFactory, 
aggregationStrategy, sendDynamicProcessor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(aggregationStrategy, producerCache, 
processorExchangeFactory);
+        ServiceHelper.stopService(aggregationStrategy, 
processorExchangeFactory, sendDynamicProcessor);
     }
 
     private static class CopyAggregationStrategy implements 
AggregationStrategy {

Reply via email to