CAMEL-11215: Allow breakOnFirstError to be configured on component level also. 
Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/da1815d5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/da1815d5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/da1815d5

Branch: refs/heads/master
Commit: da1815d53205585ba10c16cd3d3e179ecc3f6a69
Parents: c171712
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu May 25 14:34:28 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu May 25 14:34:28 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 ++-
 .../camel/component/kafka/KafkaComponent.java   | 20 ++++++++++++++++++++
 .../camel/component/kafka/KafkaConsumer.java    |  8 +++++---
 .../springboot/KafkaComponentConfiguration.java | 19 +++++++++++++++++++
 4 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/da1815d5/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 5181e33..7b01d05 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -34,7 +34,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 5 options which are listed below.
+The Kafka component supports 6 options which are listed below.
 
 
 
@@ -45,6 +45,7 @@ The Kafka component supports 5 options which are listed below.
 | **brokers** (common) | URL of the Kafka brokers to use. The format is 
host1:port1host2:port2 and the list can be a subset of brokers or a VIP 
pointing to a subset of brokers. This option is known as bootstrap.servers in 
the Kafka documentation. |  | String
 | **workerPool** (advanced) | To use a shared custom worker pool for continue 
routing Exchange after kafka server has acknowledge the message that was sent 
to it from KafkaProducer using asynchronous non-blocking processing. If using 
this option then you must handle the lifecycle of the thread pool to shut the 
pool down when no longer needed. |  | ExecutorService
 | **useGlobalSslContext Parameters** (security) | Enable usage of global SSL 
context parameters. | false | boolean
+| **breakOnFirstError** (consumer) | This options controls what happens when a 
consumer is processing an exchange and it fails. If the option is false then 
the consumer continues to the next message and processes it. If the option is 
true then the consumer breaks out and will seek back to offset of the message 
that caused a failure and then re-attempt to process this message. However this 
can lead to endless processing of the same message if its bound to fail every 
time eg a poison message. Therefore its recommended to deal with that for 
example by using Camel's error handler. | false | boolean
 | **resolveProperty Placeholders** (advanced) | Whether the component should 
resolve property placeholders on itself when starting. Only properties which 
are of String type can use property placeholders. | true | boolean
 |=======================================================================
 // component options: END

http://git-wip-us.apache.org/repos/asf/camel/blob/da1815d5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 34b3fd5..175dda1 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -34,6 +34,8 @@ public class KafkaComponent extends UriEndpointComponent 
implements SSLContextPa
     private ExecutorService workerPool;
     @Metadata(label = "security", defaultValue = "false")
     private boolean useGlobalSslContextParameters;
+    @Metadata(label = "consumer", defaultValue = "false")
+    private boolean breakOnFirstError;
 
     public KafkaComponent() {
         super(KafkaEndpoint.class);
@@ -58,6 +60,7 @@ public class KafkaComponent extends UriEndpointComponent 
implements SSLContextPa
 
         endpoint.getConfiguration().setTopic(remaining);
         endpoint.getConfiguration().setWorkerPool(getWorkerPool());
+        
endpoint.getConfiguration().setBreakOnFirstError(isBreakOnFirstError());
 
         // brokers can be configured on either component or endpoint level
         // and the consumer and produce is aware of this and act accordingly
@@ -127,4 +130,21 @@ public class KafkaComponent extends UriEndpointComponent 
implements SSLContextPa
         this.useGlobalSslContextParameters = useGlobalSslContextParameters;
     }
 
+    public boolean isBreakOnFirstError() {
+        return breakOnFirstError;
+    }
+
+    /**
+     * This options controls what happens when a consumer is processing an 
exchange and it fails.
+     * If the option is <tt>false</tt> then the consumer continues to the next 
message and processes it.
+     * If the option is <tt>true</tt> then the consumer breaks out, and will 
seek back to offset of the
+     * message that caused a failure, and then re-attempt to process this 
message. However this can lead
+     * to endless processing of the same message if its bound to fail every 
time, eg a poison message.
+     * Therefore its recommended to deal with that for example by using 
Camel's error handler.
+     */
+    public void setBreakOnFirstError(boolean breakOnFirstError) {
+        this.breakOnFirstError = breakOnFirstError;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/da1815d5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ad172cb..26bb126 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -84,7 +84,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        log.info("Starting Kafka consumer");
+        log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: 
{}",
+            endpoint.getConfiguration().getTopic(), 
endpoint.getConfiguration().isBreakOnFirstError());
         super.doStart();
 
         executor = endpoint.createExecutor();
@@ -97,7 +98,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
-        log.info("Stopping Kafka consumer");
+        log.info("Stopping Kafka consumer on topic: {}", 
endpoint.getConfiguration().getTopic());
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
@@ -246,7 +247,8 @@ public class KafkaConsumer extends DefaultConsumer {
                                     // processing failed due to an unhandled 
exception, what should we do
                                     if 
(endpoint.getConfiguration().isBreakOnFirstError()) {
                                         // we are failing and we should break 
out
-                                        log.warn("Error during processing {} 
from topic: {}. Will seek consumer to offset: {} and re-connect and start 
polling again.", exchange, topicName, partitionLastOffset);
+                                        log.warn("Error during processing {} 
from topic: {}. Will seek consumer to offset: {} and re-connect and start 
polling again.",
+                                            exchange, topicName, 
partitionLastOffset);
                                         // force commit so we resume on next 
poll where we failed
                                         commitOffset(offsetRepository, 
partition, partitionLastOffset, true);
                                         // continue to next partition

http://git-wip-us.apache.org/repos/asf/camel/blob/da1815d5/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 991f0c6..2ced5ac 100644
--- 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -61,6 +61,17 @@ public class KafkaComponentConfiguration
      */
     private Boolean useGlobalSslContextParameters = false;
     /**
+     * This options controls what happens when a consumer is processing an
+     * exchange and it fails. If the option is false then the consumer 
continues
+     * to the next message and processes it. If the option is true then the
+     * consumer breaks out and will seek back to offset of the message that
+     * caused a failure and then re-attempt to process this message. However
+     * this can lead to endless processing of the same message if its bound to
+     * fail every time eg a poison message. Therefore its recommended to deal
+     * with that for example by using Camel's error handler.
+     */
+    private Boolean breakOnFirstError = false;
+    /**
      * Whether the component should resolve property placeholders on itself 
when
      * starting. Only properties which are of String type can use property
      * placeholders.
@@ -101,6 +112,14 @@ public class KafkaComponentConfiguration
         this.useGlobalSslContextParameters = useGlobalSslContextParameters;
     }
 
+    public Boolean getBreakOnFirstError() {
+        return breakOnFirstError;
+    }
+
+    public void setBreakOnFirstError(Boolean breakOnFirstError) {
+        this.breakOnFirstError = breakOnFirstError;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }

Reply via email to