This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch idempotency in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit ee5d8e96b6c10e893f5f38979a59e8e2ede33b56 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Nov 17 19:40:15 2020 +0100 Added Memory Idempotency support for both sink and source --- .../camel/kafkaconnector/CamelConnectorConfig.java | 16 ++ .../kafkaconnector/CamelSinkConnectorConfig.java | 8 +- .../apache/camel/kafkaconnector/CamelSinkTask.java | 9 ++ .../kafkaconnector/CamelSourceConnectorConfig.java | 8 +- .../camel/kafkaconnector/CamelSourceTask.java | 9 +- .../utils/CamelKafkaConnectMain.java | 69 ++++++++- .../camel/kafkaconnector/CamelSinkTaskTest.java | 172 +++++++++++++++++++++ .../camel/kafkaconnector/CamelSourceTaskTest.java | 99 ++++++++++++ 8 files changed, 379 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java index 4271938..0332576 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -47,6 +47,22 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay"; public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler"; + public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not"; + + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC = "How the idempotency will be evaluated: possible values are body and header"; + + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT = null; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF = "camel.idempotency.expression.header"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC = "The header name that will be evaluated in case of camel.idempotency.expression.type equals to header"; + + public static final int CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT = 100; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension"; + public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository"; + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { super(definition, originals, configProviderProps, doLog); } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index e86e921..299d578 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -57,8 +57,12 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC) .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC) .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC) - .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC); - + .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC); + public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index a38afea..2cc01a2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -85,6 +85,11 @@ public class CamelSinkTask extends SinkTask { final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF); final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF); final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); + final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF); + final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); + final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); + final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); + CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { remoteUrl = TaskHelper.buildUrl(camelContext, @@ -103,6 +108,10 @@ public class CamelSinkTask extends SinkTask { .withErrorHandler(errorHandler) .withMaxRedeliveries(maxRedeliveries) .withRedeliveryDelay(redeliveryDelay) + .withIdempotencyEnabled(idempotencyEnabled) + .withExpressionType(expressionType) + .withExpressionHeader(expressionHeader) + .withMemoryDimension(memoryDimension) .build(camelContext); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 2ecfb45..70e2b09 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -93,8 +93,12 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC) .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC) .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC) - .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC); - + .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC) + .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC); + public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 165ffff..e32d1c2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -91,7 +91,10 @@ public class CamelSourceTask extends SourceTask { final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF); final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF); final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF); - + final Boolean idempotencyEnabled = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF); + final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); + final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); + final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); String localUrl = getLocalUrlWithPollingOptions(config); @@ -113,6 +116,10 @@ public class CamelSourceTask extends SourceTask { .withErrorHandler(errorHandler) .withMaxRedeliveries(maxRedeliveries) .withRedeliveryDelay(redeliveryDelay) + .withIdempotencyEnabled(idempotencyEnabled) + .withExpressionType(expressionType) + .withExpressionHeader(expressionHeader) + .withMemoryDimension(memoryDimension) .build(camelContext); consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 7a43f0f..d0695bd 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.kafkaconnector.CamelConnectorConfig; import org.apache.camel.main.SimpleMain; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -94,6 +95,10 @@ public class CamelKafkaConnectMain extends SimpleMain { private String errorHandler; private int maxRedeliveries; private long redeliveryDelay; + private boolean idempotencyEnabled; + private String expressionType; + private String expressionHeader; + private int memoryDimension; public Builder(String from, String to) { this.from = from; @@ -139,6 +144,26 @@ public class CamelKafkaConnectMain extends SimpleMain { this.redeliveryDelay = redeliveryDelay; return this; } + + public Builder withIdempotencyEnabled(boolean idempotencyEnabled) { + this.idempotencyEnabled = idempotencyEnabled; + return this; + } + + public Builder withExpressionType(String expressionType) { + this.expressionType = expressionType; + return this; + } + + public Builder withExpressionHeader(String expressionHeader) { + this.expressionHeader = expressionHeader; + return this; + } + + public Builder withMemoryDimension(int memoryDimension) { + this.memoryDimension = memoryDimension; + return this; + } public CamelKafkaConnectMain build(CamelContext camelContext) { CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); @@ -183,13 +208,45 @@ public class CamelKafkaConnectMain extends SimpleMain { if (getContext().getRegistry().lookupByName("aggregate") != null) { //aggregation AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class); - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); - LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); + if (idempotencyEnabled) { + switch (expressionType) { + case "body": + LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); + LOG.info(".to({})", to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + break; + case "header": + LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); + LOG.info(".to({})", to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + break; + default: + break; + } + } else { + LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); + LOG.info(".to({})", to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); + } } else { - //to - LOG.info(".to({})", to); - rd.toD(to); + if (idempotencyEnabled) { + switch (expressionType) { + case "body": + LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); + rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + break; + case "header": + LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); + rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + break; + default: + break; + } + } else { + //to + LOG.info(".to({})", to); + rd.toD(to); + } } } }); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 6559456..9472206 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -29,8 +29,10 @@ import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; @@ -663,6 +665,176 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + + @Test + public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42); + SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42); + SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42); + SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42); + SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record6 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42); + SinkRecord record7 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42); + SinkRecord record8 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42); + SinkRecord record9 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42); + SinkRecord record10 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel5", 42); + SinkRecord record11 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel6", 42); + SinkRecord record12 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel7", 42); + SinkRecord record13 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel8", 42); + SinkRecord record14 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel9", 42); + records.add(record); + records.add(record1); + records.add(record2); + records.add(record3); + records.add(record4); + records.add(record5); + records.add(record6); + records.add(record7); + records.add(record8); + records.add(record9); + records.add(record10); + records.add(record11); + records.add(record12); + records.add(record13); + records.add(record14); + + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + sinkTask.stop(); + } + + @Test + public void testWithIdempotency() throws InterruptedException { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42); + SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record6 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record7 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record8 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record9 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42); + SinkRecord record10 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record11 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record12 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record13 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record14 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + records.add(record); + records.add(record1); + records.add(record2); + records.add(record3); + records.add(record4); + records.add(record5); + records.add(record6); + records.add(record7); + records.add(record8); + records.add(record9); + records.add(record10); + records.add(record11); + records.add(record12); + records.add(record13); + records.add(record14); + + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel1", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel2", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + sinkTask.stop(); + } + + @Test + public void testWithIdempotencyAndHeader() throws InterruptedException { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + record.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test")); + SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + record1.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test")); + SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42); + record2.headers().add("CamelHeader.headerIdempotency", new SchemaAndValue(Schema.STRING_SCHEMA, "Test1")); + + records.add(record); + records.add(record1); + records.add(record2); + + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel1", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + sinkTask.stop(); + } @Test public void testSecretRaw() { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index ef393f9..ea7bb1b 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -436,4 +436,103 @@ public class CamelSourceTaskTest { sourceTask.stop(); } } + + @Test + public void testSourcePollingWithIdempotencyEnabledAndBody() { + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body" + )); + + try { + + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2"); + + List<SourceRecord> records = sourceTask.poll(); + + assertThat(records).hasSize(3); + assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test"); + assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1"); + assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2"); + } finally { + sourceTask.stop(); + } + } + + @Test + public void testSourcePollingWithIdempotencyEnabledAndHeader() { + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency" + )); + + try { + + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2"); + + List<SourceRecord> records = sourceTask.poll(); + + assertThat(records).hasSize(3); + assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test"); + assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1"); + assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2"); + } finally { + sourceTask.stop(); + } + } + + @Test + public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() { + final int chunkSize = 2; + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize + )); + + try { + assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)) + .isInstanceOf(StringJoinerAggregator.class) + .hasFieldOrPropertyWithValue("delimiter", "|"); + + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2); + + List<SourceRecord> records = sourceTask.poll(); + + assertThat(records).hasSize(3); + assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "0|1"); + assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "2|3"); + assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "3|2"); + } finally { + sourceTask.stop(); + } + } }