CAMEL-9790: When sending to Kafka, Camel should catch exceptions so that the Camel error handler can react.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77428ae1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77428ae1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77428ae1

Branch: refs/heads/camel-2.17.x
Commit: 77428ae1ca262c7ea895ead50cd3e931d2c66286
Parents: e4dbb8e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Apr 30 09:23:29 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Apr 30 09:24:24 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/kafka/KafkaProducer.java | 9 +++++++--
 .../apache/camel/component/kafka/KafkaProducerTest.java | 12 +++++++++++-
 2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/77428ae1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6f9ea79..4f6468b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -97,8 +97,13 @@ public class KafkaProducer extends DefaultProducer {
             record = new ProducerRecord(topic, msg);
         }
 
-        // TODO: add support for async callback in the send
-        kafkaProducer.send(record);
+        // TODO: add support for async callback
+        // requires a thread pool for processing outgoing routing
+        try {
+            kafkaProducer.send(record).get();
+        } catch (Exception e) {
+            throw new CamelException(e);
+        }
     }
 
 }
http://git-wip-us.apache.org/repos/asf/camel/blob/77428ae1/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 98f6421..8e94320 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
@@ -24,6 +25,7 @@ import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -45,7 +47,15 @@ public class KafkaProducerTest {
                 "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
         endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
-        producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
+
+
+        RecordMetadata rm = new RecordMetadata(null, 1, 1);
+        Future future = Mockito.mock(Future.class);
+        Mockito.when(future.get()).thenReturn(rm);
+        org.apache.kafka.clients.producer.KafkaProducer kp = Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class);
+        Mockito.when(kp.send(Mockito.any())).thenReturn(future);
+
+        producer.setKafkaProducer(kp);
     }
 
     @Test