This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 98fceb97004c115dcfbccee339c2c4742f3d1dd6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Apr 10 10:53:08 2021 +0200 CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code. --- .../java/org/apache/camel/processor/Enricher.java | 223 ++------------------- 1 file changed, 14 insertions(+), 209 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index 847fe72..c465c0c 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -18,35 +18,24 @@ package org.apache.camel.processor; import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; -import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; -import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; -import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.ProcessorExchangeFactory; -import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteIdAware; -import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultExchange; -import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.MessageHelper; -import org.apache.camel.support.cache.DefaultProducerCache; -import org.apache.camel.support.cache.EmptyProducerCache; import org.apache.camel.support.service.ServiceHelper; -import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +58,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA private CamelContext camelContext; private String id; private String routeId; - private ProducerCache producerCache; private final Expression expression; private AggregationStrategy aggregationStrategy; private boolean aggregateOnException; @@ -77,6 +65,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA private int cacheSize; private boolean ignoreInvalidEndpoint; private ProcessorExchangeFactory processorExchangeFactory; + private SendDynamicProcessor sendDynamicProcessor; public Enricher(Expression expression) { this.expression = expression; @@ -117,7 +106,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { - return producerCache.getEndpointUtilizationStatistics(); + return sendDynamicProcessor.getEndpointUtilizationStatistics(); } public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { @@ -160,72 +149,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; } - /** - * Enriches the input data (<code>exchange</code>) by first obtaining additional data from an endpoint represented - * by an endpoint <code>producer</code> and second by aggregating input data and additional data. Aggregation of - * input data and additional data is delegated to an {@link AggregationStrategy} object set at construction time. If - * the message exchange with the resource endpoint fails then no aggregation will be done and the failed exchange - * content is copied over to the original message exchange. - * - * @param exchange input data. - */ @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { - // which producer to use - final AsyncProducer producer; - final Endpoint endpoint; - - // use dynamic endpoint so calculate the endpoint to use - Object recipient = null; - boolean prototype = cacheSize < 0; - try { - recipient = expression.evaluate(exchange, Object.class); - recipient = prepareRecipient(exchange, recipient); - Endpoint existing = getExistingEndpoint(exchange, recipient); - if (existing == null) { - endpoint = resolveEndpoint(exchange, recipient, prototype); - } else { - endpoint = existing; - // we have an existing endpoint then its not a prototype scope - prototype = false; - } - // acquire the producer from the cache - producer = producerCache.acquireProducer(endpoint); - } catch (Throwable e) { - if (isIgnoreInvalidEndpoint()) { - LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e); - } else { - exchange.setException(e); - } - callback.done(true); - return true; - } - final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); - final Endpoint destination = producer.getEndpoint(); - - StopWatch sw = null; - boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination); - if (sending) { - sw = new StopWatch(); - } - // record timing for sending the exchange using the producer - final StopWatch watch = sw; - final boolean prototypeEndpoint = prototype; - AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer); - boolean sync = ap.process(resourceExchange, new AsyncCallback() { + return sendDynamicProcessor.process(resourceExchange, new AsyncCallback() { + @Override public void done(boolean doneSync) { - // we only have to handle async completion - if (doneSync) { - return; - } - - // emit event that the exchange was sent to the endpoint - if (watch != null) { - long timeTaken = watch.taken(); - EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken); - } - if (!isAggregateOnException() && resourceExchange.isFailed()) { // copy resource exchange onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, resourceExchange); @@ -251,132 +180,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } } - // set property with the uri of the endpoint enriched so we can use that for tracing etc - exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); - - // return the producer back to the cache - try { - producerCache.releaseProducer(endpoint, producer); - } catch (Exception e) { - // ignore - } - // and stop prototype endpoints - if (prototypeEndpoint) { - ServiceHelper.stopAndShutdownService(endpoint); - } - // and release resource exchange back in pool processorExchangeFactory.release(resourceExchange); - callback.done(false); + callback.done(doneSync); } }); - - if (!sync) { - if (LOG.isTraceEnabled()) { - LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); - } - // the remainder of the routing slip will be completed async - // so we break out now, then the callback will be invoked which then continue routing from where we left here - return false; - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); - } - - if (watch != null) { - // emit event that the exchange was sent to the endpoint - long timeTaken = watch.taken(); - EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken); - } - - if (!isAggregateOnException() && resourceExchange.isFailed()) { - // copy resource exchange onto original exchange (preserving pattern) - copyResultsPreservePattern(exchange, resourceExchange); - } else { - prepareResult(exchange); - - try { - // prepare the exchanges for aggregation - ExchangeHelper.prepareAggregation(exchange, resourceExchange); - MessageHelper.resetStreamCache(exchange.getIn()); - - Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); - if (aggregatedExchange != null) { - // copy aggregation result onto original exchange (preserving pattern) - copyResultsPreservePattern(exchange, aggregatedExchange); - } - } catch (Throwable e) { - // if the aggregationStrategy threw an exception, set it on the original exchange - exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); - } - } - - // set property with the uri of the endpoint enriched so we can use that for tracing etc - exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); - - // return the producer back to the cache - try { - producerCache.releaseProducer(endpoint, producer); - } catch (Exception e) { - // ignore - } - // and stop prototype endpoints - if (prototypeEndpoint) { - ServiceHelper.stopAndShutdownService(endpoint); - } - - // and release resource exchange back in pool - processorExchangeFactory.release(resourceExchange); - - callback.done(true); - return true; - } - - protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) { - return recipient; - } else if (recipient instanceof String) { - // trim strings as end users might have added spaces between separators - recipient = ((String) recipient).trim(); - } - if (recipient != null) { - ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext(); - String uri; - if (recipient instanceof String) { - uri = (String) recipient; - } else { - // convert to a string type we can work with - uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - } - // optimize and normalize endpoint - return ecc.normalizeUri(uri); - } - return null; - } - - protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { - if (recipient instanceof Endpoint) { - return (Endpoint) recipient; - } - if (recipient != null) { - if (recipient instanceof NormalizedEndpointUri) { - NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient; - ExtendedCamelContext ecc = exchange.getContext().adapt(ExtendedCamelContext.class); - return ecc.hasEndpoint(nu); - } else { - String uri = recipient.toString(); - return exchange.getContext().hasEndpoint(uri); - } - } - return null; - } - - protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { - return prototype - ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) - : ExchangeHelper.resolveEndpoint(exchange, recipient); } /** @@ -418,6 +227,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA @Override protected void doBuild() throws Exception { + // use send dynamic to send to endpoint + this.sendDynamicProcessor = new SendDynamicProcessor(null, getExpression()); + this.sendDynamicProcessor.setCamelContext(camelContext); + this.sendDynamicProcessor.setCacheSize(cacheSize); + this.sendDynamicProcessor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); + // create a per processor exchange factory this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) .getProcessorExchangeFactory().newProcessorExchangeFactory(this); @@ -430,27 +245,17 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA if (aggregationStrategy instanceof CamelContextAware) { ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext); } - ServiceHelper.buildService(processorExchangeFactory); + ServiceHelper.buildService(processorExchangeFactory, sendDynamicProcessor); } @Override protected void doStart() throws Exception { - if (producerCache == null) { - if (cacheSize < 0) { - producerCache = new EmptyProducerCache(this, camelContext); - LOG.debug("Enricher {} is not using ProducerCache", this); - } else { - producerCache = new DefaultProducerCache(this, camelContext, cacheSize); - LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize); - } - } - - ServiceHelper.startService(processorExchangeFactory, producerCache, aggregationStrategy); + ServiceHelper.startService(processorExchangeFactory, aggregationStrategy, sendDynamicProcessor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(aggregationStrategy, producerCache, processorExchangeFactory); + ServiceHelper.stopService(aggregationStrategy, processorExchangeFactory, sendDynamicProcessor); } private static class CopyAggregationStrategy implements AggregationStrategy {