Repository: camel Updated Branches: refs/heads/master c2f424c21 -> ab6074949
CAMEL-10586: make the kafka endpoint a little easier to use. You can now set key and partitionKey in the endpoint uri. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab607494 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab607494 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab607494 Branch: refs/heads/master Commit: ab607494918a1044907a73c1d179fe82e0644e2f Parents: c2f424c Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Mar 3 21:18:58 2017 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Mar 3 21:18:58 2017 +0100 ---------------------------------------------------------------------- .../src/main/docs/kafka-component.adoc | 4 +- .../component/kafka/KafkaConfiguration.java | 28 ++++++++++++++ .../camel/component/kafka/KafkaProducer.java | 13 +++++-- .../component/kafka/KafkaProducerFullTest.java | 40 +++++++++++++++++++- .../component/kafka/KafkaProducerTest.java | 24 ++++++------ 5 files changed, 90 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/main/docs/kafka-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index b6c7887..e4aff38 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -51,7 +51,7 @@ The Kafka component supports 2 options which are listed below. // endpoint options: START -The Kafka component supports 80 endpoint options which are listed below: +The Kafka component supports 82 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] @@ -89,6 +89,7 @@ The Kafka component supports 80 endpoint options which are listed below: | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config. +| key | producer | | String | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY | keySerializerClass | producer | | String | The serializer class for keys (defaults to the same as for messages if nothing is given). | lingerMs | producer | 0 | Integer | The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delaythat is rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for example would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load. | maxBlockMs | producer | 60000 | Integer | The configuration controls how long sending to kafka will block. These methods can be blocked for multiple reasons. For e.g: buffer full metadata unavailable.This configuration imposes maximum limit on the total time spent in fetching metadata serialization of key and value partitioning and allocation of buffer memory when doing a send(). In case of partitionsFor() this configuration imposes a maximum time threshold on waiting for metadata @@ -98,6 +99,7 @@ The Kafka component supports 80 endpoint options which are listed below: | metricReporters | producer | | String | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | metricsSampleWindowMs | producer | 30000 | Integer | The number of samples maintained to compute metrics. | noOfMetricsSample | producer | 2 | Integer | The number of samples maintained to compute metrics. +| partitionKey | producer | | Integer | The partition to which the record will be sent (or null if no partition was specified). If this option has been configured then it take precedence over header link KafkaConstantsPARTITION_KEY | producerBatchSize | producer | 16384 | Integer | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size.Requests sent to brokers will contain multiple batches one for each partition with data available to be sent.A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records. | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. | receiveBufferBytes | producer | 32768 | Integer | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 8807d29..31e95db 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -136,6 +136,10 @@ public class KafkaConfiguration { @UriParam(label = "producer") private String keySerializerClass; + @UriParam(label = "producer") + private String key; + @UriParam(label = "producer") + private Integer partitionKey; @UriParam(label = "producer", enums = "-1,0,1,all", defaultValue = "1") private String requestRequiredAcks = "1"; //buffer.memory @@ -994,6 +998,30 @@ public class KafkaConfiguration { this.bufferMemorySize = bufferMemorySize; } + public String getKey() { + return key; + } + + /** + * The record key (or null if no key is specified). + * If this option has been configured then it take precedence over header {@link KafkaConstants#KEY} + */ + public void setKey(String key) { + this.key = key; + } + + public Integer getPartitionKey() { + return partitionKey; + } + + /** + * The partition to which the record will be sent (or null if no partition was specified). + * If this option has been configured then it take precedence over header {@link KafkaConstants#PARTITION_KEY} + */ + public void setPartitionKey(Integer partitionKey) { + this.partitionKey = partitionKey; + } + public String getRequestRequiredAcks() { return requestRequiredAcks; } http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index abfc588..b5c192e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -124,10 +124,15 @@ public class KafkaProducer extends DefaultAsyncProducer { if (topic == null) { throw new CamelExchangeException("No topic key set", exchange); } - final Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); + + // endpoint take precedence over header configuration + final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null + ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class); final boolean hasPartitionKey = partitionKey != null; - final Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY); + // endpoint take precedence over header configuration + final Object messageKey = endpoint.getConfiguration().getKey() != null + ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY); final boolean hasMessageKey = messageKey != null; Object msg = exchange.getIn().getBody(); @@ -149,7 +154,7 @@ public class KafkaProducer extends DefaultAsyncProducer { @Override public ProducerRecord next() { if (hasPartitionKey && hasMessageKey) { - return new ProducerRecord(msgTopic, new Integer(partitionKey.toString()), messageKey, msgList.next()); + return new ProducerRecord(msgTopic, partitionKey, messageKey, msgList.next()); } else if (hasMessageKey) { return new ProducerRecord(msgTopic, messageKey, msgList.next()); } @@ -164,7 +169,7 @@ public class KafkaProducer extends DefaultAsyncProducer { } ProducerRecord record; if (hasPartitionKey && hasMessageKey) { - record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg); + record = new ProducerRecord(topic, partitionKey, messageKey, msg); } else if (hasMessageKey) { record = new ProducerRecord(topic, messageKey, msg); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index fb396c2..4086ede 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -54,6 +54,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1") private Endpoint toStrings; + @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=1") + private Endpoint toStrings2; @EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1" + "&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor") private Endpoint toStringsWithInterceptor; @@ -67,6 +69,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { @Produce(uri = "direct:startStrings") private ProducerTemplate stringsTemplate; + @Produce(uri = "direct:startStrings2") + private ProducerTemplate stringsTemplate2; + @Produce(uri = "direct:startBytes") private ProducerTemplate bytesTemplate; @@ -108,6 +113,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { public void configure() throws Exception { from("direct:startStrings").to(toStrings).to(mockEndpoint); + from("direct:startStrings2").to(toStrings2).to(mockEndpoint); + from("direct:startBytes").to(toBytes).to(mockEndpoint); from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint); @@ -143,6 +150,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { } @Test + public void producedString2MessageIsReceivedByKafka() throws InterruptedException, IOException { + int messageInTopic = 10; + int messageInOtherTopic = 5; + + CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); + + sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test message", (String[]) null); + sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER); + + createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch); + + boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); + + List<Exchange> exchangeList = mockEndpoint.getExchanges(); + assertEquals("Fifteen Exchanges are expected", exchangeList.size(), 15); + for (Exchange exchange : exchangeList) { + @SuppressWarnings("unchecked") + List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) (exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA)); + assertEquals("One RecordMetadata is expected.", recordMetaData1.size(), 1); + assertTrue("Offset is positive", recordMetaData1.get(0).offset() >= 0); + assertTrue("Topic Name start with 'test'", recordMetaData1.get(0).topic().startsWith("test")); + } + } + + @Test public void producedStringMessageIsIntercepted() throws InterruptedException, IOException { int messageInTopic = 10; int messageInOtherTopic = 5; @@ -275,8 +309,10 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) { Map<String, Object> headerMap = new HashMap<String, Object>(); - for (int i = 0; i < headersWithValue.length; i = i + 2) { - headerMap.put(headersWithValue[i], headersWithValue[i + 1]); + if (headersWithValue != null) { + for (int i = 0; i < headersWithValue.length; i = i + 2) { + headerMap.put(headersWithValue[i], headersWithValue[i + 1]); + } } sendMessagesInRoute(messages, template, bodyOther, headerMap); } http://git-wip-us.apache.org/repos/asf/camel/blob/ab607494/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 17a3aec..946e3cd 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -82,7 +82,7 @@ public class KafkaProducerTest { Mockito.when(exchange.getIn()).thenReturn(in); Mockito.when(exchange.getOut()).thenReturn(out); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); producer.process(exchange); Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class)); @@ -97,7 +97,7 @@ public class KafkaProducerTest { org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); Mockito.when(kp.send(Matchers.any(ProducerRecord.class))).thenThrow(new ApiException()); Mockito.when(exchange.getIn()).thenReturn(in); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); producer.process(exchange); @@ -110,7 +110,7 @@ public class KafkaProducerTest { Mockito.when(exchange.getIn()).thenReturn(in); Mockito.when(exchange.getOut()).thenReturn(out); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); producer.process(exchange, callback); @@ -131,7 +131,7 @@ public class KafkaProducerTest { org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); Mockito.when(kp.send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class))).thenThrow(new ApiException()); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); producer.process(exchange, callback); @@ -163,13 +163,13 @@ public class KafkaProducerTest { Mockito.when(exchange.getIn()).thenReturn(in); Mockito.when(exchange.getOut()).thenReturn(out); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); in.setHeader(KafkaConstants.KEY, "someKey"); producer.process(exchange); - verifySendMessage("4", "anotherTopic", "someKey"); + verifySendMessage(4, "anotherTopic", "someKey"); assertRecordMetadataExists(); } @@ -200,12 +200,12 @@ public class KafkaProducerTest { endpoint.getConfiguration().setTopic("someTopic"); Mockito.when(exchange.getIn()).thenReturn(in); Mockito.when(exchange.getOut()).thenReturn(out); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); in.setHeader(KafkaConstants.KEY, "someKey"); producer.process(exchange); - verifySendMessage("4", "someTopic", "someKey"); + verifySendMessage(4, "someTopic", "someKey"); assertRecordMetadataExists(); } @@ -230,11 +230,11 @@ public class KafkaProducerTest { Mockito.when(exchange.getOut()).thenReturn(out); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); in.setHeader(KafkaConstants.KEY, "someKey"); - in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.PARTITION_KEY, 4); producer.process(exchange); - verifySendMessage("4", "someTopic", "someKey"); + verifySendMessage(4, "someTopic", "someKey"); assertRecordMetadataExists(); } @@ -251,10 +251,10 @@ public class KafkaProducerTest { } @SuppressWarnings({"unchecked", "rawtypes"}) - protected void verifySendMessage(String partitionKey, String topic, String messageKey) { + protected void verifySendMessage(Integer partitionKey, String topic, String messageKey) { ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class); Mockito.verify(producer.getKafkaProducer()).send(captor.capture()); - assertEquals(new Integer(partitionKey), captor.getValue().partition()); + assertEquals(partitionKey, captor.getValue().partition()); assertEquals(messageKey, captor.getValue().key()); assertEquals(topic, captor.getValue().topic()); }