This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new e677a65  CAMEL-14968: Allow configuring the shutdown timeout for the Kafka 
consumer/producer when waiting for its worker threads to shut down gracefully.
e677a65 is described below

commit e677a65f171b0a5f7083edc048c823923f75c071
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed May 6 14:00:28 2020 +0200

    CAMEL-14968: Allow configuring the shutdown timeout for the Kafka 
consumer/producer when waiting for its worker threads to shut down gracefully.
---
 .../component/kafka/KafkaComponentConfigurer.java  |  5 ++
 .../component/kafka/KafkaEndpointConfigurer.java   |  5 ++
 .../org/apache/camel/component/kafka/kafka.json    |  2 +
 .../camel-kafka/src/main/docs/kafka-component.adoc |  6 +-
 .../camel/component/kafka/KafkaConfiguration.java  | 13 ++++
 .../camel/component/kafka/KafkaConsumer.java       | 10 ++-
 .../camel/component/kafka/KafkaProducer.java       |  4 +-
 .../dsl/KafkaComponentBuilderFactory.java          | 14 ++++
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  | 80 ++++++++++++++++++++++
 9 files changed, 135 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 23e52e03..c36877a 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -169,6 +169,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "serializerClass": 
getOrCreateConfiguration(target).setSerializerClass(property(camelContext, 
java.lang.String.class, value)); return true;
         case "sessiontimeoutms":
         case "sessionTimeoutMs": 
getOrCreateConfiguration(target).setSessionTimeoutMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
+        case "shutdowntimeout":
+        case "shutdownTimeout": 
getOrCreateConfiguration(target).setShutdownTimeout(property(camelContext, 
int.class, value)); return true;
         case "specificavroreader":
         case "specificAvroReader": 
getOrCreateConfiguration(target).setSpecificAvroReader(property(camelContext, 
boolean.class, value)); return true;
         case "sslciphersuites":
@@ -294,6 +296,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         answer.put("sendBufferBytes", java.lang.Integer.class);
         answer.put("serializerClass", java.lang.String.class);
         answer.put("sessionTimeoutMs", java.lang.Integer.class);
+        answer.put("shutdownTimeout", int.class);
         answer.put("specificAvroReader", boolean.class);
         answer.put("sslCipherSuites", java.lang.String.class);
         answer.put("sslContextParameters", 
org.apache.camel.support.jsse.SSLContextParameters.class);
@@ -466,6 +469,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "serializerClass": return 
getOrCreateConfiguration(target).getSerializerClass();
         case "sessiontimeoutms":
         case "sessionTimeoutMs": return 
getOrCreateConfiguration(target).getSessionTimeoutMs();
+        case "shutdowntimeout":
+        case "shutdownTimeout": return 
getOrCreateConfiguration(target).getShutdownTimeout();
         case "specificavroreader":
         case "specificAvroReader": return 
getOrCreateConfiguration(target).isSpecificAvroReader();
         case "sslciphersuites":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 7ed526a..ddae0f4 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -163,6 +163,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "serializerClass": 
target.getConfiguration().setSerializerClass(property(camelContext, 
java.lang.String.class, value)); return true;
         case "sessiontimeoutms":
         case "sessionTimeoutMs": 
target.getConfiguration().setSessionTimeoutMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
+        case "shutdowntimeout":
+        case "shutdownTimeout": 
target.getConfiguration().setShutdownTimeout(property(camelContext, int.class, 
value)); return true;
         case "specificavroreader":
         case "specificAvroReader": 
target.getConfiguration().setSpecificAvroReader(property(camelContext, 
boolean.class, value)); return true;
         case "sslciphersuites":
@@ -287,6 +289,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         answer.put("sendBufferBytes", java.lang.Integer.class);
         answer.put("serializerClass", java.lang.String.class);
         answer.put("sessionTimeoutMs", java.lang.Integer.class);
+        answer.put("shutdownTimeout", int.class);
         answer.put("specificAvroReader", boolean.class);
         answer.put("sslCipherSuites", java.lang.String.class);
         answer.put("sslContextParameters", 
org.apache.camel.support.jsse.SSLContextParameters.class);
@@ -460,6 +463,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "serializerClass": return 
target.getConfiguration().getSerializerClass();
         case "sessiontimeoutms":
         case "sessionTimeoutMs": return 
target.getConfiguration().getSessionTimeoutMs();
+        case "shutdowntimeout":
+        case "shutdownTimeout": return 
target.getConfiguration().getShutdownTimeout();
         case "specificavroreader":
         case "specificAvroReader": return 
