Repository: camel Updated Branches: refs/heads/master 411f51f27 -> 8b9791f8b
CAMEL-9978: Camel-Kafka: configuration type mismatch for parameter acks Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8b9791f8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8b9791f8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8b9791f8 Branch: refs/heads/master Commit: 8b9791f8b4b1154d6a053de901a5efd2105409e5 Parents: 411f51f Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri May 20 14:17:53 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri May 20 14:17:53 2016 +0200 ---------------------------------------------------------------------- components/camel-kafka/src/main/docs/kafka.adoc | 4 +++- .../org/apache/camel/component/kafka/KafkaConfiguration.java | 6 +++--- .../java/org/apache/camel/component/kafka/KafkaEndpoint.java | 4 ++-- .../org/apache/camel/component/kafka/KafkaComponentTest.java | 4 ++-- 4 files changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/docs/kafka.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc index 2811a62..2e90613 100644 --- a/components/camel-kafka/src/main/docs/kafka.adoc +++ b/components/camel-kafka/src/main/docs/kafka.adoc @@ -84,6 +84,7 @@ The Kafka component supports 1 options which are listed below. + // endpoint options: START The Kafka component supports 74 endpoint options which are listed below: @@ -138,7 +139,7 @@ The Kafka component supports 74 endpoint options which are listed below: | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. | receiveBufferBytes | producer | 32768 | Integer | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. | reconnectBackoffMs | producer | 50 | Integer | The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. -| requestRequiredAcks | producer | 1 | Integer | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all Thi s means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. +| requestRequiredAcks | producer | 1 | String | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. | requestTimeoutMs | producer | 30000 | Integer | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. | retries | producer | 0 | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition and the first fails and is retried but the second succeeds then the second record may appear first. | retryBackoffMs | producer | 100 | Integer | Before each retry the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time this property specifies the amount of time that the producer waits before refreshing the metadata. @@ -175,6 +176,7 @@ The Kafka component supports 74 endpoint options which are listed below: + For more information about Producer/Consumer configuration: http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 1a068c3..9e3b39d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -120,7 +120,7 @@ public class KafkaConfiguration { private String keySerializerClass; @UriParam(label = "producer", defaultValue = "1") - private Integer requestRequiredAcks = 1; + private String requestRequiredAcks = "1"; //buffer.memory @UriParam(label = "producer", defaultValue = "33554432") private Integer bufferMemorySize = 33554432; @@ -867,7 +867,7 @@ public class KafkaConfiguration { this.bufferMemorySize = bufferMemorySize; } - public Integer getRequestRequiredAcks() { + public String getRequestRequiredAcks() { return requestRequiredAcks; } @@ -884,7 +884,7 @@ public class KafkaConfiguration { * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. */ - public void setRequestRequiredAcks(Integer requestRequiredAcks) { + public void setRequestRequiredAcks(String requestRequiredAcks) { this.requestRequiredAcks = requestRequiredAcks; } http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index eb03493..ec75c4b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -419,7 +419,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS configuration.setSslKeyPassword(sslKeyPassword); } - public Integer getRequestRequiredAcks() { + public String getRequestRequiredAcks() { return configuration.getRequestRequiredAcks(); } @@ -479,7 +479,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return configuration.getSslCipherSuites(); } - public void setRequestRequiredAcks(Integer requestRequiredAcks) { + public void setRequestRequiredAcks(String requestRequiredAcks) { configuration.setRequestRequiredAcks(requestRequiredAcks); } http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index 1c2c564..6a3773a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -59,7 +59,7 @@ public class KafkaComponentTest { KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); - assertEquals(new Integer(0), endpoint.getRequestRequiredAcks()); + assertEquals("1", endpoint.getRequestRequiredAcks()); assertEquals(new Integer(1), endpoint.getBufferMemorySize()); assertEquals(new Integer(10), endpoint.getProducerBatchSize()); assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs()); @@ -155,7 +155,7 @@ public class KafkaComponentTest { } private void setProducerProperty(Map<String, Object> params) { - params.put("requestRequiredAcks", 0); + params.put("requestRequiredAcks", "1"); params.put("bufferMemorySize", 1); params.put("compressionCodec", "none"); params.put("retries", 0);