Component docs

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3d0f2f2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3d0f2f2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3d0f2f2

Branch: refs/heads/master
Commit: c3d0f2f222aaaa60a4bc0300aa4eb6582e7143e7
Parents: ee5c7a0
Author: Claus Ibsen <[email protected]>
Authored: Mon May 18 17:06:16 2015 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Mon May 18 17:07:31 2015 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaComponent.java   |   7 +-
 .../component/kafka/KafkaConfiguration.java     | 162 ++++++++++++++++---
 .../camel/component/kafka/KafkaEndpoint.java    |  13 +-
 .../component/kafka/KafkaEndpointTest.java      |   6 +-
 .../component/kafka/KafkaProducerTest.java      |   4 +-
 5 files changed, 154 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index bf33a03..b659e73 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,7 +35,12 @@ public class KafkaComponent extends UriEndpointComponent {
     protected KafkaEndpoint createEndpoint(String uri,
                                            String remaining,
                                            Map<String, Object> params) throws 
Exception {
-        KafkaEndpoint endpoint = new KafkaEndpoint(uri, remaining, this);
+
+        KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
+        String brokers = remaining.split("\\?")[0];
+        if (brokers != null) {
+            endpoint.getConfiguration().setBrokers(brokers);
+        }
         setProperties(endpoint, params);
         return endpoint;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 368b10f..4a2e1ea 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -19,8 +19,10 @@ package org.apache.camel.component.kafka;
 import java.util.Properties;
 
 import kafka.producer.DefaultPartitioner;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
 
 @UriParams
 public class KafkaConfiguration {
@@ -31,7 +33,7 @@ public class KafkaConfiguration {
     private String zookeeperHost;
     @UriParam(defaultValue = "2181")
     private int zookeeperPort = 2181;
-    @UriParam
+    @UriParam @Metadata(required = "true")
     private String topic;
     @UriParam
     private String groupId;
@@ -90,39 +92,41 @@ public class KafkaConfiguration {
     private Integer zookeeperSyncTimeMs;
 
     //Producer configuration properties
-    @UriParam
-    private String producerType;
-    @UriParam
-    private String compressionCodec;
-    @UriParam
+    @UriPath
+    private String brokers;
+    @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync")
+    private String producerType = "sync";
+    @UriParam(label = "producer", defaultValue = "none", enums = 
"none,gzip,snappy")
+    private String compressionCodec = "none";
+    @UriParam(label = "producer")
     private String compressedTopics;
-    @UriParam
-    private Integer messageSendMaxRetries;
-    @UriParam
-    private Integer retryBackoffMs;
-    @UriParam
-    private Integer topicMetadataRefreshIntervalMs;
+    @UriParam(label = "producer", defaultValue = "3")
+    private Integer messageSendMaxRetries = 3;
+    @UriParam(label = "producer", defaultValue = "100")
+    private Integer retryBackoffMs = 100;
+    @UriParam(label = "producer", defaultValue = "600000")
+    private Integer topicMetadataRefreshIntervalMs = 600 * 1000;
 
     //Sync producer config
-    @UriParam
-    private Integer sendBufferBytes;
-    @UriParam
+    @UriParam(label = "producer", defaultValue = "" + 100 * 1024)
+    private Integer sendBufferBytes = 100 * 1024;
+    @UriParam(label = "producer", defaultValue = "0")
     private short requestRequiredAcks;
-    @UriParam
-    private Integer requestTimeoutMs;
+    @UriParam(label = "producer", defaultValue = "10000")
+    private Integer requestTimeoutMs = 10000;
 
     //Async producer config
-    @UriParam
-    private Integer queueBufferingMaxMs;
-    @UriParam
-    private Integer queueBufferingMaxMessages;
-    @UriParam
+    @UriParam(label = "producer", defaultValue = "5000")
+    private Integer queueBufferingMaxMs = 5000;
+    @UriParam(label = "producer", defaultValue = "10000")
+    private Integer queueBufferingMaxMessages = 10000;
+    @UriParam(label = "producer")
     private Integer queueEnqueueTimeoutMs;
-    @UriParam
-    private Integer batchNumMessages;
-    @UriParam
+    @UriParam(label = "producer", defaultValue = "200")
+    private Integer batchNumMessages = 200;
+    @UriParam(label = "producer")
     private String serializerClass;
-    @UriParam
+    @UriParam(label = "producer")
     private String keySerializerClass;
 
     public KafkaConfiguration() {
@@ -256,6 +260,9 @@ public class KafkaConfiguration {
         return partitioner;
     }
 
+    /**
+     * The partitioner class for partitioning messages amongst sub-topics. The 
default partitioner is based on the hash of the key.
+     */
     public void setPartitioner(String partitioner) {
         this.partitioner = partitioner;
     }
@@ -264,6 +271,9 @@ public class KafkaConfiguration {
         return topic;
     }
 
+    /**
+     * Name of the topic to use
+     */
     public void setTopic(String topic) {
         this.topic = topic;
     }
@@ -272,6 +282,9 @@ public class KafkaConfiguration {
         return consumerStreams;
     }
 
+    /**
+     * Number of concurrent consumers on the consumer
+     */
     public void setConsumerStreams(int consumerStreams) {
         this.consumerStreams = consumerStreams;
     }
@@ -395,6 +408,9 @@ public class KafkaConfiguration {
         return queuedMaxMessageChunks;
     }
 
+    /**
+     * Max number of message chunks buffered for consumption. Each chunk can 
be up to fetch.message.max.bytes.
+     */
     public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
         this.queuedMaxMessageChunks = queuedMaxMessageChunks;
     }
@@ -515,10 +531,31 @@ public class KafkaConfiguration {
         this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
     }
 
+    public String getBrokers() {
+        return brokers;
+    }
+
+    /**
+     * This is for bootstrapping and the producer will only use it for getting 
metadata (topics, partitions and replicas).
+     * The socket connections for sending the actual data will be established 
based on the broker information returned in the metadata.
+     * The format is host1:port1,host2:port2, and the list can be a subset of 
brokers or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>metadata.broker.list</tt> in the Kafka 
documentation.
+     */
+    public void setBrokers(String brokers) {
+        this.brokers = brokers;
+    }
+
     public String getProducerType() {
         return producerType;
     }
 
+    /**
+     * This parameter specifies whether the messages are sent asynchronously 
in a background thread.
+     * Valid values are (1) async for asynchronous send and (2) sync for 
synchronous send.
+     * By setting the producer to async we allow batching together of requests 
(which is great for throughput)
+     * but open the possibility of a failure of the client machine dropping 
unsent data.
+     */
     public void setProducerType(String producerType) {
         this.producerType = producerType;
     }
@@ -527,6 +564,9 @@ public class KafkaConfiguration {
         return compressionCodec;
     }
 
+    /**
+     * This parameter allows you to specify the compression codec for all data 
generated by this producer. Valid values are "none", "gzip" and "snappy".
+     */
     public void setCompressionCodec(String compressionCodec) {
         this.compressionCodec = compressionCodec;
     }
@@ -535,6 +575,12 @@ public class KafkaConfiguration {
         return compressedTopics;
     }
 
+    /**
+     * This parameter allows you to set whether compression should be turned 
on for particular topics.
+     * If the compression codec is anything other than NoCompressionCodec, 
enable compression only for specified topics if any.
+     * If the list of compressed topics is empty, then enable the specified 
compression codec for all topics.
+     * If the compression codec is NoCompressionCodec, compression is disabled 
for all topics
+     */
     public void setCompressedTopics(String compressedTopics) {
         this.compressedTopics = compressedTopics;
     }
@@ -543,6 +589,11 @@ public class KafkaConfiguration {
         return messageSendMaxRetries;
     }
 
+    /**
+     * This property will cause the producer to automatically retry a failed 
send request.
+     * This property specifies the number of retries when such failures occur. 
Note that setting a non-zero value here
+     * can lead to duplicates in the case of network errors that cause a 
message to be sent but the acknowledgement to be lost.
+     */
     public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
         this.messageSendMaxRetries = messageSendMaxRetries;
     }
@@ -551,6 +602,10 @@ public class KafkaConfiguration {
         return retryBackoffMs;
     }
 
+    /**
+     * Before each retry, the producer refreshes the metadata of relevant 
topics to see if a new leader has been elected.
+     * Since leader election takes a bit of time, this property specifies the 
amount of time that the producer waits before refreshing the metadata.
+     */
     public void setRetryBackoffMs(Integer retryBackoffMs) {
         this.retryBackoffMs = retryBackoffMs;
     }
@@ -559,6 +614,14 @@ public class KafkaConfiguration {
         return topicMetadataRefreshIntervalMs;
     }
 
+    /**
+     * The producer generally refreshes the topic metadata from brokers when 
there is a failure (partition missing,
+     * leader not available...). It will also poll regularly (default: every 
10min so 600000ms).
+     * If you set this to a negative value, metadata will only get refreshed 
on failure.
+     * If you set this to zero, the metadata will get refreshed after each 
message sent (not recommended).
+     * Important note: the refresh happen only AFTER the message is sent, so 
if the producer never
+     * sends a message the metadata is never refreshed
+     */
     public void setTopicMetadataRefreshIntervalMs(Integer 
topicMetadataRefreshIntervalMs) {
         this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
     }
@@ -567,6 +630,9 @@ public class KafkaConfiguration {
         return sendBufferBytes;
     }
 
+    /**
+     * Socket write buffer size
+     */
     public void setSendBufferBytes(Integer sendBufferBytes) {
         this.sendBufferBytes = sendBufferBytes;
     }
@@ -575,6 +641,22 @@ public class KafkaConfiguration {
         return requestRequiredAcks;
     }
 
+    /**
+     * This value controls when a produce request is considered completed. 
Specifically,
+     * how many other brokers must have committed the data to their log and 
acknowledged this to the leader?
+     * Typical values are (0, 1 or -1):
+     * 0, which means that the producer never waits for an acknowledgement 
from the broker (the same behavior as 0.7).
+     * This option provides the lowest latency but the weakest durability 
guarantees (some data will be lost when a server fails).
+     * 1, which means that the producer gets an acknowledgement after the 
leader replica has received the data.
+     * This option provides better durability as the client waits until the 
server acknowledges the request as successful
+     * (only messages that were written to the now-dead leader but not yet 
replicated will be lost).
+     * -1, The producer gets an acknowledgement after all in-sync replicas 
have received the data.
+     * This option provides the greatest level of durability.
+     * However, it does not completely eliminate the risk of message loss 
because the number of in sync replicas may,
+     * in rare cases, shrink to 1. If you want to ensure that some minimum 
number of replicas
+     * (typically a majority) receive a write, then you must set the 
topic-level min.insync.replicas setting.
+     * Please read the Replication section of the design documentation for a 
more in-depth discussion.
+     */
     public void setRequestRequiredAcks(short requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
     }
@@ -583,6 +665,9 @@ public class KafkaConfiguration {
         return requestTimeoutMs;
     }
 
+    /**
+     * The amount of time the broker will wait trying to meet the 
request.required.acks requirement before sending back an error to the client.
+     */
     public void setRequestTimeoutMs(Integer requestTimeoutMs) {
         this.requestTimeoutMs = requestTimeoutMs;
     }
@@ -591,6 +676,11 @@ public class KafkaConfiguration {
         return queueBufferingMaxMs;
     }
 
+    /**
+     * Maximum time to buffer data when using async mode.
+     * For example a setting of 100 will try to batch together 100ms of 
messages to send at once.
+     * This will improve throughput but adds message delivery latency due to 
the buffering.
+     */
     public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
         this.queueBufferingMaxMs = queueBufferingMaxMs;
     }
@@ -599,6 +689,10 @@ public class KafkaConfiguration {
         return queueBufferingMaxMessages;
     }
 
+    /**
+     * The maximum number of unsent messages that can be queued up the 
producer when using async
+     * mode before either the producer must be blocked or data must be dropped.
+     */
     public void setQueueBufferingMaxMessages(Integer 
queueBufferingMaxMessages) {
         this.queueBufferingMaxMessages = queueBufferingMaxMessages;
     }
@@ -607,6 +701,11 @@ public class KafkaConfiguration {
         return queueEnqueueTimeoutMs;
     }
 
+    /**
+     * The amount of time to block before dropping messages when running in 
async mode and the buffer has reached
+     * queue.buffering.max.messages. If set to 0 events will be enqueued 
immediately or dropped if the queue is full
+     * (the producer send call will never block). If set to -1 the producer 
will block indefinitely and never willingly drop a send.
+     */
     public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
         this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
     }
@@ -615,6 +714,10 @@ public class KafkaConfiguration {
         return batchNumMessages;
     }
 
+    /**
+     * The number of messages to send in one batch when using async mode.
+     * The producer will wait until either this number of messages are ready 
to send or queue.buffer.max.ms is reached.
+     */
     public void setBatchNumMessages(Integer batchNumMessages) {
         this.batchNumMessages = batchNumMessages;
     }
@@ -623,6 +726,10 @@ public class KafkaConfiguration {
         return serializerClass;
     }
 
+    /**
+     * The serializer class for messages. The default encoder takes a byte[] 
and returns the same byte[].
+     * The default class is kafka.serializer.DefaultEncoder
+     */
     public void setSerializerClass(String serializerClass) {
         this.serializerClass = serializerClass;
     }
@@ -631,6 +738,9 @@ public class KafkaConfiguration {
         return keySerializerClass;
     }
 
+    /**
+     * The serializer class for keys (defaults to the same as for messages if 
nothing is given).
+     */
     public void setKeySerializerClass(String keySerializerClass) {
         this.keySerializerClass = keySerializerClass;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index a424955..bebe6d7 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -37,19 +37,14 @@ import org.apache.camel.spi.UriPath;
 @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", 
consumerClass = KafkaConsumer.class, label = "messaging")
 public class KafkaEndpoint extends DefaultEndpoint implements 
MultipleConsumersSupport {
 
-    @UriPath @Metadata(required = "true")
-    private String brokers;
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
 
     public KafkaEndpoint() {
     }
 
-    public KafkaEndpoint(String endpointUri,
-                         String remaining,
-                         KafkaComponent component) throws URISyntaxException {
+    public KafkaEndpoint(String endpointUri, KafkaComponent component) {
         super(endpointUri, component);
-        this.brokers = remaining.split("\\?")[0];
     }
 
     public KafkaConfiguration getConfiguration() {
@@ -156,7 +151,11 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
     }
 
     public String getBrokers() {
-        return brokers;
+        return configuration.getBrokers();
+    }
+
+    public void setBrokers(String brokers) {
+        configuration.setBrokers(brokers);
     }
 
     public int getConsumerStreams() {

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index 6ac6f81..ed4a6d1 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -32,7 +32,8 @@ public class KafkaEndpointTest {
 
     @Test
     public void testCreatingKafkaExchangeSetsHeaders() throws 
URISyntaxException {
-        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", 
"localhost", new KafkaComponent());
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new 
KafkaComponent());
+        endpoint.setBrokers("localhost");
 
         Message message = new Message("mymessage".getBytes(), 
"somekey".getBytes());
         DefaultDecoder decoder = new DefaultDecoder(null);
@@ -47,7 +48,8 @@ public class KafkaEndpointTest {
 
     @Test
     public void assertSingleton() throws URISyntaxException {
-        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", 
"localhost", new KafkaComponent());
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new 
KafkaComponent());
+        endpoint.setBrokers("localhost");
         assertTrue(endpoint.isSingleton());
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d989c96..8e5b0f7 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -41,8 +41,8 @@ public class KafkaProducerTest {
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
-        endpoint = new 
KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
-                "broker1:1234," + "broker2:4567?topic=sometopic", null);
+        endpoint = new 
KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null);
+        endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
         producer.producer = Mockito.mock(Producer.class);
     }

Reply via email to