target.getConfiguration().isSpecificAvroReader();
         case "sslciphersuites":
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 756303b..239eb77 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -27,6 +27,7 @@
     "configuration": { "kind": "property", "displayName": "Configuration", 
"group": "common", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.KafkaConfiguration", 
"deprecated": false, "secret": false, "description": "Allows to pre-configure 
the Kafka component with common options that the endpoints will reuse." },
     "headerFilterStrategy": { "kind": "property", "displayName": "Header 
Filter Strategy", "group": "common", "label": "common", "required": false, 
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", 
"deprecated": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom HeaderFilterStrategy to filter 
header to and from Camel message." },
     "reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect 
Backoff Max Ms", "group": "common", "label": "common", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"secret": false, "defaultValue": "1000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum amount of time in milliseconds to 
wait when reconnecting to a broker that has repea [...]
+    "shutdownTimeout": { "kind": "property", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "30000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milli seconds to wait gracefully 
for the consumer or producer to shutdown and terminate its worker threads." },
     "allowManualCommit": { "kind": "property", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "secret": false, 
"defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of Kafk [...]
     "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret": 
false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched by the consumer. This [...]
     "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The frequency in ms that the consumer offsets 
are committed to zookeeper." },
@@ -125,6 +126,7 @@
     "clientId": { "kind": "parameter", "displayName": "Client Id", "group": 
"common", "label": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The client id is a user-specified string sent 
in each request to help trace calls. It should logically identify the 
application making the request." },
     "headerFilterStrategy": { "kind": "parameter", "displayName": "Header 
Filter Strategy", "group": "common", "label": "common", "required": false, 
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", 
"deprecated": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom HeaderFilterStrategy to filter 
header to and from Camel message." },
     "reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect 
Backoff Max Ms", "group": "common", "label": "common", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"secret": false, "defaultValue": "1000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum amount of time in milliseconds to 
wait when reconnecting to a broker that has repe [...]
+    "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "30000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milli seconds to wait gracefully 
for the consumer or producer to shutdown and terminate its worker threads." },
     "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "secret": false, 
"defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of Kaf [...]
     "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret": 
false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched by the consumer. Thi [...]
     "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The frequency in ms that the consumer offsets 
are committed to zookeeper." },
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index bf5c294..e00506f5 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -40,7 +40,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 96 options, which are listed below.
+The Kafka component supports 97 options, which are listed below.
 
 
 
@@ -53,6 +53,7 @@ The Kafka component supports 96 options, which are listed 
below.
 | *configuration* (common) | Allows to pre-configure the Kafka component with 
common options that the endpoints will reuse. |  | KafkaConfiguration
 | *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to 
filter header to and from Camel message. |  | HeaderFilterStrategy
 | *reconnectBackoffMaxMs* (common) | The maximum amount of time in 
milliseconds to wait when reconnecting to a broker that has repeatedly failed 
to connect. If provided, the backoff per host will increase exponentially for 
each consecutive connection failure, up to this maximum. After calculating the 
backoff increase, 20% random jitter is added to avoid connection storms. | 1000 
| Integer
+| *shutdownTimeout* (common) | Timeout in milli seconds to wait gracefully for 
the consumer or producer to shutdown and terminate its worker threads. | 30000 
| int
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of 
KafkaManualCommit is stored on the Exchange message header, which allows end 
users to access this API and perform manual offset commits via the Kafka 
consumer. | false | boolean
 | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper 
the offset of messages already fetched by the consumer. This committed offset 
will be used when the process fails as the position from which the new consumer 
will begin. | true | Boolean
 | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer 
offsets are committed to zookeeper. | 5000 | Integer
@@ -167,7 +168,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (96 parameters):
+=== Query Parameters (97 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -178,6 +179,7 @@ with the following path and query parameters:
 | *clientId* (common) | The client id is a user-specified string sent in each 
request to help trace calls. It should logically identify the application 
making the request. |  | String
 | *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to 
filter header to and from Camel message. |  | HeaderFilterStrategy
 | *reconnectBackoffMaxMs* (common) | The maximum amount of time in 
milliseconds to wait when reconnecting to a broker that has repeatedly failed 
to connect. If provided, the backoff per host will increase exponentially for 
each consecutive connection failure, up to this maximum. After calculating the 
backoff increase, 20% random jitter is added to avoid connection storms. | 1000 
| Integer
+| *shutdownTimeout* (common) | Timeout in milli seconds to wait gracefully for 
the consumer or producer to shutdown and terminate its worker threads. | 30000 
| int
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of 
KafkaManualCommit is stored on the Exchange message header, which allows end 
users to access this API and perform manual offset commits via the Kafka 
consumer. | false | boolean
 | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper 
the offset of messages already fetched by the consumer. This committed offset 
will be used when the process fails as the position from which the new consumer 
will begin. | true | Boolean
 | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer 
offsets are committed to zookeeper. | 5000 | Integer
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 5f99fb5..0fd1c64 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -319,6 +319,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     // Additional properties
     @UriParam(label = "common", prefix = "additionalProperties.", multiValue = 
true)
     private Map<String, Object> additionalProperties = new HashMap<>();
+    @UriParam(label = "common", defaultValue = "30000")
+    private int shutdownTimeout = 30000;
 
     public KafkaConfiguration() {
     }
@@ -661,6 +663,17 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.allowManualCommit = allowManualCommit;
     }
 
+    public int getShutdownTimeout() {
+        return shutdownTimeout;
+    }
+
+    /**
+     * Timeout in milli seconds to wait gracefully for the consumer or 
producer to shutdown and terminate its worker threads.
+     */
+    public void setShutdownTimeout(int shutdownTimeout) {
+        this.shutdownTimeout = shutdownTimeout;
+    }
+
     public StateRepository<String, String> getOffsetRepository() {
         return offsetRepository;
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 622f0c7..5bad75b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
 import java.util.stream.StreamSupport;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -78,6 +79,11 @@ public class KafkaConsumer extends DefaultConsumer {
         }
     }
 
+    @Override
+    public KafkaEndpoint getEndpoint() {
+        return (KafkaEndpoint) super.getEndpoint();
+    }
+
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
@@ -142,7 +148,9 @@ public class KafkaConsumer extends DefaultConsumer {
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
-                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
+                int timeout = 
getEndpoint().getConfiguration().getShutdownTimeout();
+                LOG.debug("Shutting down Kafka consumer worker threads with 
timeout {} millis", timeout);
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor,
 timeout);
             } else {
                 executor.shutdownNow();
             }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index dd15bba..12c5239 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -137,7 +137,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         if (shutdownWorkerPool && workerPool != null) {
-            
endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+            int timeout = endpoint.getConfiguration().getShutdownTimeout();
+            LOG.debug("Shutting down Kafka producer worker threads with 
timeout {} millis", timeout);
+            
endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool,
 timeout);
             workerPool = null;
         }
     }
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 38de2e6..b3943fd 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -138,6 +138,19 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaComponentBuilder shutdownTimeout(int shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
          * Whether to allow doing manual commits via KafkaManualCommit. If this
          * option is enabled then an instance of KafkaManualCommit is stored on
          * the Exchange message header, which allows end users to access this
@@ -1555,6 +1568,7 @@ public interface KafkaComponentBuilderFactory {
             case "configuration": ((KafkaComponent) 
component).setConfiguration((org.apache.camel.component.kafka.KafkaConfiguration)
 value); return true;
             case "headerFilterStrategy": 
getOrCreateConfiguration((KafkaComponent) 
component).setHeaderFilterStrategy((org.apache.camel.spi.HeaderFilterStrategy) 
value); return true;
             case "reconnectBackoffMaxMs": 
getOrCreateConfiguration((KafkaComponent) 
component).setReconnectBackoffMaxMs((java.lang.Integer) value); return true;
+            case "shutdownTimeout": getOrCreateConfiguration((KafkaComponent) 
component).setShutdownTimeout((int) value); return true;
             case "allowManualCommit": 
getOrCreateConfiguration((KafkaComponent) 
component).setAllowManualCommit((boolean) value); return true;
             case "autoCommitEnable": getOrCreateConfiguration((KafkaComponent) 
component).setAutoCommitEnable((java.lang.Boolean) value); return true;
             case "autoCommitIntervalMs": 
getOrCreateConfiguration((KafkaComponent) 
component).setAutoCommitIntervalMs((java.lang.Integer) value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 0d0941d..5a83015 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -173,6 +173,33 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaEndpointConsumerBuilder shutdownTimeout(int 
shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaEndpointConsumerBuilder shutdownTimeout(
+                String shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
          * Whether to allow doing manual commits via KafkaManualCommit. If this
          * option is enabled then an instance of KafkaManualCommit is stored on
          * the Exchange message header, which allows end users to access this
@@ -1597,6 +1624,33 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaEndpointProducerBuilder shutdownTimeout(int 
shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaEndpointProducerBuilder shutdownTimeout(
+                String shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
          * The total bytes of memory the producer can use to buffer records
          * waiting to be sent to the server. If records are sent faster than
          * they can be delivered to the server the producer will either block 
or
@@ -3190,6 +3244,32 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaEndpointBuilder shutdownTimeout(int shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
+         * Timeout in milli seconds to wait gracefully for the consumer or
+         * producer to shutdown and terminate its worker threads.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 30000
+         * Group: common
+         */
+        default KafkaEndpointBuilder shutdownTimeout(String shutdownTimeout) {
+            doSetProperty("shutdownTimeout", shutdownTimeout);
+            return this;
+        }
+        /**
          * URL of the Confluent Platform schema registry servers to use. The
          * format is host1:port1,host2:port2. This is known as
          * schema.registry.url in the Confluent Platform documentation. This

Reply via email to