CAMEL-9957: Use a worker pool for processing kafka callbacks to not steal its io thread to do camel routing. See the kafka javadoc/documentation.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af2a9677 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af2a9677 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af2a9677 Branch: refs/heads/kube-lb Commit: af2a9677c59e6199381d0807c9ed2981d6dc3771 Parents: 1428ccf Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 13 11:20:03 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 16 09:59:33 2016 +0200 ---------------------------------------------------------------------- components/camel-kafka/src/main/docs/kafka.adoc | 27 +++++++++++- .../camel/component/kafka/KafkaComponent.java | 21 ++++++++++ .../component/kafka/KafkaConfiguration.java | 43 ++++++++++++++++++++ .../camel/component/kafka/KafkaEndpoint.java | 31 +++++++++++++- .../camel/component/kafka/KafkaProducer.java | 40 +++++++++++++++--- 5 files changed, 153 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/docs/kafka.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc index 7fdec49..2811a62 100644 --- a/components/camel-kafka/src/main/docs/kafka.adoc +++ b/components/camel-kafka/src/main/docs/kafka.adoc @@ -59,16 +59,33 @@ Options (Camel 2.16 or older) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + // component options: START -The Kafka component has no options. +The Kafka component supports 1 options which are listed below. 
+ + + +{% raw %} +[width="100%",cols="2s,1m,8",options="header"] +|======================================================================= +| Name | Java Type | Description +| workerPool | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed. +|======================================================================= +{% endraw %} // component options: END + + + + + // endpoint options: START -The Kafka component supports 71 endpoint options which are listed below: +The Kafka component supports 74 endpoint options which are listed below: {% raw %} [width="100%",cols="2s,1,1m,1m,5",options="header"] @@ -143,6 +160,9 @@ The Kafka component supports 71 endpoint options which are listed below: | sslTruststoreLocation | producer | | String | The location of the trust store file. | sslTruststorePassword | producer | | String | The password for the trust store file. | sslTruststoreType | producer | JKS | String | The file format of the trust store file. Default value is JKS. +| workerPool | producer | | ExecutorService | To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. +| workerPoolCoreSize | producer | 10 | Integer | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. 
+| workerPoolMaxSize | producer | 20 | Integer | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). |======================================================================= @@ -152,6 +172,9 @@ The Kafka component supports 71 endpoint options which are listed below: + + + For more information about Producer/Consumer configuration: http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index 2981b3f..76ef55f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -17,8 +17,10 @@ package org.apache.camel.component.kafka; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; import org.apache.camel.impl.UriEndpointComponent; public class KafkaComponent extends UriEndpointComponent { @@ -31,6 +33,8 @@ public class KafkaComponent extends UriEndpointComponent { super(context, KafkaEndpoint.class); } + private ExecutorService 
workerPool; + @Override protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception { KafkaEndpoint endpoint = new KafkaEndpoint(uri, this); @@ -38,8 +42,25 @@ public class KafkaComponent extends UriEndpointComponent { if (brokers != null) { endpoint.getConfiguration().setBrokers(brokers); } + + // configure component options before endpoint properties which can override from params + endpoint.getConfiguration().setWorkerPool(workerPool); + setProperties(endpoint, params); return endpoint; } + public ExecutorService getWorkerPool() { + return workerPool; + } + + /** + * To use a shared custom worker pool for continue routing {@link Exchange} after kafka server has acknowledge + * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing. + * If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed. + */ + public void setWorkerPool(ExecutorService workerPool) { + this.workerPool = workerPool; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index e0580e9..1a068c3 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -17,7 +17,9 @@ package org.apache.camel.component.kafka; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import org.apache.camel.Exchange; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import 
org.apache.camel.spi.UriParams; @@ -102,6 +104,13 @@ public class KafkaConfiguration { @UriParam(label = "producer", defaultValue = "100") private Integer retryBackoffMs = 100; + @UriParam(label = "producer") + private ExecutorService workerPool; + @UriParam(label = "producer", defaultValue = "10") + private Integer workerPoolCoreSize = 10; + @UriParam(label = "producer", defaultValue = "20") + private Integer workerPoolMaxSize = 20; + //Async producer config @UriParam(label = "producer", defaultValue = "10000") private Integer queueBufferingMaxMessages = 10000; @@ -1158,5 +1167,39 @@ public class KafkaConfiguration { this.seekToBeginning = seekToBeginning; } + public ExecutorService getWorkerPool() { + return workerPool; + } + + /** + * To use a custom worker pool for continue routing {@link Exchange} after kafka server has acknowledge + * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing. + */ + public void setWorkerPool(ExecutorService workerPool) { + this.workerPool = workerPool; + } + + public Integer getWorkerPoolCoreSize() { + return workerPoolCoreSize; + } + + /** + * Number of core threads for the worker pool for continue routing {@link Exchange} after kafka server has acknowledge + * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing. + */ + public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) { + this.workerPoolCoreSize = workerPoolCoreSize; + } + public Integer getWorkerPoolMaxSize() { + return workerPoolMaxSize; + } + + /** + * Maximum number of threads for the worker pool for continue routing {@link Exchange} after kafka server has acknowledge + * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing. 
+ */ + public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) { + this.workerPoolMaxSize = workerPoolMaxSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 1c239c8..aabd020 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -86,7 +86,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS } public ExecutorService createExecutor() { - return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams()); + return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams()); + } + + public ExecutorService createProducerExecutor() { + int core = getConfiguration().getWorkerPoolCoreSize(); + int max = getConfiguration().getWorkerPoolMaxSize(); + return getCamelContext().getExecutorServiceManager().newThreadPool(this, "KafkaProducer[" + configuration.getTopic() + "]", core, max); } public Exchange createKafkaExchange(ConsumerRecord record) { @@ -658,4 +664,27 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS this.bridgeEndpoint = bridgeEndpoint; } + public void setWorkerPool(ExecutorService workerPool) { + configuration.setWorkerPool(workerPool); + } + + public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) { + configuration.setWorkerPoolMaxSize(workerPoolMaxSize); + } + + public Integer 
getWorkerPoolMaxSize() { + return configuration.getWorkerPoolMaxSize(); + } + + public Integer getWorkerPoolCoreSize() { + return configuration.getWorkerPoolCoreSize(); + } + + public ExecutorService getWorkerPool() { + return configuration.getWorkerPool(); + } + + public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) { + configuration.setWorkerPoolCoreSize(workerPoolCoreSize); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 6c432d6..0f783b5 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kafka; import java.util.Properties; +import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -33,6 +34,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; + private ExecutorService workerPool; + private boolean shutdownWorkerPool; public KafkaProducer(KafkaEndpoint endpoint) { super(endpoint); @@ -58,11 +61,12 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { this.kafkaProducer = kafkaProducer; } - @Override - protected void doStop() throws Exception { - if (kafkaProducer != null) { - kafkaProducer.close(); - } + public ExecutorService getWorkerPool() { + return workerPool; + } + + public void setWorkerPool(ExecutorService workerPool) { + this.workerPool = 
workerPool; } @Override @@ -71,6 +75,23 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { if (kafkaProducer == null) { kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props); } + + // if we are in asynchronous mode we need a worker pool + if (!endpoint.isSynchronous() && workerPool == null) { + workerPool = endpoint.createProducerExecutor(); + } + } + + @Override + protected void doStop() throws Exception { + if (kafkaProducer != null) { + kafkaProducer.close(); + } + + if (shutdownWorkerPool && workerPool != null) { + endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool); + workerPool = null; + } } @SuppressWarnings("unchecked") @@ -152,7 +173,14 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { // Just set the exception back exchange.setException(e); } - callback.done(false); + // use worker pool to continue routing the exchange + // as this thread is from Kafka Callback and should not be used by Camel routing + workerPool.submit(new Runnable() { + @Override + public void run() { + callback.done(false); + } + }); } } }