This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ee230f90a3 CAMEL-17913: camel-kafka - Add isolationLevel option for 
consumer
0ee230f90a3 is described below

commit 0ee230f90a3dec7dcc44a93bc4ad8223d405c7c2
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed May 11 21:17:33 2022 +0200

    CAMEL-17913: camel-kafka - Add isolationLevel option for consumer
---
 .../org/apache/camel/catalog/components/kafka.json |  2 ++
 .../component/kafka/KafkaComponentConfigurer.java  |  6 ++++++
 .../component/kafka/KafkaEndpointConfigurer.java   |  6 ++++++
 .../component/kafka/KafkaEndpointUriFactory.java   |  3 ++-
 .../org/apache/camel/component/kafka/kafka.json    |  2 ++
 .../camel/component/kafka/KafkaConfiguration.java  | 24 ++++++++++++++++++++--
 6 files changed, 40 insertions(+), 3 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index b7e0f5bb104..29422a48c6c 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -61,6 +61,7 @@
     "valueDeserializer": { "kind": "property", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value th [...]
     "createConsumerBackoffInterval": { "kind": "property", "displayName": 
"Create Consumer Backoff Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "The delay in millis seconds to wait before trying again to 
create the kafka consumer (kafka-client)." },
     "createConsumerBackoffMaxAttempts": { "kind": "property", "displayName": 
"Create Consumer Backoff Max Attempts", "group": "consumer (advanced)", 
"label": "consumer,advanced", "required": false, "type": "integer", "javaType": 
"int", "deprecated": false, "autowired": false, "secret": false, "description": 
"Maximum attempts to create the kafka consumer (kafka-client), before 
eventually giving up and failing. Error during creating the consumer may be 
fatal due to invalid configuration an [...]
+    "isolationLevel": { "kind": "property", "displayName": "Isolation Level", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "enum": [ 
"read_uncommitted", "read_committed" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "read_uncommitted", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Control [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": true, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit instanc [...]
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll 
Exception Strategy", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, 
"autowired": true, "secret": false, "description": "To use a custom strategy 
with the consumer to control how to handle exceptions thrown from the Kafka 
broker while pooling messages." },
     "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": 
"Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "The delay in millis seconds to wait before trying again to 
subscribe to the kafka broker." },
@@ -184,6 +185,7 @@
     "valueDeserializer": { "kind": "parameter", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value t [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception 
Handler", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By default the 
con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange 
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", 
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the exchange pattern when the consumer creates an 
exchange." },
+    "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "enum": [ 
"read_uncommitted", "read_committed" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "read_uncommitted", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Contro [...]
     "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit insta [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify th [...]
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index a9e413a5361..3be52693578 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -90,6 +90,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": 
getOrCreateConfiguration(target).setHeartbeatIntervalMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "interceptorclasses":
         case "interceptorClasses": 
getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "isolationlevel":
+        case "isolationLevel": 
getOrCreateConfiguration(target).setIsolationLevel(property(camelContext, 
java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": 
target.setKafkaClientFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
@@ -308,6 +310,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": return java.lang.Integer.class;
         case "interceptorclasses":
         case "interceptorClasses": return java.lang.String.class;
+        case "isolationlevel":
+        case "isolationLevel": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return 
org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
@@ -522,6 +526,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": return 
getOrCreateConfiguration(target).getHeartbeatIntervalMs();
         case "interceptorclasses":
         case "interceptorClasses": return 
getOrCreateConfiguration(target).getInterceptorClasses();
+        case "isolationlevel":
+        case "isolationLevel": return 
getOrCreateConfiguration(target).getIsolationLevel();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
         case "kafkamanualcommitfactory":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index be6375442c4..bba6d9d9c94 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -80,6 +80,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "heartbeatIntervalMs": 
target.getConfiguration().setHeartbeatIntervalMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "interceptorclasses":
         case "interceptorClasses": 
target.getConfiguration().setInterceptorClasses(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "isolationlevel":
+        case "isolationLevel": 
target.getConfiguration().setIsolationLevel(property(camelContext, 
java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": 
target.setKafkaClientFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
@@ -282,6 +284,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "heartbeatIntervalMs": return java.lang.Integer.class;
         case "interceptorclasses":
         case "interceptorClasses": return java.lang.String.class;
+        case "isolationlevel":
+        case "isolationLevel": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return 
org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
@@ -485,6 +489,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "heartbeatIntervalMs": return 
target.getConfiguration().getHeartbeatIntervalMs();
         case "interceptorclasses":
         case "interceptorClasses": return 
target.getConfiguration().getInterceptorClasses();
+        case "isolationlevel":
+        case "isolationLevel": return 
target.getConfiguration().getIsolationLevel();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
         case "kafkamanualcommitfactory":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 6bbcedab179..8e6b17ec99a 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(101);
+        Set<String> props = new HashSet<>(102);
         props.add("additionalProperties");
         props.add("allowManualCommit");
         props.add("autoCommitEnable");
@@ -52,6 +52,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
         props.add("headerSerializer");
         props.add("heartbeatIntervalMs");
         props.add("interceptorClasses");
+        props.add("isolationLevel");
         props.add("kafkaClientFactory");
         props.add("kafkaManualCommitFactory");
         props.add("kerberosBeforeReloginMinTime");
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index b7e0f5bb104..29422a48c6c 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -61,6 +61,7 @@
     "valueDeserializer": { "kind": "property", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value th [...]
     "createConsumerBackoffInterval": { "kind": "property", "displayName": 
"Create Consumer Backoff Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "The delay in millis seconds to wait before trying again to 
create the kafka consumer (kafka-client)." },
     "createConsumerBackoffMaxAttempts": { "kind": "property", "displayName": 
"Create Consumer Backoff Max Attempts", "group": "consumer (advanced)", 
"label": "consumer,advanced", "required": false, "type": "integer", "javaType": 
"int", "deprecated": false, "autowired": false, "secret": false, "description": 
"Maximum attempts to create the kafka consumer (kafka-client), before 
eventually giving up and failing. Error during creating the consumer may be 
fatal due to invalid configuration an [...]
+    "isolationLevel": { "kind": "property", "displayName": "Isolation Level", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "enum": [ 
"read_uncommitted", "read_committed" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "read_uncommitted", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Control [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": true, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit instanc [...]
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll 
Exception Strategy", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, 
"autowired": true, "secret": false, "description": "To use a custom strategy 
with the consumer to control how to handle exceptions thrown from the Kafka 
broker while pooling messages." },
     "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": 
"Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "The delay in millis seconds to wait before trying again to 
subscribe to the kafka broker." },
@@ -184,6 +185,7 @@
     "valueDeserializer": { "kind": "parameter", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value t [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception 
Handler", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By default the 
con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange 
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", 
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the exchange pattern when the consumer creates an 
exchange." },
+    "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", 
"group": "consumer (advanced)", "label": "consumer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "enum": [ 
"read_uncommitted", "read_committed" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "read_uncommitted", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Contro [...]
     "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit insta [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify th [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index df624832871..7211084f567 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -145,6 +145,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private PollOnError pollOnError = PollOnError.ERROR_HANDLER;
     @UriParam(label = "consumer", defaultValue = "5000", javaType = 
"java.time.Duration")
     private Long commitTimeoutMs = 5000L;
+    @UriParam(label = "consumer,advanced", defaultValue = "read_uncommitted", 
enums = "read_uncommitted,read_committed")
+    private String isolationLevel;
 
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -466,6 +468,7 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         addPropertyIfNotEmpty(props, 
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotEmpty(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
         addPropertyIfNotEmpty(props, 
ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
+        addPropertyIfNotEmpty(props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
getIsolationLevel());
         addPropertyIfNotEmpty(props, "schema.registry.url", 
getSchemaRegistryURL());
         addPropertyIfNotFalse(props, "specific.avro.reader", 
isSpecificAvroReader());
 
@@ -1804,10 +1807,27 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
 
     /**
      * The maximum time, in milliseconds, that the code will wait for a 
synchronous commit to complete
-     * 
-     * @param commitTimeoutMs
      */
     public void setCommitTimeoutMs(Long commitTimeoutMs) {
         this.commitTimeoutMs = commitTimeoutMs;
     }
+
+    public String getIsolationLevel() {
+        return isolationLevel;
+    }
+
+    /**
+     * Controls how to read messages written transactionally. If set to 
read_committed, consumer.poll() will only return
+     * transactional messages which have been committed. If set to 
read_uncommitted (the default), consumer.poll() will
+     * return all messages, even transactional messages which have been 
aborted. Non-transactional messages will be
+     * returned unconditionally in either mode. Messages will always be 
returned in offset order. Hence, in
+     * read_committed mode, consumer.poll() will only return messages up to 
the last stable offset (LSO), which is
+     * one less than the offset of the first open transaction. In particular 
any messages appearing after messages
+     * belonging to ongoing transactions will be withheld until the relevant 
transaction has been completed. As a
+     * result, read_committed consumers will not be able to read up to 
the high watermark when there are in
+     * flight transactions. Further, when in read_committed the seekToEnd 
method will return the LSO.
+     */
+    public void setIsolationLevel(String isolationLevel) {
+        this.isolationLevel = isolationLevel;
+    }
 }

Reply via email to