Repository: camel Updated Branches: refs/heads/master cc771eabe -> 1af2133f6
CAMEL-8790: Kafka producer hard coded to use Strings. Thanks to Mark Mindenhall for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1af2133f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1af2133f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1af2133f Branch: refs/heads/master Commit: 1af2133f68498bd91361a2ca2237fb2426233721 Parents: cc771ea Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Jul 16 17:50:21 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jul 16 17:53:02 2015 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConstants.java | 3 + .../camel/component/kafka/KafkaEndpoint.java | 16 ++- .../camel/component/kafka/KafkaProducer.java | 26 ++-- .../component/kafka/KafkaProducerFullTest.java | 135 +++++++++++++------ 4 files changed, 129 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1af2133f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 18f1da0..6c31b65 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -28,6 +28,9 @@ public final class KafkaConstants { public static final String KEY = "kafka.CONTENT_TYPE"; public static final String TOPIC = "kafka.TOPIC"; + public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder"; + public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder"; + private KafkaConstants() { // Utility class } http://git-wip-us.apache.org/repos/asf/camel/blob/1af2133f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 7cbd524..7382bec 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -71,7 +71,18 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS @Override public Producer createProducer() throws Exception { - return new KafkaProducer(this); + String msgClassName = getConfiguration().getSerializerClass(); + String keyClassName = getConfiguration().getKeySerializerClass(); + if (msgClassName == null) { + msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER; + } + if (keyClassName == null) { + keyClassName = msgClassName; + } + + Class k = getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName); + Class v = getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName); + return createProducer(k, v, this); } @Override @@ -98,6 +109,9 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return exchange; } + protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass, KafkaEndpoint endpoint) { + return new KafkaProducer<K, V>(endpoint); + } // Delegated properties from the configuration //------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1af2133f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 80bf94e..3bc8e78 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -29,9 +29,9 @@ import org.apache.camel.impl.DefaultProducer; /** * */ -public class KafkaProducer extends DefaultProducer { +public class KafkaProducer<K, V> extends DefaultProducer { - protected Producer<String, String> producer; + protected Producer<K, V> producer; private final KafkaEndpoint endpoint; public KafkaProducer(KafkaEndpoint endpoint) { @@ -58,10 +58,11 @@ public class KafkaProducer extends DefaultProducer { protected void doStart() throws Exception { Properties props = getProps(); ProducerConfig config = new ProducerConfig(props); - producer = new Producer<String, String>(config); + producer = new Producer<K, V>(config); } @Override + @SuppressWarnings("unchecked") public void process(Exchange exchange) throws CamelException { String topic = endpoint.getTopic(); if (!endpoint.isBridgeEndpoint()) { @@ -70,21 +71,24 @@ public class KafkaProducer extends DefaultProducer { if (topic == null) { throw new CamelExchangeException("No topic key set", exchange); } - String partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class); + K partitionKey = (K) exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); boolean hasPartitionKey = partitionKey != null; - String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, String.class); + + K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY); boolean hasMessageKey = messageKey != null; - String msg = exchange.getIn().getBody(String.class); - KeyedMessage<String, String> data; + + V msg = (V) exchange.getIn().getBody(); + KeyedMessage<K, V> data; + if (hasPartitionKey && hasMessageKey) { - data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg); + data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg); } else if (hasPartitionKey) { - data = new KeyedMessage<String, String>(topic, partitionKey, msg); + data = new KeyedMessage<K, V>(topic, partitionKey, msg); } else if (hasMessageKey) { - data = new KeyedMessage<String, String>(topic, messageKey, msg); + data = new KeyedMessage<K, V>(topic, messageKey, msg); } else { log.warn("No message key or partition key set"); - data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg); + data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg); } producer.send(data); } http://git-wip-us.apache.org/repos/asf/camel/blob/1af2133f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index 56e0eb2..d76a059 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -36,97 +36,154 @@ import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { - public static final String TOPIC = "test"; - public static final String TOPIC_IN_HEADER = "testHeader"; + private static final String TOPIC_STRINGS = "test"; + private static final String TOPIC_STRINGS_IN_HEADER = "testHeader"; + private static final String TOPIC_BYTES = "testBytes"; + private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader"; + private static final String GROUP_STRINGS = "groupStrings"; + private static final String GROUP_BYTES = "groupStrings"; private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class); - @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + private static ConsumerConnector stringsConsumerConn; + private static ConsumerConnector bytesConsumerConn; + + @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder" + "&requestRequiredAcks=-1") - private Endpoint to; + private Endpoint toStrings; - @Produce(uri = "direct:start") - private ProducerTemplate template; + @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1") + private Endpoint toBytes; - private ConsumerConnector kafkaConsumer; + @Produce(uri = "direct:startStrings") + private ProducerTemplate stringsTemplate; - @Before - public void before() { - Properties props = new Properties(); - - props.put("zookeeper.connect", "localhost:" + getZookeeperPort()); - props.put("group.id", KafkaConstants.DEFAULT_GROUP); - props.put("zookeeper.session.timeout.ms", "6000"); - props.put("zookeeper.connectiontimeout.ms", "12000"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); - props.put("auto.offset.reset", "smallest"); + @Produce(uri = "direct:startBytes") + private ProducerTemplate bytesTemplate; + + + @BeforeClass + public static void before() { + Properties stringsProps = new Properties(); - kafkaConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); + stringsProps.put("zookeeper.connect", "localhost:" + getZookeeperPort()); + stringsProps.put("group.id", GROUP_STRINGS); + stringsProps.put("zookeeper.session.timeout.ms", "6000"); + stringsProps.put("zookeeper.connectiontimeout.ms", "12000"); + stringsProps.put("zookeeper.sync.time.ms", "200"); + stringsProps.put("auto.commit.interval.ms", "1000"); + stringsProps.put("auto.offset.reset", "smallest"); + stringsConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(stringsProps)); + + Properties bytesProps = new Properties(); + bytesProps.putAll(stringsProps); + bytesProps.put("group.id", GROUP_BYTES); + bytesConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(bytesProps)); } - @After - public void after() { - kafkaConsumer.shutdown(); + @AfterClass + public static void after() { + stringsConsumerConn.shutdown(); + bytesConsumerConn.shutdown(); } @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start").to(to); + protected RouteBuilder[] createRouteBuilders() throws Exception { + return new RouteBuilder[] { + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:startStrings").to(toStrings); + } + }, + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:startBytes").to(toBytes); + } } }; } @Test - public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException { + public void producedStringMessageIsReceivedByKafka() throws InterruptedException, IOException { + int messageInTopic = 10; + int messageInOtherTopic = 5; + + CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); + + Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); + topicCountMap.put(TOPIC_STRINGS, 5); + topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5); + createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap); + + sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1"); + sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER); + + boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); + } + + @Test + public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException { int messageInTopic = 10; int messageInOtherTopic = 5; CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); - topicCountMap.put(TOPIC, 5); - topicCountMap.put(TOPIC_IN_HEADER, 5); - createKafkaMessageConsumer(messagesLatch, topicCountMap); + topicCountMap.put(TOPIC_BYTES, 5); + topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5); + createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap); - sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1"); - sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER); + Map<String, Object> inTopicHeaders = new HashMap<String, Object>(); + inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes()); + sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders); + + Map<String, Object> otherTopicHeaders = new HashMap<String, Object>(); + otherTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes()); + otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER); + sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders); boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); } - private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) { - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap); + private void createKafkaMessageConsumer(ConsumerConnector consumerConn, String topic, String topicInHeader, + CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) { + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConn.createMessageStreams(topicCountMap); ExecutorService executor = Executors.newFixedThreadPool(10); - for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC)) { + for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) { executor.submit(new KakfaTopicConsumer(stream, messagesLatch)); } - for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC_IN_HEADER)) { + for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topicInHeader)) { executor.submit(new KakfaTopicConsumer(stream, messagesLatch)); } } - private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... headersWithValue) { + private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) { Map<String, Object> headerMap = new HashMap<String, Object>(); for (int i = 0; i < headersWithValue.length; i = i + 2) { headerMap.put(headersWithValue[i], headersWithValue[i + 1]); } + sendMessagesInRoute(messages, template, bodyOther, headerMap); + } - for (int k = 0; k < messageInOtherTopic; k++) { + private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) { + for (int k = 0; k < messages; k++) { template.sendBodyAndHeaders(bodyOther, headerMap); } }