This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 0ee230f90a3 CAMEL-17913: camel-kafka - Add isolationLevel option for consumer 0ee230f90a3 is described below commit 0ee230f90a3dec7dcc44a93bc4ad8223d405c7c2 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed May 11 21:17:33 2022 +0200 CAMEL-17913: camel-kafka - Add isolationLevel option for consumer --- .../org/apache/camel/catalog/components/kafka.json | 2 ++ .../component/kafka/KafkaComponentConfigurer.java | 6 ++++++ .../component/kafka/KafkaEndpointConfigurer.java | 6 ++++++ .../component/kafka/KafkaEndpointUriFactory.java | 3 ++- .../org/apache/camel/component/kafka/kafka.json | 2 ++ .../camel/component/kafka/KafkaConfiguration.java | 24 ++++++++++++++++++++-- 6 files changed, 40 insertions(+), 3 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index b7e0f5bb104..29422a48c6c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -61,6 +61,7 @@ "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...] "createConsumerBackoffInterval": { "kind": "property", "displayName": "Create Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to create the kafka consumer (kafka-client)." }, "createConsumerBackoffMaxAttempts": { "kind": "property", "displayName": "Create Consumer Backoff Max Attempts", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "Maximum attempts to create the kafka consumer (kafka-client), before eventually giving up and failing. Error during creating the consumer may be fatal due to invalid configuration an [...] + "isolationLevel": { "kind": "property", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Control [...] "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instanc [...] "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." }, "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": "Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to subscribe to the kafka broker." }, @@ -184,6 +185,7 @@ "valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...] "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Contro [...] "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit insta [...] "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...] "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...] diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index a9e413a5361..3be52693578 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -90,6 +90,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "heartbeatIntervalMs": getOrCreateConfiguration(target).setHeartbeatIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true; case "interceptorclasses": case "interceptorClasses": getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true; + case "isolationlevel": + case "isolationLevel": getOrCreateConfiguration(target).setIsolationLevel(property(camelContext, java.lang.String.class, value)); return true; case "kafkaclientfactory": case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true; case "kafkamanualcommitfactory": @@ -308,6 +310,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "heartbeatIntervalMs": return java.lang.Integer.class; case "interceptorclasses": case "interceptorClasses": return java.lang.String.class; + case "isolationlevel": + case "isolationLevel": return java.lang.String.class; case "kafkaclientfactory": case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class; case "kafkamanualcommitfactory": @@ -522,6 +526,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "heartbeatIntervalMs": return getOrCreateConfiguration(target).getHeartbeatIntervalMs(); case "interceptorclasses": case "interceptorClasses": return getOrCreateConfiguration(target).getInterceptorClasses(); + case "isolationlevel": + case "isolationLevel": return getOrCreateConfiguration(target).getIsolationLevel(); case "kafkaclientfactory": case "kafkaClientFactory": return target.getKafkaClientFactory(); case "kafkamanualcommitfactory": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java index be6375442c4..bba6d9d9c94 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java @@ -80,6 +80,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "heartbeatIntervalMs": target.getConfiguration().setHeartbeatIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true; case "interceptorclasses": case "interceptorClasses": target.getConfiguration().setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true; + case "isolationlevel": + case "isolationLevel": target.getConfiguration().setIsolationLevel(property(camelContext, java.lang.String.class, value)); return true; case "kafkaclientfactory": case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true; case "kafkamanualcommitfactory": @@ -282,6 +284,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "heartbeatIntervalMs": return java.lang.Integer.class; case "interceptorclasses": case "interceptorClasses": return java.lang.String.class; + case "isolationlevel": + case "isolationLevel": return java.lang.String.class; case "kafkaclientfactory": case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class; case "kafkamanualcommitfactory": @@ -485,6 +489,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "heartbeatIntervalMs": return target.getConfiguration().getHeartbeatIntervalMs(); case "interceptorclasses": case "interceptorClasses": return target.getConfiguration().getInterceptorClasses(); + case "isolationlevel": + case "isolationLevel": return target.getConfiguration().getIsolationLevel(); case "kafkaclientfactory": case "kafkaClientFactory": return target.getKafkaClientFactory(); case "kafkamanualcommitfactory": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java index 6bbcedab179..8e6b17ec99a 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java @@ -21,7 +21,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(101); + Set<String> props = new HashSet<>(102); props.add("additionalProperties"); props.add("allowManualCommit"); props.add("autoCommitEnable"); @@ -52,6 +52,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. props.add("headerSerializer"); props.add("heartbeatIntervalMs"); props.add("interceptorClasses"); + props.add("isolationLevel"); props.add("kafkaClientFactory"); props.add("kafkaManualCommitFactory"); props.add("kerberosBeforeReloginMinTime"); diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index b7e0f5bb104..29422a48c6c 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -61,6 +61,7 @@ "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...] "createConsumerBackoffInterval": { "kind": "property", "displayName": "Create Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to create the kafka consumer (kafka-client)." }, "createConsumerBackoffMaxAttempts": { "kind": "property", "displayName": "Create Consumer Backoff Max Attempts", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "Maximum attempts to create the kafka consumer (kafka-client), before eventually giving up and failing. Error during creating the consumer may be fatal due to invalid configuration an [...] + "isolationLevel": { "kind": "property", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Control [...] "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instanc [...] "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." }, "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": "Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to subscribe to the kafka broker." }, @@ -184,6 +185,7 @@ "valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...] "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Contro [...] "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit insta [...] "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...] "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index df624832871..7211084f567 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -145,6 +145,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware private PollOnError pollOnError = PollOnError.ERROR_HANDLER; @UriParam(label = "consumer", defaultValue = "5000", javaType = "java.time.Duration") private Long commitTimeoutMs = 5000L; + @UriParam(label = "consumer,advanced", defaultValue = "read_uncommitted", enums = "read_uncommitted,read_committed") + private String isolationLevel; // Producer configuration properties @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER) @@ -466,6 +468,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware addPropertyIfNotEmpty(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); addPropertyIfNotEmpty(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); addPropertyIfNotEmpty(props, ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs()); + addPropertyIfNotEmpty(props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, getIsolationLevel()); addPropertyIfNotEmpty(props, "schema.registry.url", getSchemaRegistryURL()); addPropertyIfNotFalse(props, "specific.avro.reader", isSpecificAvroReader()); @@ -1804,10 +1807,27 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware /** * The maximum time, in milliseconds, that the code will wait for a synchronous commit to complete - * - * @param commitTimeoutMs */ public void setCommitTimeoutMs(Long commitTimeoutMs) { this.commitTimeoutMs = commitTimeoutMs; } + + public String getIsolationLevel() { + return isolationLevel; + } + + /** + * Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return + * transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will + * return all messages, even transactional messages which have been aborted. Non-transactional messages will be + * returned unconditionally in either mode. Messages will always be returned in offset order. Hence, in + * read_committed mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the + * one less than the offset of the first open transaction. In particular any messages appearing after messages + * belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a + * result, read_committed</code> consumers will not be able to read up to the high watermark when there are in + * flight transactions. Further, when in read_committed the seekToEnd method will return the LSO + */ + public void setIsolationLevel(String isolationLevel) { + this.isolationLevel = isolationLevel; + } }