This is an automated email from the ASF dual-hosted git repository. onders pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit a7a458826dbafe1f155f538cfcbc0957d296fad8 Author: Sezgin <onder.sez...@nokia.com> AuthorDate: Tue Jun 5 16:02:19 2018 +0300 CAMEL-6840 make it possible grouped throttling --- camel-core/src/main/docs/eips/throttle-eip.adoc | 3 +- .../apache/camel/model/AggregateDefinition.java | 2 +- .../apache/camel/model/ProcessorDefinition.java | 42 ++++++++ .../org/apache/camel/model/ThrottleDefinition.java | 37 ++++++- .../java/org/apache/camel/processor/Throttler.java | 107 +++++++++++++++++++-- .../camel/processor/ThrottlingGroupingTest.java | 76 +++++++++++++++ 6 files changed, 255 insertions(+), 12 deletions(-) diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc index 71da959..7ae5472 100644 --- a/camel-core/src/main/docs/eips/throttle-eip.adoc +++ b/camel-core/src/main/docs/eips/throttle-eip.adoc @@ -6,7 +6,7 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get === Options // eip options: START -The Throttle EIP supports 5 options which are listed below: +The Throttle EIP supports 6 options which are listed below: [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -16,6 +16,7 @@ The Throttle EIP supports 5 options which are listed below: | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean | *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean | *rejectExecution* | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false | false | Boolean +| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. | | NamespaceAware Expression |=== // eip options: END diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index 2e60ec3..1aa34c4c 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -142,7 +142,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition this(ExpressionNodeHelper.toExpressionDefinition(expression)); } - public AggregateDefinition(ExpressionDefinition correlationExpression) { + private AggregateDefinition(ExpressionDefinition correlationExpression) { setExpression(correlationExpression); ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 005270e..e4622e7 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -2284,6 +2284,48 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> addOutput(answer); return answer; } + + /** + * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> + * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, + * or that we don't exceed an agreed SLA with some external service. + * Here another parameter correlationExpressionKey is introduced for the functionality which + * will throttle based on the key expression to group exchanges. This will make key-based throttling + * instead of overall throttling. + * <p/> + * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 + * will default ensure at most 10 messages per second. + * + * @param maximumRequestCount an expression to calculate the maximum request count + * @param correlationExpressionKey is a correlation key that can throttle by the given key instead of overall throttling + * @return the builder + */ + public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) { + ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount); + addOutput(answer); + return answer; + } + + /** + * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> + * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, + * or that we don't exceed an agreed SLA with some external service. + * Here another parameter correlationExpressionKey is introduced for the functionality which + * will throttle based on the key expression to group exchanges. This will make key-based throttling + * instead of overall throttling. + * <p/> + * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 + * will default ensure at most 10 messages per second. + * + * @param maximumRequestCount an expression to calculate the maximum request count + * @param correlationExpressionKey is a correlation key as an expression that can throttle by the given key instead of overall throttling + * @return the builder + */ + public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) { + ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount); + addOutput(answer); + return answer; + } /** * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java index 613d2b3..7bd5213 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java @@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; @@ -55,7 +56,9 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic private Boolean callerRunsWhenRejected; @XmlAttribute private Boolean rejectExecution; - + @XmlElement(name = "correlationExpression") + private ExpressionSubElementDefinition correlationExpression; + public ThrottleDefinition() { } @@ -63,6 +66,18 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic super(maximumRequestsPerPeriod); } + public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) { + this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression); + } + + private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) { + super(maximumRequestsPerPeriod); + + ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); + cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); + setCorrelationExpression(cor); + } + @Override public String toString() { return "Throttle[" + description() + " -> " + getOutputs() + "]"; @@ -93,9 +108,14 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic if (maxRequestsExpression == null) { throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); } + + Expression correlation = null; + if (correlationExpression != null) { + correlation = correlationExpression.createExpression(routeContext); + } boolean reject = getRejectExecution() != null && getRejectExecution(); - Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject); + Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation); answer.setAsyncDelayed(async); if (getCallerRunsWhenRejected() == null) { @@ -104,6 +124,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic } else { answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); } + return answer; } @@ -256,4 +277,16 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic public void setRejectExecution(Boolean rejectExecution) { this.rejectExecution = rejectExecution; } + + /** + * The expression used to calculate the correlation key to use for throttle grouping. + * The Exchange which has the same correlation key is throttled together. + */ + public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { + this.correlationExpression = correlationExpression; + } + + public ExpressionSubElementDefinition getCorrelationExpression() { + return correlationExpression; + } } diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index 543ec9a..73d53f0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -16,8 +16,11 @@ */ package org.apache.camel.processor; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -31,7 +34,11 @@ import org.apache.camel.RuntimeExchangeException; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.LRUCache; +import org.apache.camel.util.LRUCacheFactory; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,12 +68,14 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp"; private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState"; + // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation + // when there is no grouping for throttling + private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1); private enum State { SYNC, ASYNC, ASYNC_REJECTED } private final Logger log = LoggerFactory.getLogger(Throttler.class); private final CamelContext camelContext; - private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>(); private final ExecutorService asyncExecutor; private final boolean shutdownAsyncExecutor; @@ -77,9 +86,14 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw private boolean rejectExecution; private boolean asyncDelayed; private boolean callerRunsWhenRejected = true; + private Expression correlationExpression; + // below 2 fields added for (throttling grouping) + private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache; + private ExecutorService delayQueueCacheExecutorService; + public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis, - final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) { + final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) { super(processor); this.camelContext = camelContext; this.rejectExecution = rejectExecution; @@ -93,6 +107,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } this.timePeriodMillis = timePeriodMillis; this.asyncExecutor = asyncExecutor; + this.correlationExpression = correlation; } @Override @@ -111,7 +126,8 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw throw new RejectedExecutionException("Run is not allowed"); } - calculateAndSetMaxRequestsPerPeriod(exchange); + calculateAndSetMaxRequestsPerPeriod(exchange, doneSync); + DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync); ThrottlePermit permit = delayQueue.poll(); if (permit == null) { @@ -135,7 +151,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw if (log.isTraceEnabled()) { elapsed = System.currentTimeMillis() - start; } - enqueuePermit(permit, exchange); + enqueuePermit(permit, exchange, doneSync); if (state == State.ASYNC) { if (log.isTraceEnabled()) { @@ -147,7 +163,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } } } else { - enqueuePermit(permit, exchange); + enqueuePermit(permit, exchange, doneSync); if (state == State.ASYNC) { if (log.isTraceEnabled()) { @@ -192,6 +208,34 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } } + private DelayQueue<ThrottlePermit> locateDelayQueue(final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException { + Integer key; + CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>(); + + if (correlationExpression != null) { + key = correlationExpression.evaluate(exchange, Integer.class); + } else { + key = NO_CORRELATION_QUEUE_ID; + } + + if (!doneSync) { + delayQueueCacheExecutorService.submit(() -> { + futureDelayQueue.complete(findDelayQueue(key)); + }); + } + + return (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key); + } + + private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) { + DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key); + if (currentDelayQueue == null) { + currentDelayQueue = new DelayQueue<>(); + delayQueueCache.put(key, currentDelayQueue); + } + return currentDelayQueue; + } + /** * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not @@ -222,10 +266,12 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw /** * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now. + * @throws ExecutionException + * @throws InterruptedException */ - protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) { + protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException { permit.setDelayMs(getTimePeriodMillis()); - delayQueue.put(permit); + locateDelayQueue(exchange, doneSync).put(permit); // try and incur the least amount of overhead while releasing permits back to the queue if (log.isTraceEnabled()) { log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId()); @@ -235,7 +281,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw /** * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down. */ - protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception { + protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange, final boolean doneSync) throws Exception { Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class); if (newThrottle != null && newThrottle < 0) { @@ -249,6 +295,8 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw if (newThrottle != null) { if (newThrottle != throttleRate) { + // get the queue from the cache + DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync); // decrease if (throttleRate > newThrottle) { int delta = throttleRate - newThrottle; @@ -279,19 +327,62 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } } + @SuppressWarnings("unchecked") @Override protected void doStart() throws Exception { if (isAsyncDelayed()) { ObjectHelper.notNull(asyncExecutor, "executorService", this); } + if (camelContext != null) { + int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext); + if (maxSize > 0) { + delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false); + log.debug("DelayQueues cache size: {}", maxSize); + } else { + delayQueueCache = LRUCacheFactory.newLRUCache(100); + log.debug("Defaulting DelayQueues cache size: {}", 100); + } + } + if (delayQueueCache != null) { + ServiceHelper.startService(delayQueueCache); + } + if (delayQueueCacheExecutorService == null) { + String name = getClass().getSimpleName() + "-DelayQueueLocatorTask"; + delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name); + } super.doStart(); } + + /** + * Strategy to create the thread pool for locating right DelayQueue from the case as a background task + * + * @param name the suggested name for the background thread + * @return the thread pool + */ + protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) { + // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in + return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); + } + @SuppressWarnings("rawtypes") @Override protected void doShutdown() throws Exception { if (shutdownAsyncExecutor && asyncExecutor != null) { camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor); } + if (delayQueueCacheExecutorService != null) { + camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService); + } + if (delayQueueCache != null) { + ServiceHelper.stopService(delayQueueCache); + if (log.isDebugEnabled()) { + if (delayQueueCache instanceof LRUCache) { + log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]", + delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted()); + } + } + delayQueueCache.clear(); + } super.doShutdown(); } diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java new file mode 100644 index 0000000..01cd378 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ThrottlingGroupingTest extends ContextTestSupport { + + public void testGroupingWithSingleConstant() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); + + template.sendBodyAndHeader("seda:a", "Kaboom", "max", null); + template.sendBodyAndHeader("seda:a", "Hello World", "max", 2); + template.sendBodyAndHeader("seda:a", "Bye World", "max", 2); + + assertMockEndpointsSatisfied(); + } + + public void testGroupingWithDynamicHeaderExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); + getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World"); + + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("max", null); + + template.sendBodyAndHeaders("seda:a", "Kaboom", headers); + + headers.put("max", "2"); + template.sendBodyAndHeaders("seda:a", "Hello World", headers); + template.sendBodyAndHeaders("seda:b", "Bye World", headers); + + headers.put("key", "1"); + template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers); + headers.put("key", "2"); + template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("seda:a").throttle(1, header("max")).to("mock:result"); + from("seda:b").throttle(2, header("max")).to("mock:result"); + from("seda:c").throttle(header("key"), header("max")).to("mock:resultdynamic"); + } + }; + } +} -- To stop receiving notification emails like this one, please contact ond...@apache.org.