This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ab9f9b1194004437809c7094e3bb26a68e3e74d5
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Nov 19 07:24:02 2020 +0100

    Idempotency: Let's build the idempotentRepository beforehand and use a reference 
to it in the registry
---
 .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index acc97ca..1627a8d 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.IdempotentRepository;
 import 
org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -174,6 +175,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             LOG.info("Setting initial properties in Camel context: [{}]", 
camelProperties);
             camelMain.setInitialProperties(camelProperties);
+            
+            // Instantiate the idempotent repository here and bind it in the 
registry so that the routes can reference it
+            if (idempotencyEnabled) {
+               IdempotentRepository idempotentRepo = 
MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+               
camelMain.getCamelContext().getRegistry().bind("idempotentRepository", 
idempotentRepo);
+            }
 
             //creating the actual route
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
@@ -213,14 +220,14 @@ public class CamelKafkaConnectMain extends SimpleMain {
                                     
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(),
 + "
                                            + 
"MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, 
aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(),
 
MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 case "header":
                                     
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader),
 + "
                                            + 
"MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, 
aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
                                     
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                        
.idempotentConsumer(header(expressionHeader), 
MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                        
.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 default:
                                     break;
@@ -235,11 +242,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
                             switch (expressionType) {
                                 case "body":
                                     LOG.info("idempotentConsumer(body(), 
MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", 
memoryDimension, to);
-                                    rd.idempotentConsumer(body(), 
MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    
rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 case "header":
                                     
LOG.info("idempotentConsumer(header(expressionHeader), 
MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", 
memoryDimension, to);
-                                    
rd.idempotentConsumer(header(expressionHeader), 
MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    
rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 default:
                                     break;

Reply via email to