This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit ab9f9b1194004437809c7094e3bb26a68e3e74d5 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Nov 19 07:24:02 2020 +0100 Idempotency: Lets build the idempotentRepository before and use a reference to it in registry --- .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index acc97ca..1627a8d 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.kafkaconnector.CamelConnectorConfig; import org.apache.camel.main.SimpleMain; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; @@ -174,6 +175,12 @@ public class CamelKafkaConnectMain extends SimpleMain { LOG.info("Setting initial properties in Camel context: [{}]", camelProperties); camelMain.setInitialProperties(camelProperties); + + // Instantianting the idempotent Repository here and inject it in registry to be referenced + if (idempotencyEnabled) { + IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension); + camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo); + } //creating the actual route camelMain.configure().addRoutesBuilder(new RouteBuilder() { @@ -213,14 +220,14 @@ public class CamelKafkaConnectMain extends SimpleMain { LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + " + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); break; case "header": LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + " + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); LOG.info(".to({})", to); rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) - .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); break; default: break; @@ -235,11 +242,11 @@ public class CamelKafkaConnectMain extends SimpleMain { switch (expressionType) { case "body": LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); break; case "header": LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); break; default: break;