This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b120dabac70faa2f0b6dca0d6c399e432ed8673e
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Dec 1 18:08:39 2020 +0100

    Support Poll timeout and key cache size as option for kafka idempotent 
repository
---
 .../apache/camel/kafkaconnector/CamelConnectorConfig.java  |  8 ++++++++
 .../camel/kafkaconnector/CamelSinkConnectorConfig.java     |  5 ++++-
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java     |  4 ++++
 .../camel/kafkaconnector/CamelSourceConnectorConfig.java   |  4 +++-
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java   |  4 ++++
 .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java  | 14 +++++++++++++-
 6 files changed, 36 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 42c9a85..1b5e014 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -75,6 +75,14 @@ public abstract class CamelConnectorConfig extends 
AbstractConfig {
     public static final String 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = 
"camel.idempotency.kafka.bootstrap.servers";
     public static final String  
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated 
list of host and port pairs that are the addresses of the Kafka brokers where 
the idempotent repository should live"; 
     
+    public static final int 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT = 1000;
+    public static final String 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF = 
"camel.idempotency.kafka.max.cache.size";
+    public static final String  
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC = "Sets the maximum size 
of the local key cache";
+    
+    public static final int 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT = 100;
+    public static final String 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF = 
"camel.idempotency.kafka.poll.duration.ms";
+    public static final String  
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC = "Sets the poll duration 
(in milliseconds) of the Kafka consumer";
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, 
Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 26f9a6f..3d922fb 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -64,7 +64,10 @@ public class CamelSinkConnectorConfig extends 
CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, 
CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, 
Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, 
Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 21b6aa4..e4c9341 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -92,6 +92,8 @@ public class CamelSinkTask extends SinkTask {
             final String idempotentRepositoryType = 
config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
             final String idempotentRepositoryKafkaTopic = 
config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
             final String idempotentRepositoryBootstrapServers = 
config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
+            final int idempotentRepositoryKafkaMaxCacheSize = 
config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
+            final int idempotentRepositoryKafkaPollDuration = 
config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -118,6 +120,8 @@ public class CamelSinkTask extends SinkTask {
                 .withIdempotentRepositoryType(idempotentRepositoryType)
                 
.withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
                 
.withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
+                
.withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
+                
.withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
                 .build(camelContext);
 
 
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index edc8e41..5d93e50 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -100,7 +100,9 @@ public class CamelSourceConnectorConfig extends 
CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, 
CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, 
Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, 
Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index f196c25..7848af4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -98,6 +98,8 @@ public class CamelSourceTask extends SourceTask {
             final String idempotentRepositoryType = 
config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
             final String idempotentRepositoryKafkaTopic = 
config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
             final String idempotentRepositoryBootstrapServers = 
config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
+            final int idempotentRepositoryKafkaMaxCacheSize = 
config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
+            final int idempotentRepositoryKafkaPollDuration = 
config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             
             topics = 
config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -127,6 +129,8 @@ public class CamelSourceTask extends SourceTask {
                 .withIdempotentRepositoryType(idempotentRepositoryType)
                 
.withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
                 
.withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
+                
.withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
+                
.withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
                 .build(camelContext);
 
             consumer = 
cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index c926bbc..982d7d7 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -104,6 +104,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String idempotentRepositoryType;
         private String idempotentRepositoryTopicName;
         private String idempotentRepositoryKafkaServers;
+        private int idempotentRepositoryKafkaMaxCacheSize;
+        private int idempotentRepositoryKafkaPollDuration;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -184,6 +186,16 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.idempotentRepositoryKafkaServers = 
idempotentRepositoryKafkaServers;
             return this;
         }
+        
+        public Builder withIdempotentRepositoryKafkaMaxCacheSize(int 
idempotentRepositoryKafkaMaxCacheSize) {
+            this.idempotentRepositoryKafkaMaxCacheSize = 
idempotentRepositoryKafkaMaxCacheSize;
+            return this;
+        }
+        
+        public Builder withIdempotentRepositoryKafkaPollDuration(int 
idempotentRepositoryKafkaPollDuration) {
+            this.idempotentRepositoryKafkaPollDuration = 
idempotentRepositoryKafkaPollDuration;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new 
CamelKafkaConnectMain(camelContext);
@@ -203,7 +215,7 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         idempotentRepo = 
MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
                         break;
                     case "kafka":
-                        idempotentRepo = new 
KafkaIdempotentRepository(idempotentRepositoryTopicName, 
idempotentRepositoryKafkaServers);
+                        idempotentRepo = new 
KafkaIdempotentRepository(idempotentRepositoryTopicName, 
idempotentRepositoryKafkaServers, idempotentRepositoryKafkaMaxCacheSize, 
idempotentRepositoryKafkaPollDuration);
                         break;
                     default:
                         break;

Reply via email to