This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new e677a65 CAMEL-14968: Allow to configure shutdown timeout for kafka consumer/producer waiting for its worker threads to shutdown graceful. e677a65 is described below commit e677a65f171b0a5f7083edc048c823923f75c071 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed May 6 14:00:28 2020 +0200 CAMEL-14968: Allow to configure shutdown timeout for kafka consumer/producer waiting for its worker threads to shutdown graceful. --- .../component/kafka/KafkaComponentConfigurer.java | 5 ++ .../component/kafka/KafkaEndpointConfigurer.java | 5 ++ .../org/apache/camel/component/kafka/kafka.json | 2 + .../camel-kafka/src/main/docs/kafka-component.adoc | 6 +- .../camel/component/kafka/KafkaConfiguration.java | 13 ++++ .../camel/component/kafka/KafkaConsumer.java | 10 ++- .../camel/component/kafka/KafkaProducer.java | 4 +- .../dsl/KafkaComponentBuilderFactory.java | 14 ++++ .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 80 ++++++++++++++++++++++ 9 files changed, 135 insertions(+), 4 deletions(-) diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index 23e52e03..c36877a 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -169,6 +169,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "serializerClass": getOrCreateConfiguration(target).setSerializerClass(property(camelContext, java.lang.String.class, value)); return true; case "sessiontimeoutms": case "sessionTimeoutMs": getOrCreateConfiguration(target).setSessionTimeoutMs(property(camelContext, java.lang.Integer.class, value)); return true; + case "shutdowntimeout": + case "shutdownTimeout": getOrCreateConfiguration(target).setShutdownTimeout(property(camelContext, int.class, value)); return true; case "specificavroreader": case "specificAvroReader": getOrCreateConfiguration(target).setSpecificAvroReader(property(camelContext, boolean.class, value)); return true; case "sslciphersuites": @@ -294,6 +296,7 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen answer.put("sendBufferBytes", java.lang.Integer.class); answer.put("serializerClass", java.lang.String.class); answer.put("sessionTimeoutMs", java.lang.Integer.class); + answer.put("shutdownTimeout", int.class); answer.put("specificAvroReader", boolean.class); answer.put("sslCipherSuites", java.lang.String.class); answer.put("sslContextParameters", org.apache.camel.support.jsse.SSLContextParameters.class); @@ -466,6 +469,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "serializerClass": return getOrCreateConfiguration(target).getSerializerClass(); case "sessiontimeoutms": case "sessionTimeoutMs": return getOrCreateConfiguration(target).getSessionTimeoutMs(); + case "shutdowntimeout": + case "shutdownTimeout": return getOrCreateConfiguration(target).getShutdownTimeout(); case "specificavroreader": case "specificAvroReader": return getOrCreateConfiguration(target).isSpecificAvroReader(); case "sslciphersuites": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java index 7ed526a..ddae0f4 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java @@ -163,6 +163,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "serializerClass": target.getConfiguration().setSerializerClass(property(camelContext, java.lang.String.class, value)); return true; case "sessiontimeoutms": case "sessionTimeoutMs": target.getConfiguration().setSessionTimeoutMs(property(camelContext, java.lang.Integer.class, value)); return true; + case "shutdowntimeout": + case "shutdownTimeout": target.getConfiguration().setShutdownTimeout(property(camelContext, int.class, value)); return true; case "specificavroreader": case "specificAvroReader": target.getConfiguration().setSpecificAvroReader(property(camelContext, boolean.class, value)); return true; case "sslciphersuites": @@ -287,6 +289,7 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement answer.put("sendBufferBytes", java.lang.Integer.class); answer.put("serializerClass", java.lang.String.class); answer.put("sessionTimeoutMs", java.lang.Integer.class); + answer.put("shutdownTimeout", int.class); answer.put("specificAvroReader", boolean.class); answer.put("sslCipherSuites", java.lang.String.class); answer.put("sslContextParameters", org.apache.camel.support.jsse.SSLContextParameters.class); @@ -460,6 +463,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "serializerClass": return target.getConfiguration().getSerializerClass(); case "sessiontimeoutms": case "sessionTimeoutMs": return target.getConfiguration().getSessionTimeoutMs(); + case "shutdowntimeout": + case "shutdownTimeout": return target.getConfiguration().getShutdownTimeout(); case "specificavroreader": case "specificAvroReader": return target.getConfiguration().isSpecificAvroReader(); case "sslciphersuites": diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 756303b..239eb77 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -27,6 +27,7 @@ "configuration": { "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaConfiguration", "deprecated": false, "secret": false, "description": "Allows to pre-configure the Kafka component with common options that the endpoints will reuse." }, "headerFilterStrategy": { "kind": "property", "displayName": "Header Filter Strategy", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." }, "reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect Backoff Max Ms", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "defaultValue": "1000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repea [...] + "shutdownTimeout": { "kind": "property", "displayName": "Shutdown Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Timeout in milli seconds to wait gracefully for the consumer or producer to shutdown and terminate its worker threads." }, "allowManualCommit": { "kind": "property", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of Kafk [...] "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit Enable", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This [...] "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The frequency in ms that the consumer offsets are committed to zookeeper." }, @@ -125,6 +126,7 @@ "clientId": { "kind": "parameter", "displayName": "Client Id", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request." }, "headerFilterStrategy": { "kind": "parameter", "displayName": "Header Filter Strategy", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." }, "reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect Backoff Max Ms", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "defaultValue": "1000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repe [...] + "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown Timeout", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Timeout in milli seconds to wait gracefully for the consumer or producer to shutdown and terminate its worker threads." }, "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of Kaf [...] "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit Enable", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. Thi [...] "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The frequency in ms that the consumer offsets are committed to zookeeper." }, diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index bf5c294..e00506f5 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -40,7 +40,7 @@ kafka:topic[?options] // component options: START -The Kafka component supports 96 options, which are listed below. +The Kafka component supports 97 options, which are listed below. @@ -53,6 +53,7 @@ The Kafka component supports 96 options, which are listed below. | *configuration* (common) | Allows to pre-configure the Kafka component with common options that the endpoints will reuse. | | KafkaConfiguration | *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to filter header to and from Camel message. | | HeaderFilterStrategy | *reconnectBackoffMaxMs* (common) | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | 1000 | Integer +| *shutdownTimeout* (common) | Timeout in milli seconds to wait gracefully for the consumer or producer to shutdown and terminate its worker threads. | 30000 | int | *allowManualCommit* (consumer) | Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. | false | boolean | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | true | Boolean | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer offsets are committed to zookeeper. | 5000 | Integer @@ -167,7 +168,7 @@ with the following path and query parameters: |=== -=== Query Parameters (96 parameters): +=== Query Parameters (97 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -178,6 +179,7 @@ with the following path and query parameters: | *clientId* (common) | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. | | String | *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to filter header to and from Camel message. | | HeaderFilterStrategy | *reconnectBackoffMaxMs* (common) | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | 1000 | Integer +| *shutdownTimeout* (common) | Timeout in milli seconds to wait gracefully for the consumer or producer to shutdown and terminate its worker threads. | 30000 | int | *allowManualCommit* (consumer) | Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. | false | boolean | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | true | Boolean | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer offsets are committed to zookeeper. | 5000 | Integer diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 5f99fb5..0fd1c64 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -319,6 +319,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware // Additional properties @UriParam(label = "common", prefix = "additionalProperties.", multiValue = true) private Map<String, Object> additionalProperties = new HashMap<>(); + @UriParam(label = "common", defaultValue = "30000") + private int shutdownTimeout = 30000; public KafkaConfiguration() { } @@ -661,6 +663,17 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware this.allowManualCommit = allowManualCommit; } + public int getShutdownTimeout() { + return shutdownTimeout; + } + + /** + * Timeout in milli seconds to wait gracefully for the consumer or producer to shutdown and terminate its worker threads. + */ + public void setShutdownTimeout(int shutdownTimeout) { + this.shutdownTimeout = shutdownTimeout; + } + public StateRepository<String, String> getOffsetRepository() { return offsetRepository; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 622f0c7..5bad75b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; import java.util.stream.StreamSupport; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; @@ -78,6 +79,11 @@ public class KafkaConsumer extends DefaultConsumer { } } + @Override + public KafkaEndpoint getEndpoint() { + return (KafkaEndpoint) super.getEndpoint(); + } + Properties getProps() { Properties props = endpoint.getConfiguration().createConsumerProperties(); endpoint.updateClassProperties(props); @@ -142,7 +148,9 @@ public class KafkaConsumer extends DefaultConsumer { if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { - getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor); + int timeout = getEndpoint().getConfiguration().getShutdownTimeout(); + LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", timeout); + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor, timeout); } else { executor.shutdownNow(); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index dd15bba..12c5239 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -137,7 +137,9 @@ public class KafkaProducer extends DefaultAsyncProducer { } if (shutdownWorkerPool && workerPool != null) { - endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool); + int timeout = endpoint.getConfiguration().getShutdownTimeout(); + LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", timeout); + endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, timeout); workerPool = null; } } diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index 38de2e6..b3943fd 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -138,6 +138,19 @@ public interface KafkaComponentBuilderFactory { return this; } /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option is a: <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaComponentBuilder shutdownTimeout(int shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** * Whether to allow doing manual commits via KafkaManualCommit. If this * option is enabled then an instance of KafkaManualCommit is stored on * the Exchange message header, which allows end users to access this @@ -1555,6 +1568,7 @@ public interface KafkaComponentBuilderFactory { case "configuration": ((KafkaComponent) component).setConfiguration((org.apache.camel.component.kafka.KafkaConfiguration) value); return true; case "headerFilterStrategy": getOrCreateConfiguration((KafkaComponent) component).setHeaderFilterStrategy((org.apache.camel.spi.HeaderFilterStrategy) value); return true; case "reconnectBackoffMaxMs": getOrCreateConfiguration((KafkaComponent) component).setReconnectBackoffMaxMs((java.lang.Integer) value); return true; + case "shutdownTimeout": getOrCreateConfiguration((KafkaComponent) component).setShutdownTimeout((int) value); return true; case "allowManualCommit": getOrCreateConfiguration((KafkaComponent) component).setAllowManualCommit((boolean) value); return true; case "autoCommitEnable": getOrCreateConfiguration((KafkaComponent) component).setAutoCommitEnable((java.lang.Boolean) value); return true; case "autoCommitIntervalMs": getOrCreateConfiguration((KafkaComponent) component).setAutoCommitIntervalMs((java.lang.Integer) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index 0d0941d..5a83015 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -173,6 +173,33 @@ public interface KafkaEndpointBuilderFactory { return this; } /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option is a: <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaEndpointConsumerBuilder shutdownTimeout(int shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaEndpointConsumerBuilder shutdownTimeout( + String shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** * Whether to allow doing manual commits via KafkaManualCommit. If this * option is enabled then an instance of KafkaManualCommit is stored on * the Exchange message header, which allows end users to access this @@ -1597,6 +1624,33 @@ public interface KafkaEndpointBuilderFactory { return this; } /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option is a: <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaEndpointProducerBuilder shutdownTimeout(int shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaEndpointProducerBuilder shutdownTimeout( + String shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** * The total bytes of memory the producer can use to buffer records * waiting to be sent to the server. If records are sent faster than * they can be delivered to the server the producer will either block or @@ -3190,6 +3244,32 @@ public interface KafkaEndpointBuilderFactory { return this; } /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option is a: <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaEndpointBuilder shutdownTimeout(int shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** + * Timeout in milli seconds to wait gracefully for the consumer or + * producer to shutdown and terminate its worker threads. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 30000 + * Group: common + */ + default KafkaEndpointBuilder shutdownTimeout(String shutdownTimeout) { + doSetProperty("shutdownTimeout", shutdownTimeout); + return this; + } + /** * URL of the Confluent Platform schema registry servers to use. The * format is host1:port1,host2:port2. This is known as * schema.registry.url in the Confluent Platform documentation. This