Repository: camel
Updated Branches:
  refs/heads/master c2f424c21 -> ab6074949


CAMEL-10586: make the Kafka endpoint a little easier to use. You can now set 
key and partitionKey in the endpoint URI.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab607494
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab607494
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab607494

Branch: refs/heads/master
Commit: ab607494918a1044907a73c1d179fe82e0644e2f
Parents: c2f424c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Mar 3 21:18:58 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Mar 3 21:18:58 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  4 +-
 .../component/kafka/KafkaConfiguration.java     | 28 ++++++++++++++
 .../camel/component/kafka/KafkaProducer.java    | 13 +++++--
 .../component/kafka/KafkaProducerFullTest.java  | 40 +++++++++++++++++++-
 .../component/kafka/KafkaProducerTest.java      | 24 ++++++------
 5 files changed, 90 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index b6c7887..e4aff38 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -51,7 +51,7 @@ The Kafka component supports 2 options which are listed below.
 
 
 // endpoint options: START
-The Kafka component supports 80 endpoint options which are listed below:
+The Kafka component supports 82 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -89,6 +89,7 @@ The Kafka component supports 80 endpoint options which are 
listed below:
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory 
the producer can use to buffer records waiting to be sent to the server. If 
records are sent faster than they can be delivered to the server the producer 
will either block or throw an exception based on the preference specified by 
block.on.buffer.full.This setting should correspond roughly to the total memory 
the producer will use but is not a hard bound since not all memory the producer 
uses is used for buffering. Some additional memory will be used for compression 
(if compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to 
specify the compression codec for all data generated by this producer. Valid 
values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections 
after the number of milliseconds specified by this config.
+| key | producer |  | String | The record key (or null if no key is 
specified). If this option has been configured then it takes precedence over 
header link KafkaConstantsKEY
 | keySerializerClass | producer |  | String | The serializer class for keys 
(defaults to the same as for messages if nothing is given).
 | lingerMs | producer | 0 | Integer | The producer groups together any records 
that arrive in between request transmissions into a single batched request. 
Normally this occurs only under load when records arrive faster than they can 
be sent out. However in some circumstances the client may want to reduce the 
number of requests even under moderate load. This setting accomplishes this by 
adding a small amount of artificial delaythat is rather than immediately 
sending out a record the producer will wait for up to the given delay to allow 
other records to be sent so that the sends can be batched together. This can be 
thought of as analogous to Nagle's algorithm in TCP. This setting gives the 
upper bound on the delay for batching: once we get batch.size worth of records 
for a partition it will be sent immediately regardless of this setting however 
if we have fewer than this many bytes accumulated for this partition we will 
'linger' for the specified time waiting for more records to show 
 up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for 
example would have the effect of reducing the number of requests sent but would 
add up to 5ms of latency to records sent in the absence of load.
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how 
long sending to kafka will block. These methods can be blocked for multiple 
reasons. For e.g: buffer full metadata unavailable.This configuration imposes 
maximum limit on the total time spent in fetching metadata serialization of key 
and value partitioning and allocation of buffer memory when doing a send(). In 
case of partitionsFor() this configuration imposes a maximum time threshold on 
waiting for metadata
@@ -98,6 +99,7 @@ The Kafka component supports 80 endpoint options which are 
listed below:
 | metricReporters | producer |  | String | A list of classes to use as metrics 
reporters. Implementing the MetricReporter interface allows plugging in classes 
that will be notified of new metric creation. The JmxReporter is always 
included to register JMX statistics.
 | metricsSampleWindowMs | producer | 30000 | Integer | The number of samples 
maintained to compute metrics.
 | noOfMetricsSample | producer | 2 | Integer | The number of samples 
maintained to compute metrics.
+| partitionKey | producer |  | Integer | The partition to which the record 
will be sent (or null if no partition was specified). If this option has been 
configured then it takes precedence over header link KafkaConstantsPARTITION_KEY
 | producerBatchSize | producer | 16384 | Integer | The producer will attempt 
to batch records together into fewer requests whenever multiple records are 
being sent to the same partition. This helps performance on both the client and 
the server. This configuration controls the default batch size in bytes. No 
attempt will be made to batch records larger than this size.Requests sent to 
brokers will contain multiple batches one for each partition with data 
available to be sent.A small batch size will make batching less common and may 
reduce throughput (a batch size of zero will disable batching entirely). A very 
large batch size may use memory a bit more wastefully as we will always 
allocate a buffer of the specified batch size in anticipation of additional 
records.
 | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number 
of unsent messages that can be queued up the producer when using async mode 
before either the producer must be blocked or data must be dropped.
 | receiveBufferBytes | producer | 32768 | Integer | The size of the TCP 
receive buffer (SO_RCVBUF) to use when reading data.

http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 8807d29..31e95db 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -136,6 +136,10 @@ public class KafkaConfiguration {
     @UriParam(label = "producer")
     private String keySerializerClass;
 
+    @UriParam(label = "producer")
+    private String key;
+    @UriParam(label = "producer")
+    private Integer partitionKey;
     @UriParam(label = "producer", enums = "-1,0,1,all", defaultValue = "1")
     private String requestRequiredAcks = "1";
     //buffer.memory
@@ -994,6 +998,30 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * The record key (or null if no key is specified).
+     * If this option has been configured then it takes precedence over header 
{@link KafkaConstants#KEY}
+     */
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public Integer getPartitionKey() {
+        return partitionKey;
+    }
+
+    /**
+     * The partition to which the record will be sent (or null if no partition 
was specified).
+     * If this option has been configured then it takes precedence over header 
{@link KafkaConstants#PARTITION_KEY}
+     */
+    public void setPartitionKey(Integer partitionKey) {
+        this.partitionKey = partitionKey;
+    }
+
     public String getRequestRequiredAcks() {
         return requestRequiredAcks;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index abfc588..b5c192e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -124,10 +124,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-        final Object partitionKey = 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+
+        // endpoint take precedence over header configuration
+        final Integer partitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
+            ? endpoint.getConfiguration().getPartitionKey() : 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
         final boolean hasPartitionKey = partitionKey != null;
 
-        final Object messageKey = 
exchange.getIn().getHeader(KafkaConstants.KEY);
+        // endpoint take precedence over header configuration
+        final Object messageKey = endpoint.getConfiguration().getKey() != null
+            ? endpoint.getConfiguration().getKey() : 
exchange.getIn().getHeader(KafkaConstants.KEY);
         final boolean hasMessageKey = messageKey != null;
 
         Object msg = exchange.getIn().getBody();
@@ -149,7 +154,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 @Override
                 public ProducerRecord next() {
                     if (hasPartitionKey && hasMessageKey) {
-                        return new ProducerRecord(msgTopic, new 
Integer(partitionKey.toString()), messageKey, msgList.next());
+                        return new ProducerRecord(msgTopic, partitionKey, 
messageKey, msgList.next());
                     } else if (hasMessageKey) {
                         return new ProducerRecord(msgTopic, messageKey, 
msgList.next());
                     }
@@ -164,7 +169,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
         ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
-            record = new ProducerRecord(topic, new 
Integer(partitionKey.toString()), messageKey, msg);
+            record = new ProducerRecord(topic, partitionKey, messageKey, msg);
         } else if (hasMessageKey) {
             record = new ProducerRecord(topic, messageKey, msg);
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index fb396c2..4086ede 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -54,6 +54,8 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
 
     @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
     private Endpoint toStrings;
+    @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + 
"?requestRequiredAcks=-1&partitionKey=1")
+    private Endpoint toStrings2;
     @EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED + 
"?requestRequiredAcks=-1"
             + 
"&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor")
     private Endpoint toStringsWithInterceptor;
@@ -67,6 +69,9 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     @Produce(uri = "direct:startStrings")
     private ProducerTemplate stringsTemplate;
 
+    @Produce(uri = "direct:startStrings2")
+    private ProducerTemplate stringsTemplate2;
+
     @Produce(uri = "direct:startBytes")
     private ProducerTemplate bytesTemplate;
 
@@ -108,6 +113,8 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
             public void configure() throws Exception {
                 from("direct:startStrings").to(toStrings).to(mockEndpoint);
 
+                from("direct:startStrings2").to(toStrings2).to(mockEndpoint);
+
                 from("direct:startBytes").to(toBytes).to(mockEndpoint);
 
                 
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
@@ -143,6 +150,33 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     }
 
     @Test
+    public void producedString2MessageIsReceivedByKafka() throws 
InterruptedException, IOException {
+        int messageInTopic = 10;
+        int messageInOtherTopic = 5;
+
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + 
messageInOtherTopic);
+
+        sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test 
message", (String[]) null);
+        sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test 
message in other topic", KafkaConstants.PARTITION_KEY, "1", 
KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, 
TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
+        boolean allMessagesReceived = messagesLatch.await(200, 
TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics. Not 
received: " + messagesLatch.getCount(), allMessagesReceived);
+
+        List<Exchange> exchangeList = mockEndpoint.getExchanges();
+        assertEquals("Fifteen Exchanges are expected", exchangeList.size(), 
15);
+        for (Exchange exchange : exchangeList) {
+            @SuppressWarnings("unchecked")
+            List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) 
(exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals("One RecordMetadata is expected.", 
recordMetaData1.size(), 1);
+            assertTrue("Offset is positive", recordMetaData1.get(0).offset() 
>= 0);
+            assertTrue("Topic Name start with 'test'", 
recordMetaData1.get(0).topic().startsWith("test"));
+        }
+    }
+
+    @Test
     public void producedStringMessageIsIntercepted() throws 
InterruptedException, IOException {
         int messageInTopic = 10;
         int messageInOtherTopic = 5;
@@ -275,8 +309,10 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
 
     private void sendMessagesInRoute(int messages, ProducerTemplate template, 
Object bodyOther, String... headersWithValue) {
         Map<String, Object> headerMap = new HashMap<String, Object>();
-        for (int i = 0; i < headersWithValue.length; i = i + 2) {
-            headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
+        if (headersWithValue != null) {
+            for (int i = 0; i < headersWithValue.length; i = i + 2) {
+                headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
+            }
         }
         sendMessagesInRoute(messages, template, bodyOther, headerMap);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 17a3aec..946e3cd 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -82,7 +82,7 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
 
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
         producer.process(exchange);
         
Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
@@ -97,7 +97,7 @@ public class KafkaProducerTest {
         org.apache.kafka.clients.producer.KafkaProducer kp = 
producer.getKafkaProducer();
         
Mockito.when(kp.send(Matchers.any(ProducerRecord.class))).thenThrow(new 
ApiException());
         Mockito.when(exchange.getIn()).thenReturn(in);
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
         producer.process(exchange);
 
@@ -110,7 +110,7 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
 
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
         producer.process(exchange, callback);
 
@@ -131,7 +131,7 @@ public class KafkaProducerTest {
         org.apache.kafka.clients.producer.KafkaProducer kp = 
producer.getKafkaProducer();
         Mockito.when(kp.send(Matchers.any(ProducerRecord.class), 
Matchers.any(Callback.class))).thenThrow(new ApiException());
 
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
         producer.process(exchange, callback);
 
@@ -163,13 +163,13 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
 
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic", "someKey");
+        verifySendMessage(4, "anotherTopic", "someKey");
         assertRecordMetadataExists();
     }
 
@@ -200,12 +200,12 @@ public class KafkaProducerTest {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.KEY, "someKey");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "someTopic", "someKey");
+        verifySendMessage(4, "someTopic", "someKey");
         assertRecordMetadataExists();
     }
 
@@ -230,11 +230,11 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
         producer.process(exchange);
 
-        verifySendMessage("4", "someTopic", "someKey");
+        verifySendMessage(4, "someTopic", "someKey");
         assertRecordMetadataExists();
     }
 
@@ -251,10 +251,10 @@ public class KafkaProducerTest {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    protected void verifySendMessage(String partitionKey, String topic, String 
messageKey) {
+    protected void verifySendMessage(Integer partitionKey, String topic, 
String messageKey) {
         ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
         Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
-        assertEquals(new Integer(partitionKey), captor.getValue().partition());
+        assertEquals(partitionKey, captor.getValue().partition());
         assertEquals(messageKey, captor.getValue().key());
         assertEquals(topic, captor.getValue().topic());
     }

Reply via email to