Repository: camel
Updated Branches:
  refs/heads/master c14e0063b -> c3d0f2f22


Component docs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee5c7a07
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee5c7a07
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee5c7a07

Branch: refs/heads/master
Commit: ee5c7a0797f482392d30694737f7d6571554f096
Parents: c14e006
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon May 18 16:39:26 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 18 16:39:26 2015 +0200

----------------------------------------------------------------------
 .../component/kafka/KafkaConfiguration.java     | 168 +++++++++++++++----
 .../camel/component/kafka/KafkaEndpoint.java    |   4 +-
 2 files changed, 136 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ee5c7a07/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 576cb9e..368b10f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -37,13 +37,14 @@ public class KafkaConfiguration {
     private String groupId;
     @UriParam(defaultValue = "DefaultPartitioner")
     private String partitioner = DefaultPartitioner.class.getCanonicalName();
-    @UriParam(defaultValue = "10")
+
+    @UriParam(label = "consumer", defaultValue = "10")
     private int consumerStreams = 10;
-    @UriParam(defaultValue = "1")
+    @UriParam(label = "consumer", defaultValue = "1")
     private int consumersCount = 1;
-    @UriParam(defaultValue = "100")
+    @UriParam(label = "consumer", defaultValue = "100")
     private int batchSize = 100;
-    @UriParam(defaultValue = "10000")
+    @UriParam(label = "consumer", defaultValue = "10000")
     private int barrierAwaitTimeoutMs = 10000;
 
     //Common configuration properties
@@ -51,33 +52,33 @@ public class KafkaConfiguration {
     private String clientId;
 
     //Consumer configuration properties
-    @UriParam
+    @UriParam(label = "consumer")
     private String consumerId;
-    @UriParam
-    private Integer socketTimeoutMs;
-    @UriParam
-    private Integer socketReceiveBufferBytes;
-    @UriParam
-    private Integer fetchMessageMaxBytes;
-    @UriParam
-    private Boolean autoCommitEnable;
-    @UriParam
-    private Integer autoCommitIntervalMs;
-    @UriParam
-    private Integer queuedMaxMessages;
-    @UriParam
-    private Integer rebalanceMaxRetries;
-    @UriParam
-    private Integer fetchMinBytes;
-    @UriParam
-    private Integer fetchWaitMaxMs;
-    @UriParam
-    private Integer rebalanceBackoffMs;
-    @UriParam
-    private Integer refreshLeaderBackoffMs;
-    @UriParam
-    private String autoOffsetReset;
-    @UriParam
+    @UriParam(label = "consumer", defaultValue = "30000")
+    private Integer socketTimeoutMs = 30 * 1000;
+    @UriParam(label = "consumer", defaultValue = "" + 64 * 1024)
+    private Integer socketReceiveBufferBytes = 64 * 1024;
+    @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024)
+    private Integer fetchMessageMaxBytes = 1024 * 1024;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private Boolean autoCommitEnable = true;
+    @UriParam(label = "consumer", defaultValue = "60000")
+    private Integer autoCommitIntervalMs = 60 * 1000;
+    @UriParam(label = "consumer", defaultValue = "2")
+    private Integer queuedMaxMessageChunks = 2;
+    @UriParam(label = "consumer", defaultValue = "4")
+    private Integer rebalanceMaxRetries = 4;
+    @UriParam(label = "consumer", defaultValue = "1")
+    private Integer fetchMinBytes = 1;
+    @UriParam(label = "consumer", defaultValue = "100")
+    private Integer fetchWaitMaxMs = 100;
+    @UriParam(label = "consumer", defaultValue = "2000")
+    private Integer rebalanceBackoffMs = 2000;
+    @UriParam(label = "consumer", defaultValue = "200")
+    private Integer refreshLeaderBackoffMs = 200;
+    @UriParam(label = "consumer", defaultValue = "largest", enums = 
"smallest,largest,fail")
+    private String autoOffsetReset = "largest";
+    @UriParam(label = "consumer")
     private Integer consumerTimeoutMs;
 
     //Zookeeper configuration properties
@@ -160,6 +161,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, "queued.max.message.chunks", 
getQueueBufferingMaxMessages());
         addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
         addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
+        addPropertyIfNotNull(props, "queued.max.message.chunks", 
getQueuedMaxMessageChunks());
         addPropertyIfNotNull(props, "rebalance.max.retries", 
getRebalanceMaxRetries());
         addPropertyIfNotNull(props, "rebalance.backoff.ms", 
getRebalanceBackoffMs());
         addPropertyIfNotNull(props, "refresh.leader.backoff.ms", 
getRefreshLeaderBackoffMs());
@@ -187,6 +189,15 @@ public class KafkaConfiguration {
         }
     }
 
+    /**
+     * Specifies the ZooKeeper connection string in the form hostname:port, where host and port are the host and port of a ZooKeeper server.
+     * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down, you can also specify multiple hosts in the
+     * form hostname1:port1,hostname2:port2,hostname3:port3.
+     * The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string, which puts its data
+     * under some path in the global ZooKeeper namespace. If so, the consumer should use the same chroot path in its connection string.
+     * For example, to give a chroot path of /chroot/path you would give the connection
+     * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
+     */
     public void setZookeeperConnect(String zookeeperConnect) {
         this.zookeeperConnect = zookeeperConnect;
         
@@ -199,6 +210,13 @@ public class KafkaConfiguration {
         return zookeeperHost;
     }
 
+    /**
+     * The zookeeper host to use.
+     * <p/>
+     * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
+     * <p/>
+     * This option can only be used if zookeeperConnect is not in use.
+     */
     public void setZookeeperHost(String zookeeperHost) {
         if (this.zookeeperConnect == null) {
             this.zookeeperHost = zookeeperHost;
@@ -209,6 +227,13 @@ public class KafkaConfiguration {
         return zookeeperPort;
     }
 
+    /**
+     * The zookeeper port to use
+     * <p/>
+     * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
+     * <p/>
+     * This option can only be used if zookeeperConnect is not in use.
+     */
     public void setZookeeperPort(int zookeeperPort) {
         if (this.zookeeperConnect == null) {
             this.zookeeperPort = zookeeperPort;
@@ -219,6 +244,10 @@ public class KafkaConfiguration {
         return groupId;
     }
 
+    /**
+     * A string that uniquely identifies the group of consumer processes to which this consumer belongs.
+     * By setting the same group id, multiple processes indicate that they are all part of the same consumer group.
+     */
     public void setGroupId(String groupId) {
         this.groupId = groupId;
     }
@@ -251,6 +280,9 @@ public class KafkaConfiguration {
         return batchSize;
     }
 
+    /**
+     * The batch size that the BatchingConsumerTask processes at once.
+     */
     public void setBatchSize(int batchSize) {
         this.batchSize = batchSize;
     }
@@ -259,6 +291,9 @@ public class KafkaConfiguration {
         return barrierAwaitTimeoutMs;
     }
 
+    /**
+     * If the exchanges processed by the BatchingConsumerTask exceed the batchSize, it will wait for barrierAwaitTimeoutMs.
+     */
     public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
         this.barrierAwaitTimeoutMs = barrierAwaitTimeoutMs;
     }
@@ -267,6 +302,9 @@ public class KafkaConfiguration {
         return consumersCount;
     }
 
+    /**
+     * The number of consumers that connect to the Kafka server.
+     */
     public void setConsumersCount(int consumersCount) {
         this.consumersCount = consumersCount;
     }
@@ -275,6 +313,10 @@ public class KafkaConfiguration {
         return clientId;
     }
 
+    /**
+     * The client id is a user-specified string sent in each request to help trace calls.
+     * It should logically identify the application making the request.
+     */
     public void setClientId(String clientId) {
         this.clientId = clientId;
     }
@@ -283,6 +325,9 @@ public class KafkaConfiguration {
         return consumerId;
     }
 
+    /**
+     * Generated automatically if not set.
+     */
     public void setConsumerId(String consumerId) {
         this.consumerId = consumerId;
     }
@@ -291,6 +336,9 @@ public class KafkaConfiguration {
         return socketTimeoutMs;
     }
 
+    /**
+     * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms.
+     */
     public void setSocketTimeoutMs(Integer socketTimeoutMs) {
         this.socketTimeoutMs = socketTimeoutMs;
     }
@@ -299,6 +347,9 @@ public class KafkaConfiguration {
         return socketReceiveBufferBytes;
     }
 
+    /**
+     * The socket receive buffer for network requests
+     */
     public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
         this.socketReceiveBufferBytes = socketReceiveBufferBytes;
     }
@@ -307,6 +358,12 @@ public class KafkaConfiguration {
         return fetchMessageMaxBytes;
     }
 
+    /**
+     * The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request.
+     * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
+     * The fetch request size must be at least as large as the maximum message size the server allows or else it
+     * is possible for the producer to send messages larger than the consumer can fetch.
+     */
     public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
         this.fetchMessageMaxBytes = fetchMessageMaxBytes;
     }
@@ -315,6 +372,10 @@ public class KafkaConfiguration {
         return autoCommitEnable;
     }
 
+    /**
+     * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
+     * This committed offset will be used when the process fails as the position from which the new consumer will begin.
+     */
     public void setAutoCommitEnable(Boolean autoCommitEnable) {
         this.autoCommitEnable = autoCommitEnable;
     }
@@ -323,22 +384,30 @@ public class KafkaConfiguration {
         return autoCommitIntervalMs;
     }
 
+    /**
+     * The frequency in ms that the consumer offsets are committed to zookeeper.
+     */
     public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
         this.autoCommitIntervalMs = autoCommitIntervalMs;
     }
 
-    public Integer getQueuedMaxMessages() {
-        return queuedMaxMessages;
+    public Integer getQueuedMaxMessageChunks() {
+        return queuedMaxMessageChunks;
     }
 
-    public void setQueuedMaxMessages(Integer queuedMaxMessages) {
-        this.queuedMaxMessages = queuedMaxMessages;
+    public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
+        this.queuedMaxMessageChunks = queuedMaxMessageChunks;
     }
 
     public Integer getRebalanceMaxRetries() {
         return rebalanceMaxRetries;
     }
 
+    /**
+     * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
+     * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry.
+     * This setting controls the maximum number of attempts before giving up.
+     */
     public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
         this.rebalanceMaxRetries = rebalanceMaxRetries;
     }
@@ -347,6 +416,10 @@ public class KafkaConfiguration {
         return fetchMinBytes;
     }
 
+    /**
+     * The minimum amount of data the server should return for a fetch request.
+     * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+     */
     public void setFetchMinBytes(Integer fetchMinBytes) {
         this.fetchMinBytes = fetchMinBytes;
     }
@@ -355,6 +428,9 @@ public class KafkaConfiguration {
         return fetchWaitMaxMs;
     }
 
+    /**
+     * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes.
+     */
     public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
         this.fetchWaitMaxMs = fetchWaitMaxMs;
     }
@@ -363,6 +439,9 @@ public class KafkaConfiguration {
         return rebalanceBackoffMs;
     }
 
+    /**
+     * Backoff time between retries during rebalance.
+     */
     public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
         this.rebalanceBackoffMs = rebalanceBackoffMs;
     }
@@ -371,6 +450,9 @@ public class KafkaConfiguration {
         return refreshLeaderBackoffMs;
     }
 
+    /**
+     * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
+     */
     public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
         this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
     }
@@ -379,6 +461,12 @@ public class KafkaConfiguration {
         return autoOffsetReset;
     }
 
+    /**
+     * What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
+     * smallest : automatically reset the offset to the smallest offset
+     * largest : automatically reset the offset to the largest offset
+     * fail: throw exception to the consumer
+     */
     public void setAutoOffsetReset(String autoOffsetReset) {
         this.autoOffsetReset = autoOffsetReset;
     }
@@ -387,6 +475,9 @@ public class KafkaConfiguration {
         return consumerTimeoutMs;
     }
 
+    /**
+     * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval.
+     */
     public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
         this.consumerTimeoutMs = consumerTimeoutMs;
     }
@@ -395,6 +486,9 @@ public class KafkaConfiguration {
         return zookeeperSessionTimeoutMs;
     }
 
+    /**
+     * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
+     */
     public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
         this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
     }
@@ -403,6 +497,9 @@ public class KafkaConfiguration {
         return zookeeperConnectionTimeoutMs;
     }
 
+    /**
+     * The max time that the client waits while establishing a connection to zookeeper.
+     */
     public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
         this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
     }
@@ -411,6 +508,9 @@ public class KafkaConfiguration {
         return zookeeperSyncTimeMs;
     }
 
+    /**
+     * How far a ZK follower can be behind a ZK leader
+     */
     public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
         this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
     }
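
(Illustrative note, not part of the commit: the consumer options documented above surface as endpoint URI parameters named after the annotated fields. The sketch below assumes camel-kafka of this era on the classpath; the broker address, ZooKeeper hosts, topic and group name are placeholders.)

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaConsumerRouteExample {
    public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // Option names follow the @UriParam fields above: zookeeperConnect takes
                // host:port pairs plus an optional chroot path, groupId names the consumer
                // group, autoOffsetReset is one of smallest/largest/fail, and
                // consumersCount sets how many consumers connect to the Kafka server.
                from("kafka:localhost:9092?topic=orders"
                        + "&zookeeperConnect=zk1:2181,zk2:2181/kafka"
                        + "&groupId=orderGroup"
                        + "&autoOffsetReset=smallest"
                        + "&consumersCount=2")
                    .to("log:orders");
            }
        });
        context.start();
        Thread.sleep(10000);
        context.stop();
    }
}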

http://git-wip-us.apache.org/repos/asf/camel/blob/ee5c7a07/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5ec0d62..a424955 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -240,7 +240,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     }
 
     public int getQueuedMaxMessages() {
-        return configuration.getQueuedMaxMessages();
+        return configuration.getQueuedMaxMessageChunks();
     }
 
     public int getAutoCommitIntervalMs() {
@@ -416,7 +416,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     }
 
     public void setQueuedMaxMessages(int queuedMaxMessages) {
-        configuration.setQueuedMaxMessages(queuedMaxMessages);
+        configuration.setQueuedMaxMessageChunks(queuedMaxMessages);
     }
 
     public void setRetryBackoffMs(int retryBackoffMs) {
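
(Illustrative note, not part of the commit: the KafkaEndpoint change only re-points the existing queuedMaxMessages accessors at the renamed configuration property. A hypothetical usage sketch, assuming camel-kafka on the classpath and placeholder URI values:)

import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaEndpointRenameExample {
    public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();
        // Resolving the endpoint only creates and configures it; no broker connection is made.
        KafkaEndpoint endpoint = context.getEndpoint(
                "kafka:localhost:9092?topic=orders&groupId=orderGroup", KafkaEndpoint.class);
        // The endpoint-level setter keeps its old name, but after this commit it delegates
        // to KafkaConfiguration.setQueuedMaxMessageChunks / getQueuedMaxMessageChunks.
        endpoint.setQueuedMaxMessages(4);
        System.out.println(endpoint.getQueuedMaxMessages()); // 4
    }
}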
