This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f0940  CAMEL-11972 - Upgrade to Kafka 1.0.0, added a new option in both Producer and Consumer config
e6f0940 is described below

commit e6f0940f3c65cf4f26c7b49363f9287a5c496e0a
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Nov 10 11:22:48 2017 +0100

    CAMEL-11972 - Upgrade to Kafka 1.0.0, added a new option in both Producer and Consumer config
---
 .../camel-kafka/src/main/docs/kafka-component.adoc   |  3 ++-
 .../camel/component/kafka/KafkaConfiguration.java    | 20 ++++++++++++++++++++
 .../camel/component/kafka/KafkaComponentTest.java    |  3 +++
 .../springboot/KafkaComponentConfiguration.java      | 16 ++++++++++++++++
 4 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index c25589b..ff17998 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -69,13 +69,14 @@ with the following path and query parameters:
 | *topic* | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. |  | String
 |===
 
-==== Query Parameters (88 parameters):
+==== Query Parameters (89 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
 | *brokers* (common) | URL of the Kafka brokers to use. The format is host1:port1,host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation. |  | String
 | *clientId* (common) | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. |  | String
+| *reconnectBackoffMaxMs* (common) | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | 1000 | Integer
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header which allows end users to access this API and perform manual offset commits via the Kafka consumer. | false | boolean
 | *autoCommitEnable* (consumer) | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | true | Boolean
 | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer offsets are committed to zookeeper. | 5000 | Integer
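
For illustration, a route that sets the new query parameter could look like the sketch below; the broker address, topic name, and the 4000 ms value are placeholders, not taken from this commit:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaBackoffRoute extends RouteBuilder {
        @Override
        public void configure() {
            // Cap the exponential per-broker reconnect backoff at 4 seconds (default is 1000 ms).
            from("kafka:myTopic?brokers=localhost:9092&reconnectBackoffMaxMs=4000")
                .to("log:kafka-messages");
        }
    }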
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index efa51c3..86cd537 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -214,6 +214,9 @@ public class KafkaConfiguration implements Cloneable {
     //reconnect.backoff.ms
     @UriParam(label = "producer", defaultValue = "false")
     private boolean enableIdempotence;
+    //reconnect.backoff.max.ms
+    @UriParam(label = "common", defaultValue = "1000")
+    private Integer reconnectBackoffMaxMs = 1000;
 
     // SSL
     @UriParam(label = "common,security")
@@ -337,6 +340,8 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
         addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isEnableIdempotence());
+        addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
+        
         // SSL
         applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
@@ -394,6 +399,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
         addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
         
         // SSL
         applySslConfiguration(props, getSslContextParameters());
@@ -1558,4 +1564,18 @@ public class KafkaConfiguration implements Cloneable {
     public void setEnableIdempotence(boolean enableIdempotence) {
         this.enableIdempotence = enableIdempotence;
     }
+
+    public Integer getReconnectBackoffMaxMs() {
+        return reconnectBackoffMaxMs;
+    }
+
+    /**
+     * The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect.
+     * If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum.
+     * After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
+     */
+    public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
+        this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
+    }
 }
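
As a hedged sketch of programmatic use (not part of the commit): the new getter/setter pair can be driven directly on KafkaConfiguration. The broker address and the 2000 ms value are illustrative, and the setConfiguration call assumes the component exposes its usual configuration property:

    import org.apache.camel.CamelContext;
    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.component.kafka.KafkaConfiguration;
    import org.apache.camel.impl.DefaultCamelContext;

    public class KafkaBackoffSetup {
        public static void main(String[] args) throws Exception {
            KafkaConfiguration configuration = new KafkaConfiguration();
            configuration.setBrokers("localhost:9092");      // placeholder broker list
            configuration.setReconnectBackoffMaxMs(2000);    // cap reconnect backoff at 2 s

            KafkaComponent kafka = new KafkaComponent();
            kafka.setConfiguration(configuration);           // assumed accessor on the component

            CamelContext context = new DefaultCamelContext();
            context.addComponent("kafka", kafka);
            context.start();
            context.stop();
        }
    }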
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index fa75551..ebafce4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -83,6 +83,7 @@ public class KafkaComponentTest {
         assertEquals(new Integer(1029), endpoint.getConfiguration().getMetadataMaxAgeMs());
         assertEquals(new Integer(23), endpoint.getConfiguration().getReceiveBufferBytes());
         assertEquals(new Integer(234), endpoint.getConfiguration().getReconnectBackoffMs());
+        assertEquals(new Integer(234), endpoint.getConfiguration().getReconnectBackoffMaxMs());
         assertEquals(new Integer(0), endpoint.getConfiguration().getRetries());
         assertEquals(3782, endpoint.getConfiguration().getRetryBackoffMs().intValue());
         assertEquals(765, endpoint.getConfiguration().getSendBufferBytes().intValue());
@@ -146,6 +147,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
         props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
         props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "50");
+        props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000");
         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
@@ -185,6 +187,7 @@ public class KafkaComponentTest {
         params.put("metadataFetchTimeoutMs", 9043);
         params.put("metadataMaxAgeMs", 1029);
         params.put("reconnectBackoffMs", 234);
+        params.put("reconnectBackoffMaxMs", 234);
         params.put("retryBackoffMs", 3782);
         params.put("noOfMetricsSample", 3);
         params.put("metricReporters", 
"org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport");
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index c52e1e1..ad6f219 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -734,6 +734,14 @@ public class KafkaComponentConfiguration
          * be set to 'all'.
          */
         private Boolean enableIdempotence = false;
+        /**
+         * The maximum amount of time in milliseconds to wait when reconnecting
+         * to a broker that has repeatedly failed to connect. If provided, the
+         * backoff per host will increase exponentially for each consecutive
+         * connection failure, up to this maximum. After calculating the backoff
+         * increase, 20% random jitter is added to avoid connection storms.
+         */
+        private Integer reconnectBackoffMaxMs = 1000;
 
         public Boolean getTopicIsPattern() {
             return topicIsPattern;
@@ -1419,5 +1427,13 @@ public class KafkaComponentConfiguration
         public void setEnableIdempotence(Boolean enableIdempotence) {
             this.enableIdempotence = enableIdempotence;
         }
+
+        public Integer getReconnectBackoffMaxMs() {
+            return reconnectBackoffMaxMs;
+        }
+
+        public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
+            this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
+        }
     }
 }
\ No newline at end of file
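
For the Spring Boot starter, a hedged configuration sketch (assuming the starter's usual camel.component.kafka.configuration.* prefix for this nested configuration class; the values below are illustrative):

    # application.properties
    camel.component.kafka.configuration.brokers=localhost:9092
    camel.component.kafka.configuration.reconnect-backoff-max-ms=2000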

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].
