Repository: camel Updated Branches: refs/heads/master c14e0063b -> c3d0f2f22
Component docs Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee5c7a07 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee5c7a07 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee5c7a07 Branch: refs/heads/master Commit: ee5c7a0797f482392d30694737f7d6571554f096 Parents: c14e006 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 18 16:39:26 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 18 16:39:26 2015 +0200 ---------------------------------------------------------------------- .../component/kafka/KafkaConfiguration.java | 168 +++++++++++++++---- .../camel/component/kafka/KafkaEndpoint.java | 4 +- 2 files changed, 136 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ee5c7a07/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 576cb9e..368b10f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -37,13 +37,14 @@ public class KafkaConfiguration { private String groupId; @UriParam(defaultValue = "DefaultPartitioner") private String partitioner = DefaultPartitioner.class.getCanonicalName(); - @UriParam(defaultValue = "10") + + @UriParam(label = "consumer", defaultValue = "10") private int consumerStreams = 10; - @UriParam(defaultValue = "1") + @UriParam(label = "consumer", defaultValue = "1") private int consumersCount = 1; - @UriParam(defaultValue = "100") + @UriParam(label 
= "consumer", defaultValue = "100") private int batchSize = 100; - @UriParam(defaultValue = "10000") + @UriParam(label = "consumer", defaultValue = "10000") private int barrierAwaitTimeoutMs = 10000; //Common configuration properties @@ -51,33 +52,33 @@ public class KafkaConfiguration { private String clientId; //Consumer configuration properties - @UriParam + @UriParam(label = "consumer") private String consumerId; - @UriParam - private Integer socketTimeoutMs; - @UriParam - private Integer socketReceiveBufferBytes; - @UriParam - private Integer fetchMessageMaxBytes; - @UriParam - private Boolean autoCommitEnable; - @UriParam - private Integer autoCommitIntervalMs; - @UriParam - private Integer queuedMaxMessages; - @UriParam - private Integer rebalanceMaxRetries; - @UriParam - private Integer fetchMinBytes; - @UriParam - private Integer fetchWaitMaxMs; - @UriParam - private Integer rebalanceBackoffMs; - @UriParam - private Integer refreshLeaderBackoffMs; - @UriParam - private String autoOffsetReset; - @UriParam + @UriParam(label = "consumer", defaultValue = "30000") + private Integer socketTimeoutMs = 30 * 1000; + @UriParam(label = "consumer", defaultValue = "" + 64 * 1024) + private Integer socketReceiveBufferBytes = 64 * 1024; + @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024) + private Integer fetchMessageMaxBytes = 1024 * 1024; + @UriParam(label = "consumer", defaultValue = "true") + private Boolean autoCommitEnable = true; + @UriParam(label = "consumer", defaultValue = "60000") + private Integer autoCommitIntervalMs = 60 * 1000; + @UriParam(label = "consumer", defaultValue = "2") + private Integer queuedMaxMessageChunks = 2; + @UriParam(label = "consumer", defaultValue = "4") + private Integer rebalanceMaxRetries = 4; + @UriParam(label = "consumer", defaultValue = "1") + private Integer fetchMinBytes = 1; + @UriParam(label = "consumer", defaultValue = "100") + private Integer fetchWaitMaxMs = 100; + @UriParam(label = "consumer", defaultValue = 
"2000") + private Integer rebalanceBackoffMs = 2000; + @UriParam(label = "consumer", defaultValue = "200") + private Integer refreshLeaderBackoffMs = 200; + @UriParam(label = "consumer", defaultValue = "largest", enums = "smallest,largest,fail") + private String autoOffsetReset = "largest"; + @UriParam(label = "consumer") private Integer consumerTimeoutMs; //Zookeepr configuration properties @@ -160,6 +161,7 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages()); addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes()); addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs()); + addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessageChunks()); addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries()); addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs()); addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs()); @@ -187,6 +189,15 @@ public class KafkaConfiguration { } } + /** + * Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. + * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the + * form hostname1:port1,hostname2:port2,hostname3:port3. + * The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data + * under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string. + * For example to give a chroot path of /chroot/path you would give the connection + * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path. 
+ */ public void setZookeeperConnect(String zookeeperConnect) { this.zookeeperConnect = zookeeperConnect; @@ -199,6 +210,13 @@ public class KafkaConfiguration { return zookeeperHost; } + /** + * The zookeeper host to use. + * <p/> + * To connect to multiple zookeeper hosts use the zookeeperConnect option instead. + * <p/> + * This option can only be used if zookeeperConnect is not in use. + */ public void setZookeeperHost(String zookeeperHost) { if (this.zookeeperConnect == null) { this.zookeeperHost = zookeeperHost; @@ -209,6 +227,13 @@ public class KafkaConfiguration { return zookeeperPort; } + /** + * The zookeeper port to use + * <p/> + * To connect to multiple zookeeper hosts use the zookeeperConnect option instead. + * <p/> + * This option can only be used if zookeeperConnect is not in use. + */ public void setZookeeperPort(int zookeeperPort) { if (this.zookeeperConnect == null) { this.zookeeperPort = zookeeperPort; @@ -219,6 +244,10 @@ public class KafkaConfiguration { return groupId; } + /** + * A string that uniquely identifies the group of consumer processes to which this consumer belongs. + * By setting the same group id multiple processes indicate that they are all part of the same consumer group. + */ public void setGroupId(String groupId) { this.groupId = groupId; } @@ -251,6 +280,9 @@ public class KafkaConfiguration { return batchSize; } + /** + * The batchSize that the BatchingConsumerTask processes once. + */ public void setBatchSize(int batchSize) { this.batchSize = batchSize; } @@ -259,6 +291,9 @@ public class KafkaConfiguration { return barrierAwaitTimeoutMs; } + /** + * If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs. 
+ */ public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) { this.barrierAwaitTimeoutMs = barrierAwaitTimeoutMs; } @@ -267,6 +302,9 @@ public class KafkaConfiguration { return consumersCount; } + /** + * The number of consumers that connect to kafka server + */ public void setConsumersCount(int consumersCount) { this.consumersCount = consumersCount; } @@ -275,6 +313,10 @@ public class KafkaConfiguration { return clientId; } + /** + * The client id is a user-specified string sent in each request to help trace calls. + * It should logically identify the application making the request. + */ public void setClientId(String clientId) { this.clientId = clientId; } @@ -283,6 +325,9 @@ public class KafkaConfiguration { return consumerId; } + /** + * Generated automatically if not set. + */ public void setConsumerId(String consumerId) { this.consumerId = consumerId; } @@ -291,6 +336,9 @@ public class KafkaConfiguration { return socketTimeoutMs; } + /** + * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. + */ public void setSocketTimeoutMs(Integer socketTimeoutMs) { this.socketTimeoutMs = socketTimeoutMs; } @@ -299,6 +347,9 @@ public class KafkaConfiguration { return socketReceiveBufferBytes; } + /** + * The socket receive buffer for network requests + */ public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) { this.socketReceiveBufferBytes = socketReceiveBufferBytes; } @@ -307,6 +358,12 @@ public class KafkaConfiguration { return fetchMessageMaxBytes; } + /** + * The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. + * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. + * The fetch request size must be at least as large as the maximum message size the server allows or else it + * is possible for the producer to send messages larger than the consumer can fetch. 
+ */ public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) { this.fetchMessageMaxBytes = fetchMessageMaxBytes; } @@ -315,6 +372,10 @@ public class KafkaConfiguration { return autoCommitEnable; } + /** + * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. + * This committed offset will be used when the process fails as the position from which the new consumer will begin. + */ public void setAutoCommitEnable(Boolean autoCommitEnable) { this.autoCommitEnable = autoCommitEnable; } @@ -323,22 +384,30 @@ public class KafkaConfiguration { return autoCommitIntervalMs; } + /** + * The frequency in ms that the consumer offsets are committed to zookeeper. + */ public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) { this.autoCommitIntervalMs = autoCommitIntervalMs; } - public Integer getQueuedMaxMessages() { - return queuedMaxMessages; + public Integer getQueuedMaxMessageChunks() { + return queuedMaxMessageChunks; } - public void setQueuedMaxMessages(Integer queuedMaxMessages) { - this.queuedMaxMessages = queuedMaxMessages; + public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) { + this.queuedMaxMessageChunks = queuedMaxMessageChunks; } public Integer getRebalanceMaxRetries() { return rebalanceMaxRetries; } + /** + * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer. + * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. + * This setting controls the maximum number of attempts before giving up. + */ public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) { this.rebalanceMaxRetries = rebalanceMaxRetries; } @@ -347,6 +416,10 @@ public class KafkaConfiguration { return fetchMinBytes; } + /** + * The minimum amount of data the server should return for a fetch request. 
+ * If insufficient data is available the request will wait for that much data to accumulate before answering the request. + */ public void setFetchMinBytes(Integer fetchMinBytes) { this.fetchMinBytes = fetchMinBytes; } @@ -355,6 +428,9 @@ public class KafkaConfiguration { return fetchWaitMaxMs; } + /** + * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes + */ public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) { this.fetchWaitMaxMs = fetchWaitMaxMs; } @@ -363,6 +439,9 @@ public class KafkaConfiguration { return rebalanceBackoffMs; } + /** + * Backoff time between retries during rebalance. + */ public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) { this.rebalanceBackoffMs = rebalanceBackoffMs; } @@ -371,6 +450,9 @@ public class KafkaConfiguration { return refreshLeaderBackoffMs; } + /** + * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader. 
+ */ public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) { this.refreshLeaderBackoffMs = refreshLeaderBackoffMs; } @@ -379,6 +461,12 @@ public class KafkaConfiguration { return autoOffsetReset; } + /** + * What to do when there is no initial offset in ZooKeeper or if an offset is out of range: + * smallest : automatically reset the offset to the smallest offset + * largest : automatically reset the offset to the largest offset + * fail: throw exception to the consumer + */ public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; } @@ -387,6 +475,9 @@ public class KafkaConfiguration { return consumerTimeoutMs; } + /** + * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval + */ public void setConsumerTimeoutMs(Integer consumerTimeoutMs) { this.consumerTimeoutMs = consumerTimeoutMs; } @@ -395,6 +486,9 @@ public class KafkaConfiguration { return zookeeperSessionTimeoutMs; } + /** + * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. + */ public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) { this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs; } @@ -403,6 +497,9 @@ public class KafkaConfiguration { return zookeeperConnectionTimeoutMs; } + /** + * The max time that the client waits while establishing a connection to zookeeper. 
+ */ public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) { this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs; } @@ -411,6 +508,9 @@ public class KafkaConfiguration { return zookeeperSyncTimeMs; } + /** + * How far a ZK follower can be behind a ZK leader + */ public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) { this.zookeeperSyncTimeMs = zookeeperSyncTimeMs; } http://git-wip-us.apache.org/repos/asf/camel/blob/ee5c7a07/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 5ec0d62..a424955 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -240,7 +240,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS } public int getQueuedMaxMessages() { - return configuration.getQueuedMaxMessages(); + return configuration.getQueuedMaxMessageChunks(); } public int getAutoCommitIntervalMs() { @@ -416,7 +416,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS } public void setQueuedMaxMessages(int queuedMaxMessages) { - configuration.setQueuedMaxMessages(queuedMaxMessages); + configuration.setQueuedMaxMessageChunks(queuedMaxMessages); } public void setRetryBackoffMs(int retryBackoffMs) {