CAMEL-9957: Use a worker pool for processing Kafka callbacks so that we do not steal Kafka's I/O thread to do Camel routing. See the Kafka javadoc/documentation.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d9f7fdab Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d9f7fdab Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d9f7fdab Branch: refs/heads/master Commit: d9f7fdabfe4491b71862687c0684dfdfbf936da1 Parents: cd2f4af Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 15 10:21:38 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 15 10:21:38 2016 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaComponent.java | 4 +- .../camel/component/kafka/KafkaProducer.java | 39 +++++++++----------- .../component/kafka/KafkaProducerTest.java | 1 - 3 files changed, 19 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index 76ef55f..2e11b22 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -25,6 +25,8 @@ import org.apache.camel.impl.UriEndpointComponent; public class KafkaComponent extends UriEndpointComponent { + private ExecutorService workerPool; + public KafkaComponent() { super(KafkaEndpoint.class); } @@ -33,8 +35,6 @@ public class KafkaComponent extends UriEndpointComponent { super(context, KafkaEndpoint.class); } - private ExecutorService workerPool; - @Override protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, 
Object> params) throws Exception { KafkaEndpoint endpoint = new KafkaEndpoint(uri, this); http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 0f783b5..2138df1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -20,17 +20,16 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -public class KafkaProducer extends DefaultProducer implements AsyncProcessor { +public class KafkaProducer extends DefaultAsyncProducer { private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; @@ -79,6 +78,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { // if we are in asynchronous mode we need a worker pool if (!endpoint.isSynchronous() && workerPool == null) { workerPool = endpoint.createProducerExecutor(); + // we create a thread pool so we should also shut it down + shutdownWorkerPool = true; } } @@ -127,34 +128,28 @@ public class KafkaProducer extends 
DefaultProducer implements AsyncProcessor { @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { ProducerRecord record = createRecorder(exchange); - // Just send out the record in the sync way kafkaProducer.send(record).get(); } @Override + @SuppressWarnings("unchecked") public boolean process(Exchange exchange, AsyncCallback callback) { - // force processing synchronously using different api - if (endpoint.isSynchronous()) { - try { + try { + if (endpoint.isSynchronous()) { + // force process using synchronous call on kafka process(exchange); - } catch (Throwable e) { - exchange.setException(e); + } else { + ProducerRecord record = createRecorder(exchange); + kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); + // return false to process asynchronous + return false; } - callback.done(true); - return true; - } - - try { - ProducerRecord record = createRecorder(exchange); - kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); - // Finishing the processing in an async way - return false; } catch (Exception ex) { - // Just set the exception back to the client exchange.setException(ex); - callback.done(true); - return true; } + + callback.done(true); + return true; } private final class KafkaProducerCallBack implements Callback { @@ -170,7 +165,6 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { - // Just set the exception back exchange.setException(e); } // use worker pool to continue routing the exchange @@ -183,4 +177,5 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor { }); } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 40f2113..dcd3365 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -52,7 +52,6 @@ public class KafkaProducerTest { endpoint.setBrokers("broker1:1234,broker2:4567"); producer = new KafkaProducer(endpoint); - RecordMetadata rm = new RecordMetadata(null, 1, 1); Future future = Mockito.mock(Future.class); Mockito.when(future.get()).thenReturn(rm);