This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 729f08524639f41cc735be7716e5c3b970168af7 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Oct 29 22:59:07 2020 +0100 First attempt Camel Error Handling level --- .../camel/kafkaconnector/CamelConnectorConfig.java | 12 ++++++++ .../kafkaconnector/CamelSinkConnectorConfig.java | 5 +++- .../apache/camel/kafkaconnector/CamelSinkTask.java | 7 ++++- .../kafkaconnector/CamelSourceConnectorConfig.java | 5 +++- .../camel/kafkaconnector/CamelSourceTask.java | 8 +++++- .../utils/CamelKafkaConnectMain.java | 32 ++++++++++++++++++++++ 6 files changed, 65 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java index ae67309..4271938 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -35,6 +35,18 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.aggregation.timeout"; public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate"; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT = "default"; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_CONF = "camel.error.handler"; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DOC = "The error handler to use: possible value are 'no' or 'default'"; + + public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries"; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler"; + + public static final Long CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT = 1000L; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay"; + public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler"; + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { super(definition, originals, configProviderProps, doLog); } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index 980aad1..e86e921 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -54,7 +54,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig { .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC) .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC) .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC) - .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC); + .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC) + .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC) + .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC) + .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 08967cc..a38afea 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -82,7 +82,9 @@ public class CamelSinkTask extends SinkTask { final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF); final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF); final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF); - + final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF); + final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF); + final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { remoteUrl = TaskHelper.buildUrl(camelContext, @@ -98,6 +100,9 @@ public class CamelSinkTask extends SinkTask { .withMarshallDataFormat(marshaller) .withAggregationSize(size) .withAggregationTimeout(timeout) + .withErrorHandler(errorHandler) + .withMaxRedeliveries(maxRedeliveries) + .withRedeliveryDelay(redeliveryDelay) .build(camelContext); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index bdfc5c7..2ecfb45 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -90,7 +90,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC) .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC) .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC) - .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC); + .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC) + .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC) + .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC) + .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC); public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index aef600f..165ffff 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -88,7 +88,10 @@ public class CamelSourceTask extends SourceTask { final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF); final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF); final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF); - + final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF); + final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF); + final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); + topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); String localUrl = getLocalUrlWithPollingOptions(config); @@ -107,6 +110,9 @@ public class CamelSourceTask extends SourceTask { .withMarshallDataFormat(marshaller) .withAggregationSize(size) .withAggregationTimeout(timeout) + .withErrorHandler(errorHandler) + .withMaxRedeliveries(maxRedeliveries) + .withRedeliveryDelay(redeliveryDelay) .build(camelContext); consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 36ec56a..3dbf045 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -91,6 +91,9 @@ public class CamelKafkaConnectMain extends SimpleMain { private String unmarshallDataFormat; private int aggregationSize; private long aggregationTimeout; + private String errorHandler; + private int maxRedeliveries; + private long redeliveryDelay; public Builder(String from, String to) { this.from = from; @@ -121,6 +124,21 @@ public class CamelKafkaConnectMain extends SimpleMain { this.aggregationTimeout = aggregationTimeout; return this; } + + public Builder withErrorHandler(String errorHandler) { + this.errorHandler = errorHandler; + return this; + } + + public Builder withMaxRedeliveries(int maxRedeliveries) { + this.maxRedeliveries = maxRedeliveries; + return this; + } + + public Builder withRedeliveryDelay(long redeliveryDelay) { + this.redeliveryDelay = redeliveryDelay; + return this; + } public CamelKafkaConnectMain build(CamelContext camelContext) { CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); @@ -138,6 +156,20 @@ public class CamelKafkaConnectMain extends SimpleMain { //from RouteDefinition rd = from(from); LOG.info("Creating Camel route from({})", from); + + if (!ObjectHelper.isEmpty(props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF))) { + String errorHandler = props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); + switch (errorHandler) { + case "no": + rd.errorHandler(noErrorHandler()); + break; + case "default": + rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay)); + break; + default: + break; + } + } //dataformats if (!ObjectHelper.isEmpty(marshallDataFormat)) {