Repository: camel
Updated Branches:
  refs/heads/master 411f51f27 -> 8b9791f8b


CAMEL-9978: Camel-Kafka: configuration type mismatch for parameter acks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8b9791f8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8b9791f8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8b9791f8

Branch: refs/heads/master
Commit: 8b9791f8b4b1154d6a053de901a5efd2105409e5
Parents: 411f51f
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri May 20 14:17:53 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri May 20 14:17:53 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc                | 4 +++-
 .../org/apache/camel/component/kafka/KafkaConfiguration.java   | 6 +++---
 .../java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 4 ++--
 .../org/apache/camel/component/kafka/KafkaComponentTest.java   | 4 ++--
 4 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc 
b/components/camel-kafka/src/main/docs/kafka.adoc
index 2811a62..2e90613 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -84,6 +84,7 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
 // endpoint options: START
 The Kafka component supports 74 endpoint options which are listed below:
 
@@ -138,7 +139,7 @@ The Kafka component supports 74 endpoint options which are 
listed below:
 | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number 
of unsent messages that can be queued up the producer when using async mode 
before either the producer must be blocked or data must be dropped.
 | receiveBufferBytes | producer | 32768 | Integer | The size of the TCP 
receive buffer (SO_RCVBUF) to use when reading data.
 | reconnectBackoffMs | producer | 50 | Integer | The amount of time to wait 
before attempting to reconnect to a given host. This avoids repeatedly 
connecting to a host in a tight loop. This backoff applies to all requests sent 
by the consumer to the broker.
-| requestRequiredAcks | producer | 1 | Integer | The number of acknowledgments 
the producer requires the leader to have received before considering a request 
complete. This controls the durability of records that are sent. The following 
settings are common: acks=0 If set to zero then the producer will not wait for 
any acknowledgment from the server at all. The record will be immediately added 
to the socket buffer and considered sent. No guarantee can be made that the 
server has received the record in this case and the retries configuration will 
not take effect (as the client won't generally know of any failures). The 
offset given back for each record will always be set to -1. acks=1 This will 
mean the leader will write the record to its local log but will respond without 
awaiting full acknowledgement from all followers. In this case should the 
leader fail immediately after acknowledging the record but before the followers 
have replicated it then the record will be lost. acks=all Thi
 s means the leader will wait for the full set of in-sync replicas to 
acknowledge the record. This guarantees that the record will not be lost as 
long as at least one in-sync replica remains alive. This is the strongest 
available guarantee.
+| requestRequiredAcks | producer | 1 | String | The number of acknowledgments 
the producer requires the leader to have received before considering a request 
complete. This controls the durability of records that are sent. The following 
settings are common: acks=0 If set to zero then the producer will not wait for 
any acknowledgment from the server at all. The record will be immediately added 
to the socket buffer and considered sent. No guarantee can be made that the 
server has received the record in this case and the retries configuration will 
not take effect (as the client won't generally know of any failures). The 
offset given back for each record will always be set to -1. acks=1 This will 
mean the leader will write the record to its local log but will respond without 
awaiting full acknowledgement from all followers. In this case should the 
leader fail immediately after acknowledging the record but before the followers 
have replicated it then the record will be lost. acks=all This
  means the leader will wait for the full set of in-sync replicas to 
acknowledge the record. This guarantees that the record will not be lost as 
long as at least one in-sync replica remains alive. This is the strongest 
available guarantee.
 | requestTimeoutMs | producer | 30000 | Integer | The amount of time the 
broker will wait trying to meet the request.required.acks requirement before 
sending back an error to the client.
 | retries | producer | 0 | Integer | Setting a value greater than zero will 
cause the client to resend any record whose send fails with a potentially 
transient error. Note that this retry is no different than if the client resent 
the record upon receiving the error. Allowing retries will potentially change 
the ordering of records because if two records are sent to a single partition 
and the first fails and is retried but the second succeeds then the second 
record may appear first.
 | retryBackoffMs | producer | 100 | Integer | Before each retry the producer 
refreshes the metadata of relevant topics to see if a new leader has been 
elected. Since leader election takes a bit of time this property specifies the 
amount of time that the producer waits before refreshing the metadata.
@@ -175,6 +176,7 @@ The Kafka component supports 74 endpoint options which are 
listed below:
 
 
 
+
 For more information about Producer/Consumer configuration:
 
 
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 1a068c3..9e3b39d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -120,7 +120,7 @@ public class KafkaConfiguration {
     private String keySerializerClass;
 
     @UriParam(label = "producer", defaultValue = "1")
-    private Integer requestRequiredAcks = 1;
+    private String requestRequiredAcks = "1";
     //buffer.memory
     @UriParam(label = "producer", defaultValue = "33554432")
     private Integer bufferMemorySize = 33554432;
@@ -867,7 +867,7 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
-    public Integer getRequestRequiredAcks() {
+    public String getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
 
@@ -884,7 +884,7 @@ public class KafkaConfiguration {
      * acks=all This means the leader will wait for the full set of in-sync 
replicas to acknowledge the record. This guarantees that the
      * record will not be lost as long as at least one in-sync replica remains 
alive. This is the strongest available guarantee.
      */
-    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+    public void setRequestRequiredAcks(String requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index eb03493..ec75c4b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -419,7 +419,7 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
         configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public Integer getRequestRequiredAcks() {
+    public String getRequestRequiredAcks() {
         return configuration.getRequestRequiredAcks();
     }
 
@@ -479,7 +479,7 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
         return configuration.getSslCipherSuites();
     }
 
-    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+    public void setRequestRequiredAcks(String requestRequiredAcks) {
         configuration.setRequestRequiredAcks(requestRequiredAcks);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 1c2c564..6a3773a 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -59,7 +59,7 @@ public class KafkaComponentTest {
 
         KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
 
-        assertEquals(new Integer(0), endpoint.getRequestRequiredAcks());
+        assertEquals("1", endpoint.getRequestRequiredAcks());
         assertEquals(new Integer(1), endpoint.getBufferMemorySize());
         assertEquals(new Integer(10), endpoint.getProducerBatchSize());
         assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
@@ -155,7 +155,7 @@ public class KafkaComponentTest {
     }
 
     private void setProducerProperty(Map<String, Object> params) {
-        params.put("requestRequiredAcks", 0);
+        params.put("requestRequiredAcks", "1");
         params.put("bufferMemorySize", 1);
         params.put("compressionCodec", "none");
         params.put("retries", 0);

Reply via email to