This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch idempotency
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8cd6e2cef43d4951f1c2895b5a2c8a1d10ca6e6e Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Nov 17 21:51:54 2020 +0100 Fixed CS --- .../camel/kafkaconnector/CamelSourceTask.java | 1 + .../utils/CamelKafkaConnectMain.java | 33 ++++++------ .../camel/kafkaconnector/CamelSinkTaskTest.java | 1 - .../camel/kafkaconnector/CamelSourceTaskTest.java | 59 +++++++++------------- 4 files changed, 43 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index e32d1c2..4e893ab 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -95,6 +95,7 @@ public class CamelSourceTask extends SourceTask { final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF); final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF); final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF); + topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); String localUrl = getLocalUrlWithPollingOptions(config); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index d0695bd..4954ed9 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -146,23 +146,23 @@ public class CamelKafkaConnectMain extends SimpleMain { } public Builder withIdempotencyEnabled(boolean idempotencyEnabled) { - this.idempotencyEnabled = idempotencyEnabled; - return this; + this.idempotencyEnabled = 
idempotencyEnabled; + return this; } public Builder withExpressionType(String expressionType) { - this.expressionType = expressionType; - return this; + this.expressionType = expressionType; + return this; } public Builder withExpressionHeader(String expressionHeader) { - this.expressionHeader = expressionHeader; - return this; + this.expressionHeader = expressionHeader; + return this; } public Builder withMemoryDimension(int memoryDimension) { - this.memoryDimension = memoryDimension; - return this; + this.memoryDimension = memoryDimension; + return this; } public CamelKafkaConnectMain build(CamelContext camelContext) { @@ -211,14 +211,17 @@ public class CamelKafkaConnectMain extends SimpleMain { if (idempotencyEnabled) { switch (expressionType) { case "body": - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); + LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + " + + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); break; case "header": - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); + 
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + " + + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) + .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); break; default: break; @@ -232,11 +235,11 @@ public class CamelKafkaConnectMain extends SimpleMain { if (idempotencyEnabled) { switch (expressionType) { case "body": - LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); + LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); + rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); break; case "header": - LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); + LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to); break; default: diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 9472206..96e7eb6 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index ea7bb1b..c6fa7eb 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -441,19 +441,16 @@ public class CamelSourceTaskTest { public void testSourcePollingWithIdempotencyEnabledAndBody() { CamelSourceTask sourceTask = new CamelSourceTask(); - sourceTask.start(mapOf( - CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, - CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body" - )); + sourceTask.start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body")); try { - sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test"); - sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1"); - sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 
"Test"); - sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test1"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test"); + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, "Test2"); List<SourceRecord> records = sourceTask.poll(); @@ -465,25 +462,22 @@ public class CamelSourceTaskTest { sourceTask.stop(); } } - + @Test public void testSourcePollingWithIdempotencyEnabledAndHeader() { CamelSourceTask sourceTask = new CamelSourceTask(); - sourceTask.start(mapOf( - CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, - CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header", - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency" - )); + sourceTask + .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, + "header", CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency")); try { - sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test"); - sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1"); - sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test"); - sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, 
"Test", "headerIdempotency", "Test"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2"); List<SourceRecord> records = sourceTask.poll(); @@ -495,26 +489,21 @@ public class CamelSourceTaskTest { sourceTask.stop(); } } - + @Test public void testSourcePollingWithAggregationAndIdempotencyBySizeAndTimeout() { final int chunkSize = 2; CamelSourceTask sourceTask = new CamelSourceTask(); - sourceTask.start(mapOf( - CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, - CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, - CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body", - CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator", - CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", - CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize - )); + sourceTask + .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, + "body", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize)); try { 
assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)) - .isInstanceOf(StringJoinerAggregator.class) - .hasFieldOrPropertyWithValue("delimiter", "|"); + .isInstanceOf(StringJoinerAggregator.class).hasFieldOrPropertyWithValue("delimiter", "|"); sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 0); sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1); @@ -524,7 +513,7 @@ public class CamelSourceTaskTest { sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 1); sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 3); sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, 2); - + List<SourceRecord> records = sourceTask.poll(); assertThat(records).hasSize(3);