CAMEL-11542 - camel-kafka - Add any new options from kafka 0.11.0 to the 
endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0753f11f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0753f11f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0753f11f

Branch: refs/heads/master
Commit: 0753f11f1070b586555dedd171d590be737d43ba
Parents: 7b921b0
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Wed Jul 26 09:44:39 2017 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Wed Jul 26 09:48:38 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 +-
 .../component/kafka/KafkaConfiguration.java     | 96 +++++++++++---------
 .../component/kafka/KafkaComponentTest.java     |  1 +
 .../springboot/KafkaComponentConfiguration.java | 17 ++++
 4 files changed, 75 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 7b01d05..e67d7a7 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -67,7 +67,7 @@ with the following path and query parameters:
 | **topic** | *Required* Name of the topic to use. On the consumer you can use 
comma to separate multiple topics. A producer can only send a message to a 
single topic. |  | String
 |=======================================================================
 
-#### Query Parameters (84 parameters):
+#### Query Parameters (85 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -105,6 +105,7 @@ with the following path and query parameters:
 | **circularTopicDetection** (producer) | If the option is true then 
KafkaProducer will detect if the message is attempted to be sent back to the 
same topic it may come from if the message was originally from a kafka consumer. 
If the KafkaConstants.TOPIC header is the same as the original kafka consumer 
topic then the header setting is ignored and the topic of the producer endpoint 
is used. In other words this avoids sending the same message back to where it 
came from. This option is not in use if the option bridgeEndpoint is set to 
true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the 
compression codec for all data generated by this producer. Valid values are 
none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number 
of milliseconds specified by this config. | 540000 | Integer
+| **enableIdempotence** (producer) | If set to 'true' the producer will ensure 
that exactly one copy of each message is written in the stream. If 'false' 
producer retries may write duplicates of the retried message in the stream. If 
set to true this option will require max.in.flight.requests.per.connection to 
be set to 1 and retries cannot be zero and additionally acks must be set to 
'all'. | false | boolean
 | **key** (producer) | The record key (or null if no key is specified). If 
this option has been configured then it takes precedence over the header 
KafkaConstants.KEY |  | String
 | **keySerializerClass** (producer) | The serializer class for keys (defaults 
to the same as for messages if nothing is given). | 
org.apache.kafka.common.serialization.StringSerializer | String
 | **lingerMs** (producer) | The producer groups together any records that 
arrive in between request transmissions into a single batched request. Normally 
this occurs only under load when records arrive faster than they can be sent 
out. However in some circumstances the client may want to reduce the number of 
requests even under moderate load. This setting accomplishes this by adding a 
small amount of artificial delay; that is, rather than immediately sending out a 
record the producer will wait for up to the given delay to allow other records 
to be sent so that the sends can be batched together. This can be thought of as 
analogous to Nagle's algorithm in TCP. This setting gives the upper bound on 
the delay for batching: once we get batch.size worth of records for a partition 
it will be sent immediately regardless of this setting however if we have fewer 
than this many bytes accumulated for this partition we will 'linger' for the 
specified time waiting for more records to show up. This 
setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for example would 
have the effect of reducing the number of requests sent but would add up to 5ms 
of latency to records sent in the absence of load. | 0 | Integer

http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index ddb04a9..ae54443 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -206,6 +206,10 @@ public class KafkaConfiguration implements Cloneable {
     //reconnect.backoff.ms
     @UriParam(label = "producer", defaultValue = "50")
     private Integer reconnectBackoffMs = 50;
+    //enable.idempotence
+    //reconnect.backoff.ms
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean enableIdempotence = false;
 
     // SSL
     @UriParam(label = "common,security")
@@ -307,13 +311,6 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, 
getCompressionCodec());
         addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, 
getRetries());
         addPropertyIfNotNull(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
getInterceptorClasses());
-        // SSL
-        applySslConfiguration(props, getSslContextParameters());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
getSslKeystoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
getSslKeystorePassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
getSslTruststoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
getSslTruststorePassword());
         addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, 
getRetries());
         addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, 
getProducerBatchSize());
         addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, 
getClientId());
@@ -324,15 +321,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, 
getPartitioner());
         addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, 
getReceiveBufferBytes());
         addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getRequestTimeoutMs());
-        // Security protocol
-        addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
         addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, 
getSendBufferBytes());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
getSslEnabledProtocols());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
getSslKeystoreType());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, 
getSslProtocol());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, 
getSslProvider());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
getSslTruststoreType());
         addPropertyIfNotNull(props, 
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
         addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, 
getMetadataMaxAgeMs());
         addPropertyIfNotNull(props, 
ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
@@ -340,6 +329,24 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, 
ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
         addPropertyIfNotNull(props, 
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
+        addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
isEnableIdempotence());
+        // SSL
+        applySslConfiguration(props, getSslContextParameters());
+        addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
getSslTruststorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, 
getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, 
getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
getSslTruststoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
getSslCipherSuites());
+        addPropertyIfNotNull(props, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
         //SASL
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
getSaslKerberosServiceName());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, 
getKerberosInitCmd());
@@ -348,11 +355,6 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 
getKerberosRenewWindowFactor());
         addListPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, 
getKerberosPrincipalToLocalRules());
         addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, 
getSaslMechanism());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
getSslCipherSuites());
-        addPropertyIfNotNull(props, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
getSslEndpointAlgorithm());
-        addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
-        addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
 
         return props;
     }
@@ -368,28 +370,12 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
getSessionTimeoutMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
getMaxPollRecords());
         addPropertyIfNotNull(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
getInterceptorClasses());
-        // SSL
-        applySslConfiguration(props, getSslContextParameters());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
getSslKeystoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
getSslKeystorePassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
getSslTruststoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
getSslTruststorePassword());
         addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
getAutoOffsetReset());
         addPropertyIfNotNull(props, 
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
         addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
isAutoCommitEnable());
         addPropertyIfNotNull(props, 
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
         addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
getReceiveBufferBytes());
         addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getConsumerRequestTimeoutMs());
-        // Security protocol
-        addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
-        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, 
getSendBufferBytes());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
getSslEnabledProtocols());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
getSslKeystoreType());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, 
getSslProtocol());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, 
getSslProvider());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
getSslTruststoreType());
         addPropertyIfNotNull(props, 
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, 
getCheckCrcs());
         addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, 
getClientId());
@@ -400,6 +386,26 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, 
ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
         addPropertyIfNotNull(props, 
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
+        
+        // SSL
+        applySslConfiguration(props, getSslContextParameters());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
getSslTruststorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
getSslCipherSuites());
+        addPropertyIfNotNull(props, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, 
getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, 
getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
getSslTruststoreType());
+        // Security protocol
+        addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, 
getSendBufferBytes());
         //SASL
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
getSaslKerberosServiceName());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, 
getKerberosInitCmd());
@@ -408,11 +414,6 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 
getKerberosRenewWindowFactor());
         addListPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, 
getKerberosPrincipalToLocalRules());
         addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, 
getSaslMechanism());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
getSslCipherSuites());
-        addPropertyIfNotNull(props, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
getSslEndpointAlgorithm());
-        addPropertyIfNotNull(props, 
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
-        addPropertyIfNotNull(props, 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
         return props;
     }
 
@@ -1495,4 +1496,17 @@ public class KafkaConfiguration implements Cloneable {
     public void setInterceptorClasses(String interceptorClasses) {
         this.interceptorClasses = interceptorClasses;
     }
+
+    public boolean isEnableIdempotence() {
+        return enableIdempotence;
+    }
+
+    /**
+     * If set to 'true' the producer will ensure that exactly one copy of each 
message is written in the stream. If 'false', producer 
+     * retries may write duplicates of the retried message in the stream. If 
set to true this option will require max.in.flight.requests.per.connection to 
be set to 1 and 
+     * retries cannot be zero and additionally acks must be set to 'all'. 
+     */
+    public void setEnableIdempotence(boolean enableIdempotence) {
+        this.enableIdempotence = enableIdempotence;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 5a8af58..fa75551 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -149,6 +149,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
         props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2, TLSv1.1, 
TLSv1");
         props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");

http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 2ced5ac..419a037 100644
--- 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -672,6 +672,15 @@ public class KafkaComponentConfiguration
          * class cast exception in runtime
          */
         private String interceptorClasses;
+        /**
+         * If set to 'true' the producer will ensure that exactly one copy of
+         * each message is written in the stream. If 'false', producer retries
+         * may write duplicates of the retried message in the stream. If set to
+         * true this option will require max.in.flight.requests.per.connection
+         * to be set to 1 and retries cannot be zero and additionally acks must
+         * be set to 'all'.
+         */
+        private Boolean enableIdempotence = false;
 
         public String getGroupId() {
             return groupId;
@@ -1325,5 +1334,13 @@ public class KafkaComponentConfiguration
         public void setInterceptorClasses(String interceptorClasses) {
             this.interceptorClasses = interceptorClasses;
         }
+
+        public Boolean getEnableIdempotence() {
+            return enableIdempotence;
+        }
+
+        public void setEnableIdempotence(Boolean enableIdempotence) {
+            this.enableIdempotence = enableIdempotence;
+        }
     }
 }
\ No newline at end of file

Reply via email to