This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new e6f0940 CAMEL-11972 - Upgrade to Kafka 1.0.0, added a new option in both Producer and Consumer config e6f0940 is described below commit e6f0940f3c65cf4f26c7b49363f9287a5c496e0a Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Nov 10 11:22:48 2017 +0100 CAMEL-11972 - Upgrade to Kafka 1.0.0, added a new option in both Producer and Consumer config --- .../camel-kafka/src/main/docs/kafka-component.adoc | 3 ++- .../camel/component/kafka/KafkaConfiguration.java | 20 ++++++++++++++++++++ .../camel/component/kafka/KafkaComponentTest.java | 3 +++ .../springboot/KafkaComponentConfiguration.java | 16 ++++++++++++++++ 4 files changed, 41 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index c25589b..ff17998 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -69,13 +69,14 @@ with the following path and query parameters: | *topic* | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. | | String |=== -==== Query Parameters (88 parameters): +==== Query Parameters (89 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type | *brokers* (common) | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation. | | String | *clientId* (common) | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. | | String +| *reconnectBackoffMaxMs* (common) | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided the backoff per host will increase exponentially for each consecutive connection failure up to this maximum. After calculating the backoff increase 20 random jitter is added to avoid connection storms. | 1000 | Integer | *allowManualCommit* (consumer) | Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header which allows end users to access this API and perform manual offset commits via the Kafka consumer. | false | boolean | *autoCommitEnable* (consumer) | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | true | Boolean | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer offsets are committed to zookeeper. | 5000 | Integer diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index efa51c3..86cd537 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -214,6 +214,9 @@ public class KafkaConfiguration implements Cloneable { //reconnect.backoff.ms @UriParam(label = "producer", defaultValue = "false") private boolean enableIdempotence; + //reconnect.backoff.max.ms + @UriParam(label = "common", defaultValue = "1000") + private Integer reconnectBackoffMaxMs = 1000; // SSL @UriParam(label = "common,security") @@ -337,6 +340,8 @@ public class KafkaConfiguration implements Cloneable { addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isEnableIdempotence()); + addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs()); + // SSL applySslConfiguration(props, getSslContextParameters()); addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol()); @@ -394,6 +399,7 @@ public class KafkaConfiguration implements Cloneable { addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs()); addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); + addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs()); // SSL applySslConfiguration(props, getSslContextParameters()); @@ -1558,4 +1564,18 @@ public class KafkaConfiguration implements Cloneable { public void setEnableIdempotence(boolean enableIdempotence) { this.enableIdempotence = enableIdempotence; } + + public Integer getReconnectBackoffMaxMs() { + return reconnectBackoffMaxMs; + } + + /** + * + * The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. + * If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. + * After calculating the backoff increase, 20% random jitter is added to avoid connection storms. + */ + public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) { + this.reconnectBackoffMaxMs = reconnectBackoffMaxMs; + } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index fa75551..ebafce4 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -83,6 +83,7 @@ public class KafkaComponentTest { assertEquals(new Integer(1029), endpoint.getConfiguration().getMetadataMaxAgeMs()); assertEquals(new Integer(23), endpoint.getConfiguration().getReceiveBufferBytes()); assertEquals(new Integer(234), endpoint.getConfiguration().getReconnectBackoffMs()); + assertEquals(new Integer(234), endpoint.getConfiguration().getReconnectBackoffMaxMs()); assertEquals(new Integer(0), endpoint.getConfiguration().getRetries()); assertEquals(3782, endpoint.getConfiguration().getRetryBackoffMs().intValue()); assertEquals(765, endpoint.getConfiguration().getSendBufferBytes().intValue()); @@ -146,6 +147,7 @@ public class KafkaComponentTest { props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2"); props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000"); props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "50"); + props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000"); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); @@ -185,6 +187,7 @@ public class KafkaComponentTest { params.put("metadataFetchTimeoutMs", 9043); params.put("metadataMaxAgeMs", 1029); params.put("reconnectBackoffMs", 234); + params.put("reconnectBackoffMaxMs", 234); params.put("retryBackoffMs", 3782); params.put("noOfMetricsSample", 3); params.put("metricReporters", "org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport"); diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java index c52e1e1..ad6f219 100644 --- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java @@ -734,6 +734,14 @@ public class KafkaComponentConfiguration * be set to 'all'. */ private Boolean enableIdempotence = false; + /** + * The maximum amount of time in milliseconds to wait when reconnecting + * to a broker that has repeatedly failed to connect. If provided, the + * backoff per host will increase exponentially for each consecutive + * connection failure, up to this maximum. After calculating the backoff + * increase, 20% random jitter is added to avoid connection storms. + */ + private Integer reconnectBackoffMaxMs = 1000; public Boolean getTopicIsPattern() { return topicIsPattern; @@ -1419,5 +1427,13 @@ public class KafkaComponentConfiguration public void setEnableIdempotence(Boolean enableIdempotence) { this.enableIdempotence = enableIdempotence; } + + public Integer getReconnectBackoffMaxMs() { + return reconnectBackoffMaxMs; + } + + public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) { + this.reconnectBackoffMaxMs = reconnectBackoffMaxMs; + } } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact ['"commits@camel.apache.org" <commits@camel.apache.org>'].