Component docs
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3d0f2f2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3d0f2f2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3d0f2f2 Branch: refs/heads/master Commit: c3d0f2f222aaaa60a4bc0300aa4eb6582e7143e7 Parents: ee5c7a0 Author: Claus Ibsen <[email protected]> Authored: Mon May 18 17:06:16 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon May 18 17:07:31 2015 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaComponent.java | 7 +- .../component/kafka/KafkaConfiguration.java | 162 ++++++++++++++++--- .../camel/component/kafka/KafkaEndpoint.java | 13 +- .../component/kafka/KafkaEndpointTest.java | 6 +- .../component/kafka/KafkaProducerTest.java | 4 +- 5 files changed, 154 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index bf33a03..b659e73 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -35,7 +35,12 @@ public class KafkaComponent extends UriEndpointComponent { protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception { - KafkaEndpoint endpoint = new KafkaEndpoint(uri, remaining, this); + + KafkaEndpoint endpoint = new KafkaEndpoint(uri, this); + String brokers = remaining.split("\\?")[0]; + if (brokers != null) { + endpoint.getConfiguration().setBrokers(brokers); + } setProperties(endpoint, params); return endpoint; } http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 368b10f..4a2e1ea 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -19,8 +19,10 @@ package org.apache.camel.component.kafka; import java.util.Properties; import kafka.producer.DefaultPartitioner; +import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; +import org.apache.camel.spi.UriPath; @UriParams public class KafkaConfiguration { @@ -31,7 +33,7 @@ public class KafkaConfiguration { private String zookeeperHost; @UriParam(defaultValue = "2181") private int zookeeperPort = 2181; - @UriParam + @UriParam @Metadata(required = "true") private String topic; @UriParam private String groupId; @@ -90,39 +92,41 @@ public class KafkaConfiguration { private Integer zookeeperSyncTimeMs; //Producer configuration properties - @UriParam - private String producerType; - @UriParam - private String compressionCodec; - @UriParam + @UriPath + private String brokers; + @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync") + private String producerType = "sync"; + @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy") + private String compressionCodec = "none"; + @UriParam(label = "producer") private String compressedTopics; - @UriParam - private Integer messageSendMaxRetries; - @UriParam - private Integer retryBackoffMs; - @UriParam - private Integer topicMetadataRefreshIntervalMs; + @UriParam(label = "producer", defaultValue = "3") + private Integer messageSendMaxRetries = 3; + @UriParam(label = "producer", defaultValue = "100") + private Integer retryBackoffMs = 100; + @UriParam(label = "producer", defaultValue = "600000") + private Integer topicMetadataRefreshIntervalMs = 600 * 1000; //Sync producer config - @UriParam - private Integer sendBufferBytes; - @UriParam + @UriParam(label = "producer", defaultValue = "" + 100 * 1024) + private Integer sendBufferBytes = 100 * 1024; + @UriParam(label = "producer", defaultValue = "0") private short requestRequiredAcks; - @UriParam - private Integer requestTimeoutMs; + @UriParam(label = "producer", defaultValue = "10000") + private Integer requestTimeoutMs = 10000; //Async producer config - @UriParam - private Integer queueBufferingMaxMs; - @UriParam - private Integer queueBufferingMaxMessages; - @UriParam + @UriParam(label = "producer", defaultValue = "5000") + private Integer queueBufferingMaxMs = 5000; + @UriParam(label = "producer", defaultValue = "10000") + private Integer queueBufferingMaxMessages = 10000; + @UriParam(label = "producer") private Integer queueEnqueueTimeoutMs; - @UriParam - private Integer batchNumMessages; - @UriParam + @UriParam(label = "producer", defaultValue = "200") + private Integer batchNumMessages = 200; + @UriParam(label = "producer") private String serializerClass; - @UriParam + @UriParam(label = "producer") private String keySerializerClass; public KafkaConfiguration() { @@ -256,6 +260,9 @@ public class KafkaConfiguration { return partitioner; } + /** + * The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key. + */ public void setPartitioner(String partitioner) { this.partitioner = partitioner; } @@ -264,6 +271,9 @@ public class KafkaConfiguration { return topic; } + /** + * Name of the topic to use + */ public void setTopic(String topic) { this.topic = topic; } @@ -272,6 +282,9 @@ public class KafkaConfiguration { return consumerStreams; } + /** + * Number of concurrent consumers on the consumer + */ public void setConsumerStreams(int consumerStreams) { this.consumerStreams = consumerStreams; } @@ -395,6 +408,9 @@ public class KafkaConfiguration { return queuedMaxMessageChunks; } + /** + * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes. + */ public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) { this.queuedMaxMessageChunks = queuedMaxMessageChunks; } @@ -515,10 +531,31 @@ public class KafkaConfiguration { this.zookeeperSyncTimeMs = zookeeperSyncTimeMs; } + public String getBrokers() { + return brokers; + } + + /** + * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). + * The socket connections for sending the actual data will be established based on the broker information returned in the metadata. + * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers. + * <p/> + * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation. + */ + public void setBrokers(String brokers) { + this.brokers = brokers; + } + public String getProducerType() { return producerType; } + /** + * This parameter specifies whether the messages are sent asynchronously in a background thread. + * Valid values are (1) async for asynchronous send and (2) sync for synchronous send. + * By setting the producer to async we allow batching together of requests (which is great for throughput) + * but open the possibility of a failure of the client machine dropping unsent data. + */ public void setProducerType(String producerType) { this.producerType = producerType; } @@ -527,6 +564,9 @@ public class KafkaConfiguration { return compressionCodec; } + /** + * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". + */ public void setCompressionCodec(String compressionCodec) { this.compressionCodec = compressionCodec; } @@ -535,6 +575,12 @@ public class KafkaConfiguration { return compressedTopics; } + /** + * This parameter allows you to set whether compression should be turned on for particular topics. + * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any. + * If the list of compressed topics is empty, then enable the specified compression codec for all topics. + * If the compression codec is NoCompressionCodec, compression is disabled for all topics + */ public void setCompressedTopics(String compressedTopics) { this.compressedTopics = compressedTopics; } @@ -543,6 +589,11 @@ public class KafkaConfiguration { return messageSendMaxRetries; } + /** + * This property will cause the producer to automatically retry a failed send request. + * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here + * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost. + */ public void setMessageSendMaxRetries(Integer messageSendMaxRetries) { this.messageSendMaxRetries = messageSendMaxRetries; } @@ -551,6 +602,10 @@ public class KafkaConfiguration { return retryBackoffMs; } + /** + * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. + * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. + */ public void setRetryBackoffMs(Integer retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; } @@ -559,6 +614,14 @@ public class KafkaConfiguration { return topicMetadataRefreshIntervalMs; } + /** + * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, + * leader not available...). It will also poll regularly (default: every 10min so 600000ms). + * If you set this to a negative value, metadata will only get refreshed on failure. + * If you set this to zero, the metadata will get refreshed after each message sent (not recommended). + * Important note: the refresh happen only AFTER the message is sent, so if the producer never + * sends a message the metadata is never refreshed + */ public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) { this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs; } @@ -567,6 +630,9 @@ public class KafkaConfiguration { return sendBufferBytes; } + /** + * Socket write buffer size + */ public void setSendBufferBytes(Integer sendBufferBytes) { this.sendBufferBytes = sendBufferBytes; } @@ -575,6 +641,22 @@ public class KafkaConfiguration { return requestRequiredAcks; } + /** + * This value controls when a produce request is considered completed. Specifically, + * how many other brokers must have committed the data to their log and acknowledged this to the leader? + * Typical values are (0, 1 or -1): + * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). + * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). + * 1, which means that the producer gets an acknowledgement after the leader replica has received the data. + * This option provides better durability as the client waits until the server acknowledges the request as successful + * (only messages that were written to the now-dead leader but not yet replicated will be lost). + * -1, The producer gets an acknowledgement after all in-sync replicas have received the data. + * This option provides the greatest level of durability. + * However, it does not completely eliminate the risk of message loss because the number of in sync replicas may, + * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas + * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. + * Please read the Replication section of the design documentation for a more in-depth discussion. + */ public void setRequestRequiredAcks(short requestRequiredAcks) { this.requestRequiredAcks = requestRequiredAcks; } @@ -583,6 +665,9 @@ public class KafkaConfiguration { return requestTimeoutMs; } + /** + * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. + */ public void setRequestTimeoutMs(Integer requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; } @@ -591,6 +676,11 @@ public class KafkaConfiguration { return queueBufferingMaxMs; } + /** + * Maximum time to buffer data when using async mode. + * For example a setting of 100 will try to batch together 100ms of messages to send at once. + * This will improve throughput but adds message delivery latency due to the buffering. + */ public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) { this.queueBufferingMaxMs = queueBufferingMaxMs; } @@ -599,6 +689,10 @@ public class KafkaConfiguration { return queueBufferingMaxMessages; } + /** + * The maximum number of unsent messages that can be queued up the producer when using async + * mode before either the producer must be blocked or data must be dropped. + */ public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) { this.queueBufferingMaxMessages = queueBufferingMaxMessages; } @@ -607,6 +701,11 @@ public class KafkaConfiguration { return queueEnqueueTimeoutMs; } + /** + * The amount of time to block before dropping messages when running in async mode and the buffer has reached + * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full + * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. + */ public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) { this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs; } @@ -615,6 +714,10 @@ public class KafkaConfiguration { return batchNumMessages; } + /** + * The number of messages to send in one batch when using async mode. + * The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. + */ public void setBatchNumMessages(Integer batchNumMessages) { this.batchNumMessages = batchNumMessages; } @@ -623,6 +726,10 @@ public class KafkaConfiguration { return serializerClass; } + /** + * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]. + * The default class is kafka.serializer.DefaultEncoder + */ public void setSerializerClass(String serializerClass) { this.serializerClass = serializerClass; } @@ -631,6 +738,9 @@ public class KafkaConfiguration { return keySerializerClass; } + /** + * The serializer class for keys (defaults to the same as for messages if nothing is given). + */ public void setKeySerializerClass(String keySerializerClass) { this.keySerializerClass = keySerializerClass; } http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index a424955..bebe6d7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -37,19 +37,14 @@ import org.apache.camel.spi.UriPath; @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging") public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { - @UriPath @Metadata(required = "true") - private String brokers; @UriParam private KafkaConfiguration configuration = new KafkaConfiguration(); public KafkaEndpoint() { } - public KafkaEndpoint(String endpointUri, - String remaining, - KafkaComponent component) throws URISyntaxException { + public KafkaEndpoint(String endpointUri, KafkaComponent component) { super(endpointUri, component); - this.brokers = remaining.split("\\?")[0]; } public KafkaConfiguration getConfiguration() { @@ -156,7 +151,11 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS } public String getBrokers() { - return brokers; + return configuration.getBrokers(); + } + + public void setBrokers(String brokers) { + configuration.setBrokers(brokers); } public int getConsumerStreams() { http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java index 6ac6f81..ed4a6d1 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java @@ -32,7 +32,8 @@ public class KafkaEndpointTest { @Test public void testCreatingKafkaExchangeSetsHeaders() throws URISyntaxException { - KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent()); + KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent()); + endpoint.setBrokers("localhost"); Message message = new Message("mymessage".getBytes(), "somekey".getBytes()); DefaultDecoder decoder = new DefaultDecoder(null); @@ -47,7 +48,8 @@ public class KafkaEndpointTest { @Test public void assertSingleton() throws URISyntaxException { - KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent()); + KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent()); + endpoint.setBrokers("localhost"); assertTrue(endpoint.isSingleton()); } http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index d989c96..8e5b0f7 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -41,8 +41,8 @@ public class KafkaProducerTest { @SuppressWarnings({"unchecked"}) public KafkaProducerTest() throws Exception { - endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", - "broker1:1234," + "broker2:4567?topic=sometopic", null); + endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null); + endpoint.setBrokers("broker1:1234,broker2:4567"); producer = new KafkaProducer(endpoint); producer.producer = Mockito.mock(Producer.class); }
