This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b86c25511c9a02690547f4b45d645d18f56a3866
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri Sep 10 18:21:00 2021 +0200

    CAMEL-16949: rework the KafkaFetchRecords to avoid the sleep
    
    This refactoring reworks KafkaFetchRecords to avoid the sleep call and
    to make better use of the non-blocking Kafka client APIs.
    
    In addition to simplifying the code, it also fixes CAMEL-16973, which
    could cause the client to skip manual commits when a wakeup call was
    issued while the client was performing a commit.
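    
    For illustration, a minimal sketch (assuming plain Kafka client APIs;
    identifiers are illustrative, not the actual KafkaFetchRecords code) of
    the wakeup-driven pattern this rework moves toward: wakeup() interrupts
    a blocking poll() or commitSync() without any sleep, and the resulting
    WakeupException is treated as expected only once shutdown was requested.
    
        import java.time.Duration;
        import java.util.Collection;
        import java.util.Properties;
        import java.util.concurrent.atomic.AtomicBoolean;
        
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.errors.WakeupException;
        
        class WakeupAwareFetcher implements Runnable {
            private final KafkaConsumer<String, String> consumer;
            private final AtomicBoolean stopping = new AtomicBoolean(false);
        
            WakeupAwareFetcher(Properties props, Collection<String> topics) {
                this.consumer = new KafkaConsumer<>(props);
                this.consumer.subscribe(topics);
            }
        
            @Override
            public void run() {
                try {
                    while (!stopping.get()) {
                        // poll() blocks up to the timeout, so no explicit sleep is needed
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        process(records);
                        // bounded synchronous commit (compare commitTimeoutMs below)
                        consumer.commitSync(Duration.ofMillis(5000));
                    }
                } catch (WakeupException e) {
                    // wakeup() interrupts poll()/commitSync(); rethrow if not stopping
                    if (!stopping.get()) {
                        throw e;
                    }
                } finally {
                    consumer.close();
                }
            }
        
            void shutdown() {
                stopping.set(true);
                consumer.wakeup(); // safe to call from another thread
            }
        
            private void process(ConsumerRecords<String, String> records) {
                // hand the records over to the processor/route
            }
        }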
---
 .../org/apache/camel/catalog/components/kafka.json |   6 +-
 .../component/kafka/KafkaComponentConfigurer.java  |   6 +
 .../component/kafka/KafkaEndpointConfigurer.java   |   6 +
 .../component/kafka/KafkaEndpointUriFactory.java   |   3 +-
 .../org/apache/camel/component/kafka/kafka.json    |   6 +-
 .../camel-kafka/src/main/docs/kafka-component.adoc |  10 +-
 .../component/kafka/DefaultKafkaManualCommit.java  |  11 +-
 .../kafka/DefaultKafkaManualCommitFactory.java     |   5 +-
 .../camel/component/kafka/KafkaConfiguration.java  |  17 +-
 .../camel/component/kafka/KafkaConsumer.java       |  13 +-
 .../camel/component/kafka/KafkaFetchRecords.java   | 730 +++++++++------------
 .../component/kafka/KafkaManualCommitFactory.java  |   2 +-
 .../consumer/support/KafkaRecordProcessor.java     | 243 +++++++
 .../consumer/support/OffsetResumeStrategy.java     |  62 ++
 .../support/PartitionAssignmentListener.java       | 106 +++
 .../support/ResumeStrategy.java}                   |  19 +-
 .../support/ResumeStrategyFactory.java}            |  41 +-
 .../consumer/support/SeekPolicyResumeStrategy.java |  56 ++
 .../kafka/integration/KafkaConsumerFullIT.java     |   1 -
 .../integration/KafkaConsumerManualCommitIT.java   |   8 +-
 .../dsl/KafkaComponentBuilderFactory.java          |  20 +-
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |  47 +-
 22 files changed, 960 insertions(+), 458 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 0a52add..e664ddd 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -28,7 +28,7 @@
     "configuration": { "kind": "property", "displayName": "Configuration", 
"group": "common", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.KafkaConfiguration", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Allows to pre-configure the Kafka component with common options that the 
endpoints will reuse." },
     "headerFilterStrategy": { "kind": "property", "displayName": "Header 
Filter Strategy", "group": "common", "label": "common", "required": false, 
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom HeaderFilterStrategy to filter 
header to and from Camel message." },
     "reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect 
Backoff Max Ms", "group": "common", "label": "common", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The maximum amount of 
time in milliseconds to wait when reconnecting to a b [...]
-    "shutdownTimeout": { "kind": "property", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milli seconds to wait gracefully 
for the consumer or producer to shutdown and terminate its w [...]
+    "shutdownTimeout": { "kind": "property", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milliseconds to wait gracefully for 
the consumer or producer to shutdown and terminate its wo [...]
     "allowManualCommit": { "kind": "property", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then [...]
     "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched b [...]
     "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "5000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The frequency in ms that 
the consumer offsets are committed to zookeeper." },
@@ -37,6 +37,7 @@
     "breakOnFirstError": { "kind": "property", "displayName": "Break On First 
Error", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This options controls what happens when a 
consumer is processing an exchange and it fails. [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "checkCrcs": { "kind": "property", "displayName": "Check Crcs", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk cor [...]
+    "commitTimeoutMs": { "kind": "property", "displayName": "Commit Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum time, in milliseconds, that the 
code will wait for a synchronous commit to [...]
     "consumerRequestTimeoutMs": { "kind": "property", "displayName": "Consumer 
Request Timeout Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "40000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The configuration 
controls the maximum amount of time the client [...]
     "consumersCount": { "kind": "property", "displayName": "Consumers Count", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of consumers that connect to kafka 
server" },
     "consumerStreams": { "kind": "property", "displayName": "Consumer 
Streams", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Number of concurrent consumers on the 
consumer" },
@@ -133,7 +134,7 @@
     "clientId": { "kind": "parameter", "displayName": "Client Id", "group": 
"common", "label": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The client id is a 
user-specified string sent in each request to help trace calls. It should 
logically identify the application  [...]
     "headerFilterStrategy": { "kind": "parameter", "displayName": "Header 
Filter Strategy", "group": "common", "label": "common", "required": false, 
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom HeaderFilterStrategy to filter 
header to and from Camel message." },
     "reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect 
Backoff Max Ms", "group": "common", "label": "common", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The maximum amount of 
time in milliseconds to wait when reconnecting to a  [...]
-    "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milli seconds to wait gracefully 
for the consumer or producer to shutdown and terminate its  [...]
+    "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milliseconds to wait gracefully for 
the consumer or producer to shutdown and terminate its w [...]
     "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled the [...]
     "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched  [...]
     "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "5000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The frequency in ms that 
the consumer offsets are committed to zookeeper." },
@@ -142,6 +143,7 @@
     "breakOnFirstError": { "kind": "parameter", "displayName": "Break On First 
Error", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This options controls what happens when a 
consumer is processing an exchange and it fails [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a m [...]
     "checkCrcs": { "kind": "parameter", "displayName": "Check Crcs", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk co [...]
+    "commitTimeoutMs": { "kind": "parameter", "displayName": "Commit Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum time, in milliseconds, that the 
code will wait for a synchronous commit t [...]
     "consumerRequestTimeoutMs": { "kind": "parameter", "displayName": 
"Consumer Request Timeout Ms", "group": "consumer", "label": "consumer", 
"required": false, "type": "integer", "javaType": "java.lang.Integer", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"40000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The configuration controls the maximum amount 
of time the clien [...]
     "consumersCount": { "kind": "parameter", "displayName": "Consumers Count", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of consumers that connect to kafka 
server" },
     "consumerStreams": { "kind": "parameter", "displayName": "Consumer 
Streams", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Number of concurrent consumers on the 
consumer" },
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 83998ac..fe9ff62 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -53,6 +53,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "checkCrcs": 
getOrCreateConfiguration(target).setCheckCrcs(property(camelContext, 
java.lang.Boolean.class, value)); return true;
         case "clientid":
         case "clientId": 
getOrCreateConfiguration(target).setClientId(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "committimeoutms":
+        case "commitTimeoutMs": 
getOrCreateConfiguration(target).setCommitTimeoutMs(property(camelContext, 
java.lang.Long.class, value)); return true;
         case "compressioncodec":
         case "compressionCodec": 
getOrCreateConfiguration(target).setCompressionCodec(property(camelContext, 
java.lang.String.class, value)); return true;
         case "configuration": target.setConfiguration(property(camelContext, 
org.apache.camel.component.kafka.KafkaConfiguration.class, value)); return true;
@@ -265,6 +267,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "checkCrcs": return java.lang.Boolean.class;
         case "clientid":
         case "clientId": return java.lang.String.class;
+        case "committimeoutms":
+        case "commitTimeoutMs": return java.lang.Long.class;
         case "compressioncodec":
         case "compressionCodec": return java.lang.String.class;
         case "configuration": return 
org.apache.camel.component.kafka.KafkaConfiguration.class;
@@ -473,6 +477,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "checkCrcs": return 
getOrCreateConfiguration(target).getCheckCrcs();
         case "clientid":
         case "clientId": return getOrCreateConfiguration(target).getClientId();
+        case "committimeoutms":
+        case "commitTimeoutMs": return 
getOrCreateConfiguration(target).getCommitTimeoutMs();
         case "compressioncodec":
         case "compressionCodec": return 
getOrCreateConfiguration(target).getCompressionCodec();
         case "configuration": return target.getConfiguration();
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 6378a21..f71dd44 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -44,6 +44,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "checkCrcs": 
target.getConfiguration().setCheckCrcs(property(camelContext, 
java.lang.Boolean.class, value)); return true;
         case "clientid":
         case "clientId": 
target.getConfiguration().setClientId(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "committimeoutms":
+        case "commitTimeoutMs": 
target.getConfiguration().setCommitTimeoutMs(property(camelContext, 
java.lang.Long.class, value)); return true;
         case "compressioncodec":
         case "compressionCodec": 
target.getConfiguration().setCompressionCodec(property(camelContext, 
java.lang.String.class, value)); return true;
         case "connectionmaxidlems":
@@ -244,6 +246,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "checkCrcs": return java.lang.Boolean.class;
         case "clientid":
         case "clientId": return java.lang.String.class;
+        case "committimeoutms":
+        case "commitTimeoutMs": return java.lang.Long.class;
         case "compressioncodec":
         case "compressionCodec": return java.lang.String.class;
         case "connectionmaxidlems":
@@ -445,6 +449,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "checkCrcs": return target.getConfiguration().getCheckCrcs();
         case "clientid":
         case "clientId": return target.getConfiguration().getClientId();
+        case "committimeoutms":
+        case "commitTimeoutMs": return 
target.getConfiguration().getCommitTimeoutMs();
         case "compressioncodec":
         case "compressionCodec": return 
target.getConfiguration().getCompressionCodec();
         case "connectionmaxidlems":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index e433eb8..02b385e 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(100);
+        Set<String> props = new HashSet<>(101);
         props.add("synchronous");
         props.add("queueBufferingMaxMessages");
         props.add("allowManualCommit");
@@ -66,6 +66,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
         props.add("sslTrustmanagerAlgorithm");
         props.add("compressionCodec");
         props.add("autoCommitOnStop");
+        props.add("commitTimeoutMs");
         props.add("workerPoolCoreSize");
         props.add("autoCommitEnable");
         props.add("consumerRequestTimeoutMs");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 0a52add..e664ddd 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -28,7 +28,7 @@
     "configuration": { "kind": "property", "displayName": "Configuration", 
"group": "common", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.KafkaConfiguration", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Allows to pre-configure the Kafka component with common options that the 
endpoints will reuse." },
     "headerFilterStrategy": { "kind": "property", "displayName": "Header 
Filter Strategy", "group": "common", "label": "common", "required": false, 
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom HeaderFilterStrategy to filter 
header to and from Camel message." },
     "reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect 
Backoff Max Ms", "group": "common", "label": "common", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The maximum amount of 
time in milliseconds to wait when reconnecting to a b [...]
-    "shutdownTimeout": { "kind": "property", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milli seconds to wait gracefully 
for the consumer or producer to shutdown and terminate its w [...]
+    "shutdownTimeout": { "kind": "property", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milliseconds to wait gracefully for 
the consumer or producer to shutdown and terminate its wo [...]
     "allowManualCommit": { "kind": "property", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then [...]
     "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched b [...]
     "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "5000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The frequency in ms that 
the consumer offsets are committed to zookeeper." },
@@ -37,6 +37,7 @@
     "breakOnFirstError": { "kind": "property", "displayName": "Break On First 
Error", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This options controls what happens when a 
consumer is processing an exchange and it fails. [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "checkCrcs": { "kind": "property", "displayName": "Check Crcs", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk cor [...]
+    "commitTimeoutMs": { "kind": "property", "displayName": "Commit Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum time, in milliseconds, that the 
code will wait for a synchronous commit to [...]
     "consumerRequestTimeoutMs": { "kind": "property", "displayName": "Consumer 
Request Timeout Ms", "group": "consumer", "label": "consumer", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "40000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The configuration 
controls the maximum amount of time the client [...]
     "consumersCount": { "kind": "property", "displayName": "Consumers Count", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of consumers that connect to kafka 
server" },
     "consumerStreams": { "kind": "property", "displayName": "Consumer 
Streams", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Number of concurrent consumers on the 
consumer" },
@@ -133,7 +134,7 @@
     "clientId": { "kind": "parameter", "displayName": "Client Id", "group": 
"common", "label": "common", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The client id is a 
user-specified string sent in each request to help trace calls. It should 
logically identify the application  [...]
     "headerFilterStrategy": { "kind": "parameter", "displayName": "Header 
Filter Strategy", "group": "common", "label": "common", "required": false, 
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom HeaderFilterStrategy to filter 
header to and from Camel message." },
     "reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect 
Backoff Max Ms", "group": "common", "label": "common", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The maximum amount of 
time in milliseconds to wait when reconnecting to a  [...]
-    "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milli seconds to wait gracefully 
for the consumer or producer to shutdown and terminate its  [...]
+    "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown 
Timeout", "group": "common", "label": "common", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 30000, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Timeout in milliseconds to wait gracefully for 
the consumer or producer to shutdown and terminate its w [...]
     "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled the [...]
     "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched  [...]
     "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "5000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The frequency in ms that 
the consumer offsets are committed to zookeeper." },
@@ -142,6 +143,7 @@
     "breakOnFirstError": { "kind": "parameter", "displayName": "Break On First 
Error", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This options controls what happens when a 
consumer is processing an exchange and it fails [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a m [...]
     "checkCrcs": { "kind": "parameter", "displayName": "Check Crcs", "group": 
"consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "true", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk co [...]
+    "commitTimeoutMs": { "kind": "parameter", "displayName": "Commit Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The maximum time, in milliseconds, that the 
code will wait for a synchronous commit t [...]
     "consumerRequestTimeoutMs": { "kind": "parameter", "displayName": 
"Consumer Request Timeout Ms", "group": "consumer", "label": "consumer", 
"required": false, "type": "integer", "javaType": "java.lang.Integer", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"40000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The configuration controls the maximum amount 
of time the clien [...]
     "consumersCount": { "kind": "parameter", "displayName": "Consumers Count", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The number of consumers that connect to kafka 
server" },
     "consumerStreams": { "kind": "parameter", "displayName": "Consumer 
Streams", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Number of concurrent consumers on the 
consumer" },
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f37f524..998df1d 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -75,7 +75,7 @@ The following two sections lists all the options, firstly for 
the component foll
 == Component Options
 
 
-The Kafka component supports 103 options, which are listed below.
+The Kafka component supports 104 options, which are listed below.
 
 
 
@@ -88,7 +88,7 @@ The Kafka component supports 103 options, which are listed 
below.
 | *configuration* (common) | Allows to pre-configure the Kafka component with 
common options that the endpoints will reuse. |  | KafkaConfiguration
 | *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to 
filter header to and from Camel message. |  | HeaderFilterStrategy
 | *reconnectBackoffMaxMs* (common) | The maximum amount of time in 
milliseconds to wait when reconnecting to a broker that has repeatedly failed 
to connect. If provided, the backoff per host will increase exponentially for 
each consecutive connection failure, up to this maximum. After calculating the 
backoff increase, 20% random jitter is added to avoid connection storms. | 1000 
| Integer
-| *shutdownTimeout* (common) | Timeout in milli seconds to wait gracefully for 
the consumer or producer to shutdown and terminate its worker threads. | 30000 
| int
+| *shutdownTimeout* (common) | Timeout in milliseconds to wait gracefully for 
the consumer or producer to shutdown and terminate its worker threads. | 30000 
| int
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of 
KafkaManualCommit is stored on the Exchange message header, which allows end 
users to access this API and perform manual offset commits via the Kafka 
consumer. | false | boolean
 | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper 
the offset of messages already fetched by the consumer. This committed offset 
will be used when the process fails as the position from which the new consumer 
will begin. | true | Boolean
 | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer 
offsets are committed to zookeeper. | 5000 | Integer
@@ -97,6 +97,7 @@ The Kafka component supports 103 options, which are listed 
below.
 | *breakOnFirstError* (consumer) | This options controls what happens when a 
consumer is processing an exchange and it fails. If the option is false then 
the consumer continues to the next message and processes it. If the option is 
true then the consumer breaks out, and will seek back to offset of the message 
that caused a failure, and then re-attempt to process this message. However 
this can lead to endless processing of the same message if its bound to fail 
every time, eg a poison mess [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages, or the likes, will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions, that will be logged at WARN or ERROR level and ignored. | false | 
boolean
 | *checkCrcs* (consumer) | Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk corruption to the messages 
occurred. This check adds some overhead, so it may be disabled in cases seeking 
extreme performance. | true | Boolean
+| *commitTimeoutMs* (consumer) | The maximum time, in milliseconds, that the 
code will wait for a synchronous commit to complete | 5000 | Long
 | *consumerRequestTimeoutMs* (consumer) | The configuration controls the 
maximum amount of time the client will wait for the response of a request. If 
the response is not received before the timeout elapses the client will resend 
the request if necessary or fail the request if retries are exhausted. | 40000 
| Integer
 | *consumersCount* (consumer) | The number of consumers that connect to kafka 
server | 1 | int
 | *consumerStreams* (consumer) | Number of concurrent consumers on the 
consumer | 10 | int
@@ -209,7 +210,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (99 parameters):
+=== Query Parameters (100 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -220,7 +221,7 @@ with the following path and query parameters:
 | *clientId* (common) | The client id is a user-specified string sent in each 
request to help trace calls. It should logically identify the application 
making the request. |  | String
 | *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to 
filter header to and from Camel message. |  | HeaderFilterStrategy
 | *reconnectBackoffMaxMs* (common) | The maximum amount of time in 
milliseconds to wait when reconnecting to a broker that has repeatedly failed 
to connect. If provided, the backoff per host will increase exponentially for 
each consecutive connection failure, up to this maximum. After calculating the 
backoff increase, 20% random jitter is added to avoid connection storms. | 1000 
| Integer
-| *shutdownTimeout* (common) | Timeout in milli seconds to wait gracefully for 
the consumer or producer to shutdown and terminate its worker threads. | 30000 
| int
+| *shutdownTimeout* (common) | Timeout in milliseconds to wait gracefully for 
the consumer or producer to shutdown and terminate its worker threads. | 30000 
| int
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of 
KafkaManualCommit is stored on the Exchange message header, which allows end 
users to access this API and perform manual offset commits via the Kafka 
consumer. | false | boolean
 | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper 
the offset of messages already fetched by the consumer. This committed offset 
will be used when the process fails as the position from which the new consumer 
will begin. | true | Boolean
 | *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer 
offsets are committed to zookeeper. | 5000 | Integer
@@ -229,6 +230,7 @@ with the following path and query parameters:
 | *breakOnFirstError* (consumer) | This options controls what happens when a 
consumer is processing an exchange and it fails. If the option is false then 
the consumer continues to the next message and processes it. If the option is 
true then the consumer breaks out, and will seek back to offset of the message 
that caused a failure, and then re-attempt to process this message. However 
this can lead to endless processing of the same message if its bound to fail 
every time, eg a poison mess [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages, or the likes, will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions, that will be logged at WARN or ERROR level and ignored. | false | 
boolean
 | *checkCrcs* (consumer) | Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk corruption to the messages 
occurred. This check adds some overhead, so it may be disabled in cases seeking 
extreme performance. | true | Boolean
+| *commitTimeoutMs* (consumer) | The maximum time, in milliseconds, that the 
code will wait for a synchronous commit to complete | 5000 | Long
 | *consumerRequestTimeoutMs* (consumer) | The configuration controls the 
maximum amount of time the client will wait for the response of a request. If 
the response is not received before the timeout elapses the client will resend 
the request if necessary or fail the request if retries are exhausted. | 40000 
| Integer
 | *consumersCount* (consumer) | The number of consumers that connect to kafka 
server | 1 | int
 | *consumerStreams* (consumer) | Number of concurrent consumers on the 
consumer | 10 | int
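
For reference, a hypothetical route showing the new commitTimeoutMs option
together with allowManualCommit (topic, broker, and class names are
placeholders, not part of this commit):

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    import org.apache.camel.component.kafka.KafkaManualCommit;
    
    public class ManualCommitRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("kafka:myTopic?brokers=localhost:9092"
                    + "&autoCommitEnable=false&allowManualCommit=true"
                    + "&commitTimeoutMs=10000")
                    .process(exchange -> {
                        // with allowManualCommit=true, a KafkaManualCommit instance is
                        // stored on the message header for explicit offset commits
                        KafkaManualCommit manual = exchange.getIn()
                                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                        manual.commitSync();
                    });
        }
    }
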
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
index 039e5a9..cd03b00 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.time.Duration;
 import java.util.Collections;
 
 import org.apache.camel.spi.StateRepository;
@@ -35,16 +36,20 @@ public class DefaultKafkaManualCommit implements 
KafkaManualCommit {
     private final StateRepository<String, String> offsetRepository;
     private final TopicPartition partition;
     private final long recordOffset;
+    private final long commitTimeout;
 
     public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, 
String threadId,
                                     StateRepository<String, String> 
offsetRepository, TopicPartition partition,
-                                    long recordOffset) {
+                                    long recordOffset, long commitTimeout) {
         this.consumer = consumer;
         this.topicName = topicName;
         this.threadId = threadId;
         this.offsetRepository = offsetRepository;
         this.partition = partition;
         this.recordOffset = recordOffset;
+        this.commitTimeout = commitTimeout;
+
+        LOG.debug("Using commit timeout of {}", commitTimeout);
     }
 
     @Override
@@ -58,7 +63,9 @@ public class DefaultKafkaManualCommit implements 
KafkaManualCommit {
                 offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(recordOffset));
             } else {
                 LOG.debug("CommitSync {} from topic {} with offset: {}", 
threadId, topicName, recordOffset);
-                consumer.commitSync(Collections.singletonMap(partition, new 
OffsetAndMetadata(recordOffset + 1)));
+                consumer.commitSync(Collections.singletonMap(partition, new 
OffsetAndMetadata(recordOffset + 1)),
+                        Duration.ofMillis(commitTimeout));
+                LOG.debug("CommitSync done for {} from topic {} with offset: 
{}", threadId, topicName, recordOffset);
             }
         }
     }
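
The key change above is bounding the synchronous commit with the new
timeout. A self-contained sketch of the same pattern against the plain
Kafka consumer API (class and method names are illustrative):

    import java.time.Duration;
    import java.util.Collections;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    final class BoundedCommit {
        // Commits recordOffset + 1 (the next record to consume) for a single
        // partition, waiting at most commitTimeout milliseconds instead of
        // blocking up to the client's default API timeout.
        static void commit(Consumer<?, ?> consumer, TopicPartition partition,
                long recordOffset, long commitTimeout) {
            consumer.commitSync(
                    Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1)),
                    Duration.ofMillis(commitTimeout));
        }
    }
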
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
index e92a20e..83c14c5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
@@ -27,7 +27,8 @@ public class DefaultKafkaManualCommitFactory implements 
KafkaManualCommitFactory
     public KafkaManualCommit newInstance(
             Exchange exchange, KafkaConsumer consumer, String topicName, 
String threadId,
             StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset) {
-        return new DefaultKafkaManualCommit(consumer, topicName, threadId, 
offsetRepository, partition, recordOffset);
+            TopicPartition partition, long recordOffset, long commitTimeout) {
+        return new DefaultKafkaManualCommit(
+                consumer, topicName, threadId, offsetRepository, partition, 
recordOffset, commitTimeout);
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index c70f7cb..3197c41 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -145,6 +145,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private StateRepository<String, String> offsetRepository;
     @UriParam(label = "consumer", defaultValue = "ERROR_HANDLER")
     private PollOnError pollOnError = PollOnError.ERROR_HANDLER;
+    @UriParam(label = "consumer", defaultValue = "5000", javaType = 
"java.time.Duration")
+    private Long commitTimeoutMs = 5000L;
 
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -695,7 +697,7 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     }
 
     /**
-     * Timeout in milli seconds to wait gracefully for the consumer or 
producer to shutdown and terminate its worker
+     * Timeout in milliseconds to wait gracefully for the consumer or producer 
to shutdown and terminate its worker
      * threads.
      */
     public void setShutdownTimeout(int shutdownTimeout) {
@@ -1784,4 +1786,17 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     public void setPollOnError(PollOnError pollOnError) {
         this.pollOnError = pollOnError;
     }
+
+    public Long getCommitTimeoutMs() {
+        return commitTimeoutMs;
+    }
+
+    /**
+     * The maximum time, in milliseconds, that the code will wait for a 
synchronous commit to complete
+     * 
+     * @param commitTimeoutMs the maximum commit wait, in milliseconds
+     */
+    public void setCommitTimeoutMs(Long commitTimeoutMs) {
+        this.commitTimeoutMs = commitTimeoutMs;
+    }
 }
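
Because commitTimeoutMs is a regular configuration property, it can also be
set programmatically on the shared component configuration (a hypothetical
helper; endpoints created afterwards inherit the value):

    import org.apache.camel.CamelContext;
    import org.apache.camel.component.kafka.KafkaComponent;
    
    final class KafkaSetup {
        // Raise the synchronous-commit timeout to 10 seconds for all
        // kafka: endpoints that reuse the component configuration.
        static void configure(CamelContext context) {
            KafkaComponent kafka = context.getComponent("kafka", KafkaComponent.class);
            kafka.getConfiguration().setCommitTimeoutMs(10_000L);
        }
    }
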
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index f80ac69..ef2e4e9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.camel.Processor;
@@ -120,7 +121,9 @@ public class KafkaConsumer extends DefaultConsumer {
             // pre-initialize task during startup so if there is any error we
             // have it thrown asap
             task.preInit();
+
             executor.submit(task);
+
             tasks.add(task);
         }
     }
@@ -139,8 +142,16 @@ public class KafkaConsumer extends DefaultConsumer {
                 LOG.debug("Shutting down Kafka consumer worker threads with 
timeout {} millis", timeout);
                 
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor,
 timeout);
             } else {
-                executor.shutdownNow();
+                executor.shutdown();
+
+                int timeout = endpoint.getConfiguration().getShutdownTimeout();
+                LOG.debug("Shutting down Kafka consumer worker threads with 
timeout {} millis", timeout);
+                if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
+                    LOG.warn("Shutting down Kafka {} consumer worker threads 
did not finish within {} millis",
+                            tasks.size(), timeout);
+                }
             }
+
             if (!executor.isTerminated()) {
                 tasks.forEach(KafkaFetchRecords::shutdown);
                 executor.shutdownNow();
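
The shutdown path above follows the standard two-phase ExecutorService
termination idiom; a generic sketch, with an assumed timeout argument:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    
    final class ExecutorShutdown {
        // Phase 1: stop accepting new tasks and let in-flight work finish;
        // phase 2: interrupt whatever is still running after the timeout.
        static void shutdownGracefully(ExecutorService executor, long timeoutMillis)
                throws InterruptedException {
            executor.shutdown();
            if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        }
    }
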
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index ee6921c..2acc771 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -18,35 +18,35 @@ package org.apache.camel.component.kafka;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
-import java.util.stream.StreamSupport;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
-import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
+import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.util.IOHelper;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
+
+class KafkaFetchRecords implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
 
     private final KafkaConsumer kafkaConsumer;
@@ -58,6 +58,12 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
     private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();
     private final PollExceptionStrategy pollExceptionStrategy;
     private final BridgeExceptionHandlerToErrorHandler bridge;
+    private final ReentrantLock lock = new ReentrantLock();
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
+
+    private volatile boolean retry = true;
+    private volatile boolean reconnect = true;
+    private ResumeStrategy resumeStrategy;
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy,
                       BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id,
@@ -71,361 +77,343 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
         this.kafkaProps = kafkaProps;
     }
 
+    void preInit() {
+        createConsumer();
+
+        StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
+        String seekPolicy = kafkaConsumer.getEndpoint().getConfiguration().getSeekTo();
+        resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, offsetRepository, seekPolicy);
+    }
+
     @Override
     public void run() {
-        boolean first = true;
-        final AtomicBoolean reTry = new AtomicBoolean(true);
-        final AtomicBoolean reConnect = new AtomicBoolean(true);
+        if (!isKafkaConsumerRunnable()) {
+            return;
+        }
 
-        while (reTry.get() || reConnect.get()) {
+        if (isRetrying() || isReconnecting()) {
             try {
-                if (first || reConnect.get()) {
+                if (isReconnecting()) {
                     // re-initialize on re-connect so we have a fresh consumer
-                    doInit();
+                    createConsumer();
                 }
             } catch (Exception e) {
                 // ensure this is logged so users can see the problem
                 LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
             }
 
-            if (!first) {
-                // skip one poll timeout before trying again
-                long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-                String prefix = reConnect.get() ? "Reconnecting" : "Retrying";
-                LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay);
-                try {
-                    Thread.sleep(delay);
-                } catch (InterruptedException e) {
-                    boolean stopping = kafkaConsumer.getEndpoint().getCamelContext().isStopping();
-                    if (stopping) {
-                        LOG.info(
-                                "CamelContext is stopping so terminating KafkaConsumer thread: {} receiving from topic: {}",
-                                threadId, topicName);
-                        return;
-                    }
-                }
-            }
-
-            first = false;
-
-            if (isCloseable()) {
-                LOG.debug("Closing consumer {}", threadId);
-                IOHelper.close(consumer);
-                return;
-            }
+            long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
+            String prefix = isReconnecting() ? "Reconnecting" : "Retrying";
+            LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay);
 
-            // doRun keeps running until we either shutdown or is told to re-connect
-            doRun(reTry, reConnect);
+            doRun();
         }
 
         LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
+        safeUnsubscribe();
+        IOHelper.close(consumer);
     }
 
-    private boolean isCloseable() {
-        return !kafkaConsumer.isRunAllowed() || kafkaConsumer.isStoppingOrStopped() || kafkaConsumer.isSuspendingOrSuspended();
-    }
-
-    void preInit() {
-        doInit();
-    }
-
-    protected void doInit() {
+    protected void createConsumer() {
         // create consumer
         ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            // Kafka uses reflection for loading authentication settings,
-            // use its classloader
+            // Kafka uses reflection for loading authentication settings, use its classloader
            Thread.currentThread()
                     .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-            // this may throw an exception if something is wrong with kafka
-            // consumer
+
+            // this may throw an exception if something is wrong with kafka consumer
             this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);
         } finally {
             Thread.currentThread().setContextClassLoader(threadClassLoader);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    protected void doRun(AtomicBoolean retry, AtomicBoolean reconnect) {
-        if (reconnect.get()) {
+    protected void doRun() {
+        if (isReconnecting()) {
+            subscribe();
+
             // on first run or reconnecting
-            doReconnectRun();
-            // set reconnect to false as its done now
-            reconnect.set(false);
+            resume();
+
+            // set reconnect to false as the connection and resume are done at this point
+            setReconnect(false);
+
             // set retry to true to continue polling
-            retry.set(true);
+            setRetry(true);
         }
-        // polling
-        doPollRun(retry, reconnect);
+
+        // start polling
+        startPolling();
+    }
+
+    protected void resume() {
+        resumeStrategy.resume();
     }
 
-    protected void doReconnectRun() {
+    private void subscribe() {
+        PartitionAssignmentListener listener = new PartitionAssignmentListener(
+                threadId, topicName,
+                kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, this::isRunnable);
+
         if (topicPattern != null) {
             LOG.info("Subscribing {} to topic pattern {}", threadId, topicName);
-            consumer.subscribe(topicPattern, this);
+            consumer.subscribe(topicPattern, listener);
         } else {
             LOG.info("Subscribing {} to topic {}", threadId, topicName);
-            consumer.subscribe(Arrays.asList(topicName.split(",")), this);
-        }
-
-        StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
-        if (offsetRepository != null) {
-            resumeFromOffsetRepository(offsetRepository);
-        } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo() != null) {
-            resumeFromSeekPolicy();
-        }
-    }
-
-    private void resumeFromOffsetRepository(StateRepository<String, String> offsetRepository) {
-        for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) {
-            String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
-            if (offsetState != null && !offsetState.isEmpty()) {
-                resumeFromOffset(topicPartition, offsetState);
-            }
+            consumer.subscribe(Arrays.asList(topicName.split(",")), listener);
         }
     }
 
-    private void resumeFromOffset(TopicPartition topicPartition, String offsetState) {
-        // The state contains the last read offset, so you need to seek from the next one
-        long offset = deserializeOffsetValue(offsetState) + 1;
-        LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
-        consumer.seek(topicPartition, offset);
-    }
-
-    private void resumeFromSeekPolicy() {
-        if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("beginning")) {
-            LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
-            // This poll to ensure we have an assigned partition
-            // otherwise seek won't work
-            consumer.poll(Duration.ofMillis(100));
-            consumer.seekToBeginning(consumer.assignment());
-        } else if (kafkaConsumer.getEndpoint().getConfiguration().getSeekTo().equals("end")) {
-            LOG.debug("{} is seeking to the end on topic {}", threadId, topicName);
-            // This poll to ensures we have an assigned partition
-            // otherwise seek won't work
-            consumer.poll(Duration.ofMillis(100));
-            consumer.seekToEnd(consumer.assignment());
-        }
-    }
-
-    protected void doPollRun(AtomicBoolean retry, AtomicBoolean reconnect) {
-        StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
-
-        // allow to re-connect thread in case we use that to retry failed messages
-        boolean unsubscribing = false;
-
-        TopicPartition partition = null;
+    protected void startPolling() {
         long partitionLastOffset = -1;
 
         try {
-            while (kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped()
-                    && !kafkaConsumer.isSuspendingOrSuspended()
-                    && retry.get() && !reconnect.get()) {
-
-                // flag to break out processing on the first exception
-                boolean breakOnErrorHit = false;
+            while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) {
                 long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
 
                 LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
-
-                Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator();
-                while (partitionIterator.hasNext()) {
-                    partition = partitionIterator.next();
-                    partitionLastOffset = -1;
-
-                    Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
-                    LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(),
-                            partition);
-                    if (!breakOnErrorHit && recordIterator.hasNext()) {
-
-                        while (!breakOnErrorHit && recordIterator.hasNext()) {
-                            ConsumerRecord<Object, Object> record = recordIterator.next();
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(),
-                                        record.offset(), record.key(), record.value());
-                            }
-                            Exchange exchange = createKafkaExchange(record);
-
-                            propagateHeaders(record, exchange, kafkaConsumer.getEndpoint().getConfiguration());
-
-                            // if not auto commit then we have additional
-                            // information on the exchange
-                            if (!isAutoCommitEnabled()) {
-                                exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
-                                        !recordIterator.hasNext());
-                            }
-                            if (kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) {
-                                // allow Camel users to access the Kafka
-                                // consumer API to be able to do for example
-                                // manual commits
-                                KafkaManualCommit manual
-                                        = kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory()
-                                                .newInstance(exchange, consumer, topicName, threadId,
-                                                        offsetRepository, partition, record.offset());
-                                exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-                            }
-                            // if commit management is on user side give additional info for the end of poll loop
-                            if (!isAutoCommitEnabled()
-                                    || kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit()) {
-                                exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD,
-                                        !recordIterator.hasNext() && !partitionIterator.hasNext());
-                            }
-
-                            try {
-                                kafkaConsumer.getProcessor().process(exchange);
-                            } catch (Exception e) {
-                                exchange.setException(e);
-                            }
-
-                            if (exchange.getException() != null) {
-                                // processing failed due to an unhandled
-                                // exception, what should we do
-                                if (kafkaConsumer.getEndpoint().getConfiguration().isBreakOnFirstError()) {
-                                    // we are failing and we should break out
-                                    LOG.warn(
-                                            "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.",
-                                            exchange, topicName, partitionLastOffset, exchange.getException());
-                                    // force commit so we resume on next poll where we failed
-                                    commitOffset(offsetRepository, partition, partitionLastOffset, false, true);
-                                    // continue to next partition
-                                    breakOnErrorHit = true;
-                                } else {
-                                    // will handle/log the exception and
-                                    // then continue to next
-                                    kafkaConsumer.getExceptionHandler().handleException("Error during processing", exchange,
-                                            exchange.getException());
-                                }
-                            } else {
-                                // record was success so remember its offset
-                                partitionLastOffset = record.offset();
-                                // lastOffsetProcessed would be used by
-                                // Consumer re-balance listener to preserve
-                                // offset state upon partition revoke
-                                lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
-                            }
-
-                            // success so release the exchange
-                            kafkaConsumer.releaseExchange(exchange, false);
-                        }
-
-                        if (!breakOnErrorHit) {
-                            // all records processed from partition so commit them
-                            commitOffset(offsetRepository, partition, partitionLastOffset, false, false);
-                        }
-                    }
+                if (allRecords.isEmpty()) {
+                    LOG.debug("No records received when polling ... (continuing)");
                 }
 
-                if (breakOnErrorHit) {
-                    // force re-connect
-                    reconnect.set(true);
-                    retry.set(false); // to close the current consumer
-                }
+                partitionLastOffset = processPolledRecords(allRecords);
             }
 
-            if (!reconnect.get()) {
-                if (isAutoCommitEnabled()) {
-                    if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                        LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
-                        consumer.commitAsync();
-                    } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                        LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
-                        consumer.commitSync();
-                    } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                        LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName);
-                    }
-                }
+            if (!isReconnecting()) {
+                LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
+                commit();
             }
 
-            LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
-            // we are unsubscribing so do not re connect
-            unsubscribing = true;
-            consumer.unsubscribe();
+            safeUnsubscribe();
         } catch (InterruptException e) {
             kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic",
                     e);
+            commit();
+
             LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
-            consumer.unsubscribe();
+            safeUnsubscribe();
             Thread.currentThread().interrupt();
+        } catch (WakeupException e) {
+            // This is normal: the consumer raises this exception when wakeup() is called (which happens when we stop)
+            LOG.trace("The kafka consumer was woken up while polling on thread {} for topic {}", threadId, topicName);
+            safeUnsubscribe();
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Exception caught while polling " + threadId + " from kafka topic " + topicName
-                          + " at offset " + lastProcessedOffset + ". Deciding what to do.",
-                        e);
-            }
-            if (unsubscribing) {
-                // some kind of error in kafka, it may happen during unsubscribing
-                kafkaConsumer.getExceptionHandler().handleException(
-                        "Error unsubscribing " + threadId + " from kafka topic " + topicName,
-                        e);
+                LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}",
+                        e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage(), e);
             } else {
-                PollOnError onError = pollExceptionStrategy.handleException(e);
-                if (PollOnError.RETRY == onError) {
-                    LOG.warn(
-                            "{} consuming {} from topic {} causedby {}. Will attempt again polling the same message (stacktrace in DEBUG logging level)",
-                            e.getClass().getName(), threadId, topicName, e.getMessage());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "KafkaException consuming {} from topic {} causedby {}. Will attempt again polling the same message",
-                                threadId, topicName, e.getMessage(), e);
-                    }
-                    // consumer retry the same message again
-                    retry.set(true);
-                } else if (PollOnError.RECONNECT == onError) {
-                    LOG.warn(
-                            "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run (stacktrace in DEBUG logging level)",
-                            e.getClass().getName(), threadId, topicName, e.getMessage());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
-                                e.getClass().getName(), threadId, topicName, e.getMessage(), e);
-                    }
-                    // re-connect so the consumer can try the same message again
-                    reconnect.set(true);
-                    retry.set(false); // to close the current consumer
-                } else if (PollOnError.ERROR_HANDLER == onError) {
-                    // use bridge error handler to route with exception
-                    bridge.handleException(e);
-                    // skip this poison message and seek to next message
-                    seekToNextOffset(partitionLastOffset);
-                } else if (PollOnError.DISCARD == onError) {
-                    // discard message
-                    LOG.warn(
-                            "{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message (stracktrace in DEBUG logging level).",
-                            e.getClass().getName(), threadId, topicName, e.getMessage());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message.",
-                                e.getClass().getName(), threadId, topicName, e.getMessage(), e);
-                    }
-                    // skip this poison message and seek to next message
-                    seekToNextOffset(partitionLastOffset);
-                } else if (PollOnError.STOP == onError) {
-                    // stop and terminate consumer
-                    LOG.warn(
-                            "{} consuming {} from topic {} causedby {}. Will stop consumer (stacktrace in DEBUG logging level).",
-                            e.getClass().getName(), threadId, topicName, e.getMessage());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "{} consuming {} from topic {} causedby {}. Will stop consumer.",
-                                e.getClass().getName(), threadId, topicName, e.getMessage(), e);
-                    }
-                    retry.set(false);
-                    reconnect.set(false);
-                }
+                LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}",
+                        e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage());
             }
+
+            handleAccordingToStrategy(partitionLastOffset, e);
         } finally {
             // only close if not retry
-            if (!retry.get()) {
+            if (!isRetrying()) {
                 LOG.debug("Closing consumer {}", threadId);
                 IOHelper.close(consumer);
             }
         }
     }
 
+    private void handleAccordingToStrategy(long partitionLastOffset, Exception e) {
+        PollOnError onError = pollExceptionStrategy.handleException(e);
+        if (PollOnError.RETRY == onError) {
+            handlePollRetry();
+        } else if (PollOnError.RECONNECT == onError) {
+            handlePollReconnect();
+        } else if (PollOnError.ERROR_HANDLER == onError) {
+            handlePollErrorHandler(partitionLastOffset, e);
+        } else if (PollOnError.DISCARD == onError) {
+            handlePollDiscard(partitionLastOffset);
+        } else if (PollOnError.STOP == onError) {
+            handlePollStop();
+        }
+    }
+
+    private void safeUnsubscribe() {
+        try {
+            consumer.unsubscribe();
+        } catch (Exception e) {
+            kafkaConsumer.getExceptionHandler().handleException(
+                    "Error unsubscribing " + threadId + " from kafka topic " + topicName,
+                    e);
+        }
+    }
+
+    private void commit() {
+        if (isAutoCommitEnabled()) {
+            if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
+                LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
+                consumer.commitAsync();
+            } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
+                LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
+                consumer.commitSync();
+            } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
+                LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName);
+            }
+        }
+    }
+
+    private void handlePollStop() {
+        // stop and terminate consumer
+        LOG.warn("Requesting the consumer to stop based on polling exception strategy");
+
+        setRetry(false);
+        setReconnect(false);
+    }
+
+    private void handlePollDiscard(long partitionLastOffset) {
+        LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
+
+        // skip this poison message and seek to next message
+        seekToNextOffset(partitionLastOffset);
+    }
+
+    private void handlePollErrorHandler(long partitionLastOffset, Exception e) {
+        LOG.warn("Deferring processing to the exception handler based on polling exception strategy");
+
+        // use bridge error handler to route with exception
+        bridge.handleException(e);
+        // skip this poison message and seek to next message
+        seekToNextOffset(partitionLastOffset);
+    }
+
+    private void handlePollReconnect() {
+        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
+
+        // re-connect so the consumer can try the same message again
+        setReconnect(true);
+
+        // to close the current consumer
+        setRetry(false);
+    }
+
+    private void handlePollRetry() {
+        LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy");
+
+        // let the consumer retry the same message again
+        setRetry(true);
+    }
+
+    private boolean isKafkaConsumerRunnable() {
+        return kafkaConsumer.isRunAllowed() && !kafkaConsumer.isStoppingOrStopped()
+                && !kafkaConsumer.isSuspendingOrSuspended();
+    }
+
+    // despite the name, this reports the stop state consumed by the PartitionAssignmentListener:
+    // true when the CamelContext is stopping and this consumer is no longer allowed to run
+    private boolean isRunnable() {
+        return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed();
+    }
+
+    private long processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
+        logRecords(allRecords);
+
+        Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator();
+        KafkaRecordProcessor.ProcessResult lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
+
+        while (partitionIterator.hasNext() && !isStopping()) {
+            lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
+            TopicPartition partition = partitionIterator.next();
+
+            Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
+
+            logRecordsInPartition(allRecords, partition);
+
+            KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
+
+            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+                ConsumerRecord<Object, Object> record = recordIterator.next();
+
+                try {
+                    /*
+                     * We lock the processing of the record to avoid raising a WakeupException as a result of a call
+                     * to stop() or shutdown().
+                     */
+                    lock.lock();
+
+                    lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                            kafkaRecordProcessor, record);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        if (lastResult.isBreakOnErrorHit()) {
+            LOG.debug("We hit an error ... setting flags to force reconnect");
+            // force re-connect
+            setReconnect(true);
+            setRetry(false); // to close the current consumer
+        }
+
+        return lastResult.getPartitionLastOffset();
+    }
+
+    private void logRecordsInPartition(ConsumerRecords<Object, Object> allRecords, TopicPartition partition) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(),
+                    partition);
+        }
+    }
+
+    private void logRecords(ConsumerRecords<Object, Object> allRecords) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Last poll on thread {} resulted in {} records to process", threadId, allRecords.count());
+        }
+    }
+
+    private KafkaRecordProcessor.ProcessResult processRecord(
+            TopicPartition partition,
+            boolean partitionHasNext,
+            boolean recordHasNext,
+            KafkaRecordProcessor.ProcessResult lastResult,
+            KafkaRecordProcessor kafkaRecordProcessor,
+            ConsumerRecord<Object, Object> record) {
+
+        logRecord(record);
+
+        Exchange exchange = kafkaConsumer.createExchange(false);
+
+        lastResult = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+                recordHasNext, record, lastResult, kafkaConsumer.getExceptionHandler());
+
+        if (!lastResult.isBreakOnErrorHit()) {
+            lastProcessedOffset.put(serializeOffsetKey(partition), lastResult.getPartitionLastOffset());
+        }
+
+        // processing is done, so release the exchange
+        kafkaConsumer.releaseExchange(exchange, false);
+
+        if (!lastResult.isBreakOnErrorHit()) {
+            LOG.debug("Committing offset on successful execution");
+            // all records processed from partition so commit them
+            kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false,
+                    threadId);
+        }
+        return lastResult;
+    }
+
+    private void logRecord(ConsumerRecord<Object, Object> record) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(),
+                    record.offset(), record.key(), record.value());
+        }
+    }
+
+    private KafkaRecordProcessor buildKafkaRecordProcessor() {
+        return new KafkaRecordProcessor(
+                isAutoCommitEnabled(),
+                kafkaConsumer.getEndpoint().getConfiguration(),
+                kafkaConsumer.getProcessor(),
+                consumer,
+                kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory(), threadId);
+    }
+
     private void seekToNextOffset(long partitionLastOffset) {
         boolean logged = false;
-        Set<TopicPartition> tps = (Set<TopicPartition>) consumer.assignment();
+        Set<TopicPartition> tps = consumer.assignment();
         if (tps != null && partitionLastOffset != -1) {
             long next = partitionLastOffset + 1;
             LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next, topicName);
@@ -445,124 +433,67 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
         }
     }
 
-    private void commitOffset(
-            StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset,
-            boolean stopping, boolean forceCommit) {
-        if (partitionLastOffset != -1) {
-            if (!kafkaConsumer.getEndpoint().getConfiguration().isAllowManualCommit() && offsetRepository != null) {
-                LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, topicName,
-                        partition.partition(),
-                        partitionLastOffset);
-                offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
-            } else if (stopping) {
-                // if we are stopping then react according to the configured option
-                if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                    LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, topicName);
-                    consumer.commitAsync(
-                            Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), null);
-                } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                    LOG.debug("Auto commitSync on stop {} from topic {}", threadId, topicName);
-                    consumer.commitSync(
-                            Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
-                } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                    LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName);
-                }
-            } else if (forceCommit) {
-                LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, topicName,
-                        partition.partition(), partitionLastOffset);
-                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
-            }
-        }
+    private boolean isRetrying() {
+        return retry;
     }
 
-    void stop() {
-        // As advised in the KAFKA-1894 ticket, calling this wakeup method
-        // breaks the infinite loop
-        consumer.wakeup();
+    private void setRetry(boolean value) {
+        retry = value;
     }
 
-    void shutdown() {
-        // As advised in the KAFKA-1894 ticket, calling this wakeup method
-        // breaks the infinite loop
-        consumer.wakeup();
+    private boolean isReconnecting() {
+        return reconnect;
     }
 
-    @Override
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);
-
-        // if camel is stopping, or we are not running
-        boolean stopping = kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed();
-
-        StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
-        for (TopicPartition partition : partitions) {
-            String offsetKey = serializeOffsetKey(partition);
-            Long offset = lastProcessedOffset.get(offsetKey);
-            if (offset == null) {
-                offset = -1L;
-            }
-            try {
-                // only commit offsets if the component has control
-                if (kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable()) {
-                    commitOffset(offsetRepository, partition, offset, stopping, false);
-                }
-            } catch (Exception e) {
-                LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey,
-                        offset);
-                throw e;
-            } finally {
-                lastProcessedOffset.remove(offsetKey);
-            }
-        }
+    private void setReconnect(boolean value) {
+        reconnect = value;
     }
 
-    @Override
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName);
+    private void setStopping(boolean value) {
+        stopping.set(value);
+    }
 
-        StateRepository<String, String> offsetRepository = kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
-        if (offsetRepository != null) {
-            for (TopicPartition partition : partitions) {
-                String offsetState = offsetRepository.getState(serializeOffsetKey(partition));
-                if (offsetState != null && !offsetState.isEmpty()) {
-                    // The state contains the last read offset, so you need to seek from the next one
-                    resumeFromOffset(partition, offsetState);
-                }
-            }
-        }
+    private boolean isStopping() {
+        return stopping.get();
     }
 
-    @SuppressWarnings("rawtypes")
-    private Exchange createKafkaExchange(ConsumerRecord record) {
-        Exchange exchange = kafkaConsumer.createExchange(false);
+    /*
+     * This wraps a safe stop procedure that should help ensure a clean termination of the consumer code.
+     * This means that it should wait for the last process call to finish cleanly, including the commit of the
+     * record being processed at the current moment.
+     *
+     * Note: keep in mind that the KafkaConsumer is not thread-safe, so no other call to the consumer instance
+     * should be made here besides wakeup().
+     */
+    private void safeStop() {
+        setStopping(true);
+        long timeout = kafkaConsumer.getEndpoint().getConfiguration().getShutdownTimeout();
+        boolean locked = false;
+        try {
+            /*
+             Try to wait for the processing to finish before giving up and waking up the Kafka consumer regardless
+             of whether the processing has finished or not.
+             */
+            LOG.info("Waiting up to {} milliseconds for the processing to finish", timeout);
+            locked = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
+            if (!locked) {
+                LOG.warn("The processing of the current record did not finish within {} milliseconds", timeout);
+            }
 
-        Message message = exchange.getIn();
-        message.setHeader(KafkaConstants.PARTITION, record.partition());
-        message.setHeader(KafkaConstants.TOPIC, record.topic());
-        message.setHeader(KafkaConstants.OFFSET, record.offset());
-        message.setHeader(KafkaConstants.HEADERS, record.headers());
-        message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
-        message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
-        if (record.key() != null) {
-            message.setHeader(KafkaConstants.KEY, record.key());
+            // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
+            consumer.wakeup();
+        } catch (InterruptedException e) {
+            consumer.wakeup();
+            Thread.currentThread().interrupt();
+        } finally {
+            // only unlock if this thread actually acquired the lock; calling unlock() without
+            // holding it would throw IllegalMonitorStateException
+            if (locked) {
+                lock.unlock();
+            }
         }
-        message.setBody(record.value());
-
-        return exchange;
     }
 
-    private void propagateHeaders(
-            ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
-        HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
-        KafkaHeaderDeserializer headerDeserializer = kafkaConfiguration.getHeaderDeserializer();
-        StreamSupport.stream(record.headers().spliterator(), false)
-                .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
-                .forEach(header -> exchange.getIn().setHeader(header.key(),
-                        headerDeserializer.deserialize(header.key(), header.value())));
+    void stop() {
+        safeStop();
     }
 
-    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
-        return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
+    void shutdown() {
+        safeStop();
     }
 
     private boolean isAutoCommitEnabled() {
@@ -570,15 +501,4 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
                 && kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable();
     }
 
-    private String serializeOffsetKey(TopicPartition topicPartition) {
-        return topicPartition.topic() + '/' + topicPartition.partition();
-    }
-
-    private String serializeOffsetValue(long offset) {
-        return String.valueOf(offset);
-    }
-
-    private long deserializeOffsetValue(String offset) {
-        return Long.parseLong(offset);
-    }
 }
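
Taken together, the new lock, the stopping flag, and consumer.wakeup() form a handshake
between the polling thread and the thread that stops the consumer: the poller holds the
lock while a record is in flight, and the stopper waits on that lock (up to the shutdown
timeout) before waking the consumer out of poll(). A reduced sketch of the handshake,
with the Kafka and Camel specifics stubbed out behind Runnable parameters (the class and
method names are illustrative, not from the Camel code base):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class SafeStopSketch {
        private final ReentrantLock lock = new ReentrantLock();

        // polling thread: hold the lock for the duration of each record's processing
        void processOneRecord(Runnable processing) {
            lock.lock();
            try {
                processing.run();
            } finally {
                lock.unlock();
            }
        }

        // stopping thread: wait for the in-flight record before interrupting the poll
        void safeStop(long timeoutMillis, Runnable wakeup) throws InterruptedException {
            boolean locked = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            try {
                wakeup.run(); // stands in for consumer.wakeup() in the real code
            } finally {
                if (locked) {
                    lock.unlock();
                }
            }
        }
    }
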
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
index c8b8d7a..030a561 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
@@ -32,5 +32,5 @@ public interface KafkaManualCommitFactory {
     KafkaManualCommit newInstance(
             Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset);
+            TopicPartition partition, long recordOffset, long commitTimeout);
 }
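
The new commitTimeout parameter lets factory implementations bound the synchronous
commit instead of falling back on the Kafka client's default API timeout. A sketch of
how an implementation might apply it, using only the public Kafka consumer API (the
helper class and method names are hypothetical):

    import java.time.Duration;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public final class TimedCommitSketch {
        // commits everything up to and including recordOffset, waiting at most commitTimeoutMs
        static void commitWithTimeout(
                KafkaConsumer<?, ?> consumer, TopicPartition partition, long recordOffset, long commitTimeoutMs) {
            consumer.commitSync(
                    Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1)),
                    Duration.ofMillis(commitTimeoutMs));
        }
    }
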
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
new file mode 100644
index 0000000..0361426
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.stream.StreamSupport;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaManualCommit;
+import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaRecordProcessor {
+    public static final long START_OFFSET = -1;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
+
+    private final boolean autoCommitEnabled;
+    private final KafkaConfiguration configuration;
+    private final Processor processor;
+    private final KafkaConsumer<?, ?> consumer;
+    private final KafkaManualCommitFactory manualCommitFactory;
+    private final String threadId;
+
+    public static final class ProcessResult {
+        private boolean breakOnErrorHit;
+        private long partitionLastOffset;
+
+        private ProcessResult(boolean breakOnErrorHit, long partitionLastOffset) {
+            this.breakOnErrorHit = breakOnErrorHit;
+            this.partitionLastOffset = partitionLastOffset;
+        }
+
+        public boolean isBreakOnErrorHit() {
+            return breakOnErrorHit;
+        }
+
+        public long getPartitionLastOffset() {
+            return partitionLastOffset;
+        }
+
+        public static ProcessResult newUnprocessed() {
+            return new ProcessResult(false, START_OFFSET);
+        }
+    }
+
+    public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration configuration,
+                                Processor processor, KafkaConsumer<?, ?> consumer,
+                                KafkaManualCommitFactory manualCommitFactory,
+                                String threadId) {
+        this.autoCommitEnabled = autoCommitEnabled;
+        this.configuration = configuration;
+        this.processor = processor;
+        this.consumer = consumer;
+        this.manualCommitFactory = manualCommitFactory;
+        this.threadId = threadId;
+    }
+
+    private void setupExchangeMessage(Message message, ConsumerRecord record) {
+        message.setHeader(KafkaConstants.PARTITION, record.partition());
+        message.setHeader(KafkaConstants.TOPIC, record.topic());
+        message.setHeader(KafkaConstants.OFFSET, record.offset());
+        message.setHeader(KafkaConstants.HEADERS, record.headers());
+        message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
+        message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
+
+        if (record.key() != null) {
+            message.setHeader(KafkaConstants.KEY, record.key());
+        }
+
+        message.setBody(record.value());
+    }
+
+    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
+    }
+
+    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
+
+        HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
+        KafkaHeaderDeserializer headerDeserializer = configuration.getHeaderDeserializer();
+
+        StreamSupport.stream(record.headers().spliterator(), false)
+                .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
+                .forEach(header -> exchange.getIn().setHeader(header.key(),
+                        headerDeserializer.deserialize(header.key(), header.value())));
+    }
+
+    public ProcessResult processExchange(
+            Exchange exchange, TopicPartition partition, boolean partitionHasNext,
+            boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessResult lastResult,
+            ExceptionHandler exceptionHandler) {
+
+        setupExchangeMessage(exchange.getMessage(), record);
+
+        propagateHeaders(record, exchange);
+
+        // if not auto commit then we have additional information on the exchange
+        if (!autoCommitEnabled) {
+            exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordHasNext);
+        }
+
+        if (configuration.isAllowManualCommit()) {
+            StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
+
+            // allow Camel users to access the Kafka consumer API to be able to do for example manual commits
+            KafkaManualCommit manual = manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId,
+                    offsetRepository, partition, record.offset(), configuration.getCommitTimeoutMs());
+            exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
+        }
+        // if commit management is on user side give additional info for the end of poll loop
+        if (!autoCommitEnabled || configuration.isAllowManualCommit()) {
+            exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
+        }
+
+        try {
+            processor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+
+            boolean breakOnErrorExit = processException(exchange, partition, lastResult.getPartitionLastOffset(),
+                    exceptionHandler);
+
+            return new ProcessResult(breakOnErrorExit, lastResult.getPartitionLastOffset());
+        }
+
+        return new ProcessResult(false, record.offset());
+    }
+
+    public boolean processException(
+            Exchange exchange, TopicPartition partition, long partitionLastOffset,
+            ExceptionHandler exceptionHandler) {
+
+        // processing failed due to an unhandled exception, what should we do
+        if (configuration.isBreakOnFirstError()) {
+            // we are failing and we should break out
+            LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException());
+            LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset);
+
+            // force commit, so we resume on next poll where we failed
+            commitOffset(partition, partitionLastOffset, false, true, threadId);
+
+            // continue to next partition
+            return true;
+        } else {
+            // will handle/log the exception and then continue to next
+            exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
+        }
+
+        return false;
+    }
+
+    public void commitOffset(
+            TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit, String threadId) {
+        commitOffset(configuration, consumer, partition, partitionLastOffset, stopping, forceCommit, threadId);
+    }
+
+    public static void commitOffset(
+            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            boolean stopping, boolean forceCommit, String threadId) {
+
+        if (partitionLastOffset == START_OFFSET) {
+            return;
+        }
+
+        StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
+
+        if (!configuration.isAllowManualCommit() && offsetRepository != null) {
+            LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
+                    partition.partition(),
+                    partitionLastOffset);
+            offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
+        } else if (stopping) {
+            // if we are stopping then react according to the configured option
+            if ("async".equals(configuration.getAutoCommitOnStop())) {
+                LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic());
+                consumer.commitAsync(
+                        Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), null);
+            } else if ("sync".equals(configuration.getAutoCommitOnStop())) {
+                LOG.debug("Auto commitSync on stop {} from topic {}", threadId, partition.topic());
+                commitOffset(configuration, consumer, partition, partitionLastOffset);
+            } else if ("none".equals(configuration.getAutoCommitOnStop())) {
+                LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, partition.topic());
+            }
+        } else if (forceCommit) {
+            LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
+                    partition.partition(), partitionLastOffset);
+            commitOffset(configuration, consumer, partition, partitionLastOffset);
+        }
+    }
+
+    private static void commitOffset(
+            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition,
+            long partitionLastOffset) {
+        long timeout = configuration.getCommitTimeoutMs();
+        consumer.commitSync(
+                Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)),
+                Duration.ofMillis(timeout));
+    }
+
+    public static String serializeOffsetKey(TopicPartition topicPartition) {
+        return topicPartition.topic() + '/' + topicPartition.partition();
+    }
+
+    public static String serializeOffsetValue(long offset) {
+        return String.valueOf(offset);
+    }
+
+    public static long deserializeOffsetValue(String offset) {
+        return Long.parseLong(offset);
+    }
+}
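
The ProcessResult value threads the per-partition state through the record loop: a
successful record advances partitionLastOffset, while a failure under breakOnFirstError
yields a breakOnErrorHit result that keeps the previous good offset and stops the loop.
A simplified illustration of that accumulation in plain Java, with the offsets and the
failure condition invented for the example:

    public class ProcessResultSketch {

        static final class Result {
            final boolean breakOnErrorHit;
            final long partitionLastOffset;

            Result(boolean breakOnErrorHit, long partitionLastOffset) {
                this.breakOnErrorHit = breakOnErrorHit;
                this.partitionLastOffset = partitionLastOffset;
            }
        }

        public static void main(String[] args) {
            long[] offsets = { 10, 11, 12 };
            Result last = new Result(false, -1); // the "unprocessed" starting state

            for (long offset : offsets) {
                boolean failed = offset == 12; // pretend the third record fails

                // on failure keep the previously known-good offset and raise the break flag,
                // otherwise advance to the offset of the record that just succeeded
                last = failed ? new Result(true, last.partitionLastOffset) : new Result(false, offset);

                if (last.breakOnErrorHit) {
                    break;
                }
            }

            // prints 11: the offset a forced commit would store, so polling resumes at 12
            System.out.println("last good offset: " + last.partitionLastOffset);
        }
    }
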
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
new file mode 100644
index 0000000..6313337
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.util.Set;
+
+import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
+
+/**
+ * A resume strategy that uses Kafka's offset for resuming
+ */
+public class OffsetResumeStrategy implements ResumeStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetResumeStrategy.class);
+
+    private final KafkaConsumer<?, ?> consumer;
+    private final StateRepository<String, String> offsetRepository;
+
+    public OffsetResumeStrategy(KafkaConsumer<?, ?> consumer, StateRepository<String, String> offsetRepository) {
+        this.consumer = consumer;
+        this.offsetRepository = offsetRepository;
+    }
+
+    private void resumeFromOffset(TopicPartition topicPartition, String 
offsetState) {
+        // The state contains the last read offset, so you need to seek from 
the next one
+        long offset = deserializeOffsetValue(offsetState) + 1;
+        LOG.debug("Resuming partition {} from offset {} from state", 
topicPartition.partition(), offset);
+        consumer.seek(topicPartition, offset);
+    }
+
+    @Override
+    public void resume() {
+        Set<TopicPartition> assignments = consumer.assignment();
+        for (TopicPartition topicPartition : assignments) {
+            String offsetState = 
offsetRepository.getState(serializeOffsetKey(topicPartition));
+            if (offsetState != null && !offsetState.isEmpty()) {
+                resumeFromOffset(topicPartition, offsetState);
+            }
+        }
+    }
+}
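A sketch of how this strategy could be driven (in the commit itself the wiring presumably happens inside KafkaFetchRecords; here `props`, `offsets`, and the "orders" topic are assumptions for illustration):

    // props: a valid consumer configuration; offsets: any StateRepository<String, String>
    // holding "topic/partition" -> last-offset entries
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("orders"));
    consumer.poll(Duration.ofMillis(100)); // poll once so partitions get assigned

    ResumeStrategy strategy = new OffsetResumeStrategy(consumer, offsets);
    strategy.resume(); // seeks each assigned partition to its stored offset + 1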
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
new file mode 100644
index 0000000..b67ab0c
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
+import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
+
+public class PartitionAssignmentListener implements ConsumerRebalanceListener {
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
+
+    private final String threadId;
+    private final String topicName;
+    private final KafkaConfiguration configuration;
+    private final KafkaConsumer<?, ?> consumer;
+    private final Map<String, Long> lastProcessedOffset;
+    private final Supplier<Boolean> stopStateSupplier;
+
+    public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration,
+                                       KafkaConsumer<?, ?> consumer, Map<String, Long> lastProcessedOffset,
+                                       Supplier<Boolean> stopStateSupplier) {
+        this.threadId = threadId;
+        this.topicName = topicName;
+        this.configuration = configuration;
+        this.consumer = consumer;
+        this.lastProcessedOffset = lastProcessedOffset;
+        this.stopStateSupplier = stopStateSupplier;
+    }
+
+    private void resumeFromOffset(TopicPartition topicPartition, String offsetState) {
+        // the state contains the last read offset, so seek to the next one
+        long offset = deserializeOffsetValue(offsetState) + 1;
+        LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
+        consumer.seek(topicPartition, offset);
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);
+
+        // whether Camel is stopping or this consumer is no longer running
+        boolean stopping = stopStateSupplier.get();
+
+        for (TopicPartition partition : partitions) {
+            String offsetKey = serializeOffsetKey(partition);
+            Long offset = lastProcessedOffset.get(offsetKey);
+            if (offset == null) {
+                offset = -1L;
+            }
+            try {
+                // only commit offsets if the component has control
+                if (configuration.getAutoCommitEnable()) {
+                    KafkaRecordProcessor.commitOffset(configuration, consumer, partition, offset, stopping, false, threadId);
+                }
+            } catch (Exception e) {
+                LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey,
+                        offset, e);
+                throw e;
+            } finally {
+                lastProcessedOffset.remove(offsetKey);
+            }
+        }
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName);
+
+        StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
+        if (offsetRepository != null) {
+            for (TopicPartition partition : partitions) {
+                String offsetState = offsetRepository.getState(serializeOffsetKey(partition));
+                if (offsetState != null && !offsetState.isEmpty()) {
+                    // the state contains the last read offset; resume seeks to the next one
+                    resumeFromOffset(partition, offsetState);
+                }
+            }
+        }
+    }
+}
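The listener attaches through the standard rebalance-callback overload of KafkaConsumer.subscribe. A hedged sketch; the thread id, topic, stop-state supplier, and the `configuration`/`lastProcessedOffset` variables are illustrative stand-ins for the component's internal state:

    PartitionAssignmentListener listener = new PartitionAssignmentListener(
            "consumer-thread-1", "orders", configuration, consumer, lastProcessedOffset,
            () -> false); // stop-state supplier: reports "not stopping" in this sketch
    consumer.subscribe(Collections.singletonList("orders"), listener);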
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
similarity index 56%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
index c8b8d7a..e9b6511 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
@@ -14,23 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
+package org.apache.camel.component.kafka.consumer.support;
 
 /**
- * Factory to create a new {@link KafkaManualCommit} to store on the {@link Exchange}.
+ * Defines a strategy for handling resume operations. Implementations can define different ways to resume
+ * processing records.
  */
-public interface KafkaManualCommitFactory {
-
+public interface ResumeStrategy {
     /**
-     * Creates a new instance
+     * Performs the resume operation
      */
-    KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
-            StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset);
+    void resume();
 }
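Because the interface is a single no-argument method, custom strategies stay small. A hypothetical implementation (not part of this commit) that resumes every assigned partition from a fixed offset:

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class FixedOffsetResumeStrategy implements ResumeStrategy {
        private final KafkaConsumer<?, ?> consumer;
        private final long offset;

        public FixedOffsetResumeStrategy(KafkaConsumer<?, ?> consumer, long offset) {
            this.consumer = consumer;
            this.offset = offset;
        }

        @Override
        public void resume() {
            // seek every currently assigned partition to the same fixed offset
            consumer.assignment().forEach(tp -> consumer.seek(tp, offset));
        }
    }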
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
similarity index 50%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index c8b8d7a..7386c88 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -14,23 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka;
 
-import org.apache.camel.Exchange;
+package org.apache.camel.component.kafka.consumer.support;
+
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Factory to create a new {@link KafkaManualCommit} to store on the {@link Exchange}.
- */
-public interface KafkaManualCommitFactory {
 
+public final class ResumeStrategyFactory {
     /**
-     * Creates a new instance
+     * A NO-OP resume strategy that does nothing (i.e. no resume)
      */
-    KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
-            StateRepository<String, String> offsetRepository,
-            TopicPartition partition, long recordOffset);
+    private static class NoOpResumeStrategy implements ResumeStrategy {
+        @Override
+        public void resume() {
+            // deliberately empty: there is nothing to resume
+        }
+    }
+
+    private static final NoOpResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpResumeStrategy();
+
+    private ResumeStrategyFactory() {
+    }
+
+    public static ResumeStrategy newResumeStrategy(
+            KafkaConsumer<?, ?> consumer, StateRepository<String, String> offsetRepository,
+            String seekTo) {
+        if (offsetRepository != null) {
+            return new OffsetResumeStrategy(consumer, offsetRepository);
+        } else if (seekTo != null) {
+            return new SeekPolicyResumeStrategy(consumer, seekTo);
+        }
+
+        return NO_OP_RESUME_STRATEGY;
+    }
 }
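The factory's selection order: an offset repository takes precedence over a seekTo policy, and with neither configured the NO-OP strategy is returned. Illustrative calls (`consumer` and `offsets` are assumed to exist):

    ResumeStrategy a = ResumeStrategyFactory.newResumeStrategy(consumer, offsets, "beginning");
    // -> OffsetResumeStrategy: the repository wins over seekTo

    ResumeStrategy b = ResumeStrategyFactory.newResumeStrategy(consumer, null, "beginning");
    // -> SeekPolicyResumeStrategy

    ResumeStrategy c = ResumeStrategyFactory.newResumeStrategy(consumer, null, null);
    // -> the NO-OP strategy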
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
new file mode 100644
index 0000000..8e5361d
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.time.Duration;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resume strategy that uses Camel's seekTo configuration for resuming
+ */
+public class SeekPolicyResumeStrategy implements ResumeStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyResumeStrategy.class);
+
+    private final String seekPolicy;
+    private final KafkaConsumer<?, ?> consumer;
+
+    public SeekPolicyResumeStrategy(KafkaConsumer<?, ?> consumer, String seekPolicy) {
+        this.seekPolicy = seekPolicy;
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void resume() {
+        if (seekPolicy.equals("beginning")) {
+            LOG.debug("Seeking from the beginning of the topic");
+            // this poll ensures we have an assigned partition;
+            // otherwise seek won't work
+            consumer.poll(Duration.ofMillis(100));
+            consumer.seekToBeginning(consumer.assignment());
+        } else if (seekPolicy.equals("end")) {
+            LOG.debug("Seeking from the end of the topic");
+            // this poll ensures we have an assigned partition;
+            // otherwise seek won't work
+            consumer.poll(Duration.ofMillis(100));
+            consumer.seekToEnd(consumer.assignment());
+        }
+    }
+}
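The "beginning" and "end" values correspond to the endpoint's existing seekTo option, so from a route this strategy is selected by URI alone. A sketch with illustrative broker, topic, and group values:

    from("kafka:orders?brokers=localhost:9092&groupId=resume-example&seekTo=beginning")
        .to("log:resumed");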
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 5cdcc35..1cf42fd 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -35,7 +35,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
index c3c45f6..798c508 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
@@ -28,7 +28,7 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.RepeatedTest;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -80,7 +80,7 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
         };
     }
 
-    @Test
+    @RepeatedTest(4)
     public void kafkaAutoCommitDisabledDuringRebalance() throws Exception {
         to.expectedMessageCount(1);
         String firstMessage = "message-0";
@@ -128,7 +128,7 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
         to.assertIsSatisfied(3000);
     }
 
-    @Test
+    @RepeatedTest(4)
     public void kafkaManualCommit() throws Exception {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
@@ -146,6 +146,8 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
 
         to.reset();
 
+        //        Thread.sleep(5000);
+
         // Second step: we shut down our route and expect nothing to be recovered by it
         context.getRouteController().stopRoute("foo");
         to.expectedMessageCount(0);
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index ddc4c28..c70bcea 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -158,7 +158,7 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -331,6 +331,23 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * The maximum time, in milliseconds, that the consumer will wait for a
+         * synchronous commit to complete.
+         * 
+         * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: consumer
+         * 
+         * @param commitTimeoutMs the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder commitTimeoutMs(
+                java.lang.Long commitTimeoutMs) {
+            doSetProperty("commitTimeoutMs", commitTimeoutMs);
+            return this;
+        }
+        /**
          * The configuration controls the maximum amount of time the client will
          * wait for the response of a request. If the response is not received
          * before the timeout elapses the client will resend the request if
@@ -1995,6 +2012,7 @@ public interface KafkaComponentBuilderFactory {
             case "breakOnFirstError": 
getOrCreateConfiguration((KafkaComponent) 
component).setBreakOnFirstError((boolean) value); return true;
             case "bridgeErrorHandler": ((KafkaComponent) 
component).setBridgeErrorHandler((boolean) value); return true;
             case "checkCrcs": getOrCreateConfiguration((KafkaComponent) 
component).setCheckCrcs((java.lang.Boolean) value); return true;
+            case "commitTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setCommitTimeoutMs((java.lang.Long) value); return true;
             case "consumerRequestTimeoutMs": 
getOrCreateConfiguration((KafkaComponent) 
component).setConsumerRequestTimeoutMs((java.lang.Integer) value); return true;
             case "consumersCount": getOrCreateConfiguration((KafkaComponent) 
component).setConsumersCount((int) value); return true;
             case "consumerStreams": getOrCreateConfiguration((KafkaComponent) 
component).setConsumerStreams((int) value); return true;
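A sketch of configuring the new commitTimeoutMs option through the component DSL; the broker address, timeout value, and the `camelContext` variable are illustrative:

    ComponentsBuilderFactory.kafka()
            .brokers("localhost:9092")
            .commitTimeoutMs(10000L)
            .register(camelContext, "kafka");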
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 3fca0d8..1bc303f 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -205,7 +205,7 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -221,7 +221,7 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
@@ -520,6 +520,41 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * The maximum time, in milliseconds, that the consumer will wait for a
+         * synchronous commit to complete.
+         * 
+         * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: consumer
+         * 
+         * @param commitTimeoutMs the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder commitTimeoutMs(
+                Long commitTimeoutMs) {
+            doSetProperty("commitTimeoutMs", commitTimeoutMs);
+            return this;
+        }
+        /**
+         * The maximum time, in milliseconds, that the consumer will wait for a
+         * synchronous commit to complete.
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;java.lang.Long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: consumer
+         * 
+         * @param commitTimeoutMs the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder commitTimeoutMs(
+                String commitTimeoutMs) {
+            doSetProperty("commitTimeoutMs", commitTimeoutMs);
+            return this;
+        }
+        /**
          * The configuration controls the maximum amount of time the client will
          * wait for the response of a request. If the response is not received
          * before the timeout elapses the client will resend the request if
@@ -1998,7 +2033,7 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -2014,7 +2049,7 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
@@ -3944,7 +3979,7 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -3960,7 +3995,7 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Timeout in milli seconds to wait gracefully for the consumer or
+         * Timeout in milliseconds to wait gracefully for the consumer or
          * producer to shutdown and terminate its worker threads.
          * 
          * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
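The equivalent through the endpoint DSL, as a sketch inside an EndpointRouteBuilder (topic, broker, and timeout values are illustrative):

    import org.apache.camel.builder.endpoint.EndpointRouteBuilder;

    public class CommitTimeoutRoute extends EndpointRouteBuilder {
        @Override
        public void configure() {
            from(kafka("orders")
                    .brokers("localhost:9092")
                    .commitTimeoutMs(10000L))
                .to(log("orders"));
        }
    }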
