This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/master by this push:
     new b07965c  camel-kafka - Set default value in metadata for pollOnError option
b07965c is described below

commit b07965c2572f0d1b99ed7dd2eab4ec39a2b01d0b
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Mar 23 06:21:36 2021 +0100

    camel-kafka - Set default value in metadata for pollOnError option
---
 .../resources/org/apache/camel/catalog/docs/kafka-component.adoc | 4 ++--
 .../generated/resources/org/apache/camel/component/kafka/kafka.json | 4 ++--
 components/camel-kafka/src/main/docs/kafka-component.adoc | 4 ++--
 .../main/java/org/apache/camel/component/kafka/KafkaComponent.java | 1 -
 .../java/org/apache/camel/component/kafka/KafkaConfiguration.java | 4 ++--
 .../camel/builder/component/dsl/KafkaComponentBuilderFactory.java | 1 +
 .../camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java | 2 ++
 docs/components/modules/ROOT/pages/kafka-component.adoc | 4 ++--
 8 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index 78044a8..14e80bf 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -78,7 +78,7 @@ The Kafka component supports 101 options, which are listed below.
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. | | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
-| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
+| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end | | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -208,7 +208,7 @@ with the following path and query parameters:
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. | | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
-| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
+| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end | | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index b194d54..6e20197 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -52,7 +52,7 @@
     "maxPollRecords": { "kind": "property", "displayName": "Max Poll Records", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "500", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum number of records returned in a single call to poll()" },
     "offsetRepository": { "kind": "property", "displayName": "Offset Repository", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The offset repository to use in order to locally stor [...]
     "partitionAssignor": { "kind": "property", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignme [...]
-    "pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka [...]
+    "pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "de [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
     "seekTo": { "kind": "property", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
@@ -155,7 +155,7 @@
     "maxPollRecords": { "kind": "parameter", "displayName": "Max Poll Records", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "500", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum number of records returned in a single call to poll()" },
     "offsetRepository": { "kind": "parameter", "displayName": "Offset Repository", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The offset repository to use in order to locally sto [...]
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignm [...]
-    "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka [...]
+    "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "d [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
     "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 78044a8..14e80bf 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -78,7 +78,7 @@ The Kafka component supports 101 options, which are listed below.
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. | | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
-| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
+| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end | | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -208,7 +208,7 @@ with the following path and query parameters:
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. | | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
-| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
+| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end | | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index aa536ea..c39200b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -41,7 +41,6 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private PollExceptionStrategy pollExceptionStrategy;
 
     public KafkaComponent() {
-        configuration.setPollOnError(PollOnError.ERROR_HANDLER);
     }
 
     public KafkaComponent(CamelContext context) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index edad497..4126bd8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -140,8 +140,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     private boolean breakOnFirstError;
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
-    @UriParam(label = "consumer")
-    private PollOnError pollOnError;
+    @UriParam(label = "consumer", defaultValue = "ERROR_HANDLER")
+    private PollOnError pollOnError = PollOnError.ERROR_HANDLER;
 
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 4c7611c..ce62711 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -621,6 +621,7 @@ public interface KafkaComponentBuilderFactory {
          * <code>org.apache.camel.component.kafka.PollOnError</code>
          * type.
          * 
+         * Default: ERROR_HANDLER
          * Group: consumer
          * 
          * @param pollOnError the value to set
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 5c18f44..b5b2505 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1039,6 +1039,7 @@ public interface KafkaEndpointBuilderFactory {
          * <code>org.apache.camel.component.kafka.PollOnError</code>
          * type.
          * 
+         * Default: ERROR_HANDLER
          * Group: consumer
          * 
          * @param pollOnError the value to set
@@ -1064,6 +1065,7 @@ public interface KafkaEndpointBuilderFactory {
          * <code>org.apache.camel.component.kafka.PollOnError</code>
          * type.
          * 
+         * Default: ERROR_HANDLER
          * Group: consumer
          * 
          * @param pollOnError the value to set
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index 3de8eba..02aca65 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -80,7 +80,7 @@ The Kafka component supports 101 options, which are listed below.
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. | | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
-| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
+| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end | | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -210,7 +210,7 @@ with the following path and query parameters:
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. | | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
-| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
+| *pollOnError* (consumer) | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end | | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
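
For readers skimming the diff, the effect of moving the default from the KafkaComponent constructor into the KafkaConfiguration field can be sketched roughly as follows. This is a simplified, self-contained illustration only: `PollOnErrorDefaultSketch`, `LegacyConfiguration`, and `Configuration` are hypothetical stand-ins rather than the real Camel classes, and only the enum constants and the field initializer are taken from the diff above.

```java
// Hypothetical stand-ins for illustration; these are NOT the real Camel classes.
public class PollOnErrorDefaultSketch {

    // Same constants as the "enum" list shown in kafka.json above.
    enum PollOnError { DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP }

    // Before the commit: the field had no initializer, so a configuration
    // created outside the component constructor carried no default.
    static class LegacyConfiguration {
        private PollOnError pollOnError;

        PollOnError getPollOnError() {
            return pollOnError;
        }
    }

    // After the commit: the field itself is initialized to ERROR_HANDLER.
    static class Configuration {
        private PollOnError pollOnError = PollOnError.ERROR_HANDLER;

        PollOnError getPollOnError() {
            return pollOnError;
        }

        void setPollOnError(PollOnError pollOnError) {
            this.pollOnError = pollOnError;
        }
    }

    public static void main(String[] args) {
        // A legacy configuration built on its own has no value set.
        System.out.println(new LegacyConfiguration().getPollOnError()); // prints "null"

        // The new configuration carries the default wherever it is created.
        Configuration defaults = new Configuration();
        System.out.println(defaults.getPollOnError()); // prints "ERROR_HANDLER"

        // An explicit value still overrides the default.
        Configuration custom = new Configuration();
        custom.setPollOnError(PollOnError.RECONNECT);
        System.out.println(custom.getPollOnError()); // prints "RECONNECT"
    }
}
```

Declaring the default on the field, together with `defaultValue` on `@UriParam`, presumably lets the generated metadata and DSL Javadoc pick it up, which would account for the added `Default: ERROR_HANDLER` lines in the generated files.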