This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b120dabac70faa2f0b6dca0d6c399e432ed8673e Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Dec 1 18:08:39 2020 +0100 Support Poll timeout and key cache size as option for kafka idempotent repository --- .../apache/camel/kafkaconnector/CamelConnectorConfig.java | 8 ++++++++ .../camel/kafkaconnector/CamelSinkConnectorConfig.java | 5 ++++- .../org/apache/camel/kafkaconnector/CamelSinkTask.java | 4 ++++ .../camel/kafkaconnector/CamelSourceConnectorConfig.java | 4 +++- .../org/apache/camel/kafkaconnector/CamelSourceTask.java | 4 ++++ .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java | 14 +++++++++++++- 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java index 42c9a85..1b5e014 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -75,6 +75,14 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = "camel.idempotency.kafka.bootstrap.servers"; public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live"; + public static final int CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT = 1000; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF = "camel.idempotency.kafka.max.cache.size"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC = "Sets the maximum size of the local key cache"; + + public static final int CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT = 100; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF = "camel.idempotency.kafka.poll.duration.ms"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC = "Sets the poll duration (in milliseconds) of the Kafka consumer"; + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { super(definition, originals, configProviderProps, doLog); } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index 26f9a6f..3d922fb 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -64,7 +64,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC) - .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC); + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 21b6aa4..e4c9341 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -92,6 +92,8 @@ public class CamelSinkTask extends SinkTask { final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF); final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF); final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); + final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); + final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -118,6 +120,8 @@ public class CamelSinkTask extends SinkTask { .withIdempotentRepositoryType(idempotentRepositoryType) .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic) .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers) + .withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize) + .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration) .build(camelContext); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index edc8e41..5d93e50 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -100,7 +100,9 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC) - .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC); + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC); public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index f196c25..7848af4 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -98,6 +98,8 @@ public class CamelSourceTask extends SourceTask { final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF); final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF); final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); + final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); + final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); @@ -127,6 +129,8 @@ public class CamelSourceTask extends SourceTask { .withIdempotentRepositoryType(idempotentRepositoryType) .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic) .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers) + .withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize) + .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration) .build(camelContext); consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index c926bbc..982d7d7 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -104,6 +104,8 @@ public class CamelKafkaConnectMain extends SimpleMain { private String idempotentRepositoryType; private String idempotentRepositoryTopicName; private String idempotentRepositoryKafkaServers; + private int idempotentRepositoryKafkaMaxCacheSize; + private int idempotentRepositoryKafkaPollDuration; public Builder(String from, String to) { this.from = from; @@ -184,6 +186,16 @@ public class CamelKafkaConnectMain extends SimpleMain { this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers; return this; } + + public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) { + this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize; + return this; + } + + public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) { + this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration; + return this; + } public CamelKafkaConnectMain build(CamelContext camelContext) { CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); @@ -203,7 +215,7 @@ public class CamelKafkaConnectMain extends SimpleMain { idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension); break; case "kafka": - idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers); + idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers, idempotentRepositoryKafkaMaxCacheSize, idempotentRepositoryKafkaPollDuration); break; default: break;