This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-idempotency in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 94fcc9e6926df78e296e626ee79c30f567fe160b Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Nov 19 14:50:31 2020 +0100 Added Support for KafkaIdempotentRepository --- core/pom.xml | 4 +++ .../camel/kafkaconnector/CamelConnectorConfig.java | 14 +++++++++- .../kafkaconnector/CamelSinkConnectorConfig.java | 5 +++- .../apache/camel/kafkaconnector/CamelSinkTask.java | 6 +++++ .../kafkaconnector/CamelSourceConnectorConfig.java | 5 +++- .../camel/kafkaconnector/CamelSourceTask.java | 6 +++++ .../utils/CamelKafkaConnectMain.java | 31 +++++++++++++++++++++- 7 files changed, 67 insertions(+), 4 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index c7ea38d..825f908 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -49,6 +49,10 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-core-languages</artifactId> </dependency> diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java index 0332576..42c9a85 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -49,7 +49,11 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false; public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled"; - public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If idempotency must be enabled or not"; + + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT = "memory"; + public static final String 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF = "camel.idempotency.repository.type"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC = "The idempotent repository type to use, possible values are memory and kafka"; public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body"; public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type"; @@ -63,6 +67,14 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension"; public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT = "kafka_idempotent_repository"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF = "camel.idempotency.kafka.topic"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC = "The Kafka topic name to use for the idempotent repository"; + + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = "camel.idempotency.kafka.bootstrap.servers"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live"; + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { super(definition, originals, configProviderProps, doLog); } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index 
299d578..26f9a6f 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -61,7 +61,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC) - .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC); + .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 2cc01a2..256de92 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -89,6 +89,9 @@ public class CamelSinkTask extends SinkTask { final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); + final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF); + final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF); + final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -112,6 +115,9 @@ public class CamelSinkTask extends SinkTask { .withExpressionType(expressionType) .withExpressionHeader(expressionHeader) .withMemoryDimension(memoryDimension) + .withIdempotentRepositoryType(idempotentRepositoryType) + .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic) + .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers) .build(camelContext); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 70e2b09..edc8e41 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -97,7 +97,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC) - .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC); + .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC); public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 4e893ab..439984e 100644 
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -95,6 +95,9 @@ public class CamelSourceTask extends SourceTask { final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); + final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF); + final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF); + final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); @@ -121,6 +124,9 @@ public class CamelSourceTask extends SourceTask { .withExpressionType(expressionType) .withExpressionHeader(expressionHeader) .withMemoryDimension(memoryDimension) + .withIdempotentRepositoryType(idempotentRepositoryType) + .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic) + .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers) .build(camelContext); consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index bebe4a9..8f95655 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ 
-28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.kafkaconnector.CamelConnectorConfig; import org.apache.camel.main.SimpleMain; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; import org.apache.camel.support.service.ServiceHelper; @@ -100,6 +101,9 @@ public class CamelKafkaConnectMain extends SimpleMain { private String expressionType; private String expressionHeader; private int memoryDimension; + private String idempotentRepositoryType; + private String idempotentRepositoryTopicName; + private String idempotentRepositoryKafkaServers; public Builder(String from, String to) { this.from = from; @@ -165,6 +169,21 @@ public class CamelKafkaConnectMain extends SimpleMain { this.memoryDimension = memoryDimension; return this; } + + public Builder withIdempotentRepositoryType(String idempotentRepositoryType) { + this.idempotentRepositoryType = idempotentRepositoryType; + return this; + } + + public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) { + this.idempotentRepositoryTopicName = idempotentRepositoryTopicName; + return this; + } + + public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) { + this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers; + return this; + } public CamelKafkaConnectMain build(CamelContext camelContext) { CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); @@ -178,7 +197,17 @@ public class CamelKafkaConnectMain extends SimpleMain { // Instantianting the idempotent Repository here and inject it in registry to be referenced if (idempotencyEnabled) { - IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension); + IdempotentRepository idempotentRepo 
= null; + switch (idempotentRepositoryType) { + case "memory": + idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension); + break; + case "kafka": + idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers); + break; + default: + break; + } camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo); }