CAMEL-11215: Allow breakOnFirstError to be configured on component level also. Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4b98c9a9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4b98c9a9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4b98c9a9 Branch: refs/heads/camel-2.19.x Commit: 4b98c9a9b0f9d468298b99b11e32e3a971e7c810 Parents: 67608a3 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu May 25 14:34:28 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 25 14:36:05 2017 +0200 ---------------------------------------------------------------------- .../src/main/docs/kafka-component.adoc | 3 ++- .../camel/component/kafka/KafkaComponent.java | 20 ++++++++++++++++++++ .../camel/component/kafka/KafkaConsumer.java | 8 +++++--- .../springboot/KafkaComponentConfiguration.java | 19 +++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4b98c9a9/components/camel-kafka/src/main/docs/kafka-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 5181e33..7b01d05 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -34,7 +34,7 @@ kafka:topic[?options] // component options: START -The Kafka component supports 5 options which are listed below. +The Kafka component supports 6 options which are listed below. @@ -45,6 +45,7 @@ The Kafka component supports 5 options which are listed below. | **brokers** (common) | URL of the Kafka brokers to use. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation. 
| | String | **workerPool** (advanced) | To use a shared custom worker pool to continue routing the Exchange after the Kafka server has acknowledged the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed. | | ExecutorService | **useGlobalSslContext Parameters** (security) | Enable usage of global SSL context parameters. | false | boolean +| **breakOnFirstError** (consumer) | This option controls what happens when a consumer is processing an exchange and it fails. If the option is false then the consumer continues to the next message and processes it. If the option is true then the consumer breaks out and will seek back to the offset of the message that caused the failure and then re-attempt to process this message. However, this can lead to endless processing of the same message if it is bound to fail every time, e.g. a poison message. Therefore it is recommended to deal with that, for example by using Camel's error handler. | false | boolean | **resolveProperty Placeholders** (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. 
| true | boolean |======================================================================= // component options: END http://git-wip-us.apache.org/repos/asf/camel/blob/4b98c9a9/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index 34b3fd5..175dda1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -34,6 +34,8 @@ public class KafkaComponent extends UriEndpointComponent implements SSLContextPa private ExecutorService workerPool; @Metadata(label = "security", defaultValue = "false") private boolean useGlobalSslContextParameters; + @Metadata(label = "consumer", defaultValue = "false") + private boolean breakOnFirstError; public KafkaComponent() { super(KafkaEndpoint.class); @@ -58,6 +60,7 @@ public class KafkaComponent extends UriEndpointComponent implements SSLContextPa endpoint.getConfiguration().setTopic(remaining); endpoint.getConfiguration().setWorkerPool(getWorkerPool()); + endpoint.getConfiguration().setBreakOnFirstError(isBreakOnFirstError()); // brokers can be configured on either component or endpoint level // and the consumer and produce is aware of this and act accordingly @@ -127,4 +130,21 @@ public class KafkaComponent extends UriEndpointComponent implements SSLContextPa this.useGlobalSslContextParameters = useGlobalSslContextParameters; } + public boolean isBreakOnFirstError() { + return breakOnFirstError; + } + + /** + * This options controls what happens when a consumer is processing an exchange and it fails. 
+ * If the option is <tt>false</tt> then the consumer continues to the next message and processes it. + * If the option is <tt>true</tt> then the consumer breaks out, and will seek back to the offset of the + * message that caused the failure, and then re-attempt to process this message. However, this can lead + * to endless processing of the same message if it is bound to fail every time, e.g. a poison message. + * Therefore it is recommended to deal with that, for example by using Camel's error handler. + */ + public void setBreakOnFirstError(boolean breakOnFirstError) { + this.breakOnFirstError = breakOnFirstError; + } + + } http://git-wip-us.apache.org/repos/asf/camel/blob/4b98c9a9/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index ad172cb..26bb126 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -84,7 +84,8 @@ public class KafkaConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { - log.info("Starting Kafka consumer"); + log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", + endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError()); super.doStart(); executor = endpoint.createExecutor(); @@ -97,7 +98,7 @@ public class KafkaConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { - log.info("Stopping Kafka consumer"); + log.info("Stopping Kafka consumer on topic: {}", endpoint.getConfiguration().getTopic()); if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { @@ 
-246,7 +247,8 @@ public class KafkaConsumer extends DefaultConsumer { // processing failed due to an unhandled exception, what should we do if (endpoint.getConfiguration().isBreakOnFirstError()) { // we are failing and we should break out - log.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", exchange, topicName, partitionLastOffset); + log.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", + exchange, topicName, partitionLastOffset); // force commit so we resume on next poll where we failed commitOffset(offsetRepository, partition, partitionLastOffset, true); // continue to next partition http://git-wip-us.apache.org/repos/asf/camel/blob/4b98c9a9/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java index e94f799..a220c06 100644 --- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java @@ -58,6 +58,17 @@ public class KafkaComponentConfiguration { */ private Boolean useGlobalSslContextParameters = false; /** + * This options controls what happens when a consumer is processing an + * exchange and it fails. 
If the option is false then the consumer continues + * to the next message and processes it. If the option is true then the + * consumer breaks out and will seek back to the offset of the message that + * caused the failure and then re-attempt to process this message. However, + * this can lead to endless processing of the same message if it is bound to + * fail every time, e.g. a poison message. Therefore it is recommended to deal + * with that, for example by using Camel's error handler. + */ + private Boolean breakOnFirstError = false; + /** * Whether the component should resolve property placeholders on itself when * starting. Only properties which are of String type can use property * placeholders. @@ -98,6 +109,14 @@ public class KafkaComponentConfiguration { this.useGlobalSslContextParameters = useGlobalSslContextParameters; } + public Boolean getBreakOnFirstError() { + return breakOnFirstError; + } + + public void setBreakOnFirstError(Boolean breakOnFirstError) { + this.breakOnFirstError = breakOnFirstError; + } + public Boolean getResolvePropertyPlaceholders() { return resolvePropertyPlaceholders; }