This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 44b53c27e881475af8cc710498371371f40930b5 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Dec 4 18:06:53 2020 +0100 Add a removeHeaders option on both source and sink --- .../camel/kafkaconnector/CamelConnectorConfig.java | 4 ++ .../kafkaconnector/CamelSinkConnectorConfig.java | 3 +- .../apache/camel/kafkaconnector/CamelSinkTask.java | 2 + .../kafkaconnector/CamelSourceConnectorConfig.java | 3 +- .../camel/kafkaconnector/CamelSourceTask.java | 2 + .../utils/CamelKafkaConnectMain.java | 45 ++++++++++++++++---- .../camel/kafkaconnector/CamelSinkTaskTest.java | 49 ++++++++++++++++++++++ .../camel/kafkaconnector/CamelSourceTaskTest.java | 34 +++++++++++++++ 8 files changed, 133 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java index 1b5e014..00c3ce4 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -39,6 +39,10 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final String CAMEL_CONNECTOR_ERROR_HANDLER_CONF = "camel.error.handler"; public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DOC = "The error handler to use: possible value are 'no' or 'default'"; + public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT = ""; + public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF = "camel.remove.headers.pattern"; + public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC = "The pattern of the headers we want to exclude from the exchange"; + public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0; public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries"; public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler"; diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index bf766b7..6350fe9 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -66,7 +66,8 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC) - .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC); + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC) + .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index e4c9341..5b5e0d5 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -94,6 +94,7 @@ public class CamelSinkTask extends SinkTask { final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); + final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -122,6 +123,7 @@ public class CamelSinkTask extends SinkTask { .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers) .withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize) .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration) + .withHeadersExcludePattern(headersRemovePattern) .build(camelContext); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 5d93e50..a703b18 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -102,7 +102,8 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC) - .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC); + .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC) + .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC); public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 7848af4..94125d5 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -100,6 +100,7 @@ public class CamelSourceTask extends SourceTask { final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF); final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); + final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); @@ -131,6 +132,7 @@ public class CamelSourceTask extends SourceTask { .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers) .withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize) .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration) + .withHeadersExcludePattern(headersRemovePattern) .build(camelContext); consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 982d7d7..7d3e7d7 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -106,6 +106,7 @@ public class CamelKafkaConnectMain extends SimpleMain { private String idempotentRepositoryKafkaServers; private int idempotentRepositoryKafkaMaxCacheSize; private int idempotentRepositoryKafkaPollDuration; + private String headersExcludePattern; public Builder(String from, String to) { this.from = from; @@ -196,6 +197,11 @@ public class CamelKafkaConnectMain extends SimpleMain { this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration; return this; } + + public Builder withHeadersExcludePattern(String headersExcludePattern) { + this.headersExcludePattern = headersExcludePattern; + return this; + } public CamelKafkaConnectMain build(CamelContext camelContext) { CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); @@ -261,14 +267,23 @@ public class CamelKafkaConnectMain extends SimpleMain { LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + " + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); + if (ObjectHelper.isEmpty(headersExcludePattern)) { + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); + } else { + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + } break; case "header": LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + " + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) - .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); + if (ObjectHelper.isEmpty(headersExcludePattern)) { + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) + .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); + } else { + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) + .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + } break; default: break; @@ -276,18 +291,30 @@ public class CamelKafkaConnectMain extends SimpleMain { } else { LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); LOG.info(".to({})", to); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); + if (ObjectHelper.isEmpty(headersExcludePattern)) { + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); + } else { + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to); + } } } else { if (idempotencyEnabled) { switch (expressionType) { case "body": LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); + if (ObjectHelper.isEmpty(headersExcludePattern)) { + rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); + } else { + rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + } break; case "header": LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); + if (ObjectHelper.isEmpty(headersExcludePattern)) { + rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); + } else { + rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + } break; default: break; @@ -295,7 +322,11 @@ public class CamelKafkaConnectMain extends SimpleMain { } else { //to LOG.info(".to({})", to); - rd.toD(to); + if (ObjectHelper.isEmpty(headersExcludePattern)) { + rd.toD(to); + } else { + rd.removeHeaders(headersExcludePattern).toD(to); + } } } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 96e7eb6..74ebd32 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -146,6 +147,54 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + + @Test + public void testBodyAndHeadersExclusions() { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "MyBoolean" + "|" + "MyShort"); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + Byte myByte = new Byte("100"); + Float myFloat = new Float("100"); + Short myShort = new Short("100"); + Double myDouble = new Double("100"); + int myInteger = 100; + Long myLong = new Long("100"); + BigDecimal myBigDecimal = new BigDecimal(1234567890); + Schema schema = Decimal.schema(myBigDecimal.scale()); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true); + record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte); + record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat); + record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort); + record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble); + record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger); + record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong); + record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema); + records.add(record); + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertNull(exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); + assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class)); + + sinkTask.stop(); + } @Test public void testBodyAndProperties() { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 2a85664..de611a0 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -31,12 +32,14 @@ import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; import static org.apache.camel.util.CollectionHelper.mapOf; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -526,4 +529,35 @@ public class CamelSourceTaskTest { sourceTask.stop(); } } + + @Test + public void testSourcePollingWithIdempotencyEnabledAndHeaderExclusion() { + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask + .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, + "header", CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "headerIdempotency")); + + try { + + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", "headerIdempotency", "Test"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test1", "headerIdempotency", "Test1"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "TestTest", "headerIdempotency", "Test"); + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test2", "headerIdempotency", "Test2"); + + List<SourceRecord> records = sourceTask.poll(); + + assertThat(records).hasSize(3); + assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test"); + assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1"); + assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2"); + assertFalse(records.get(0).headers().allWithName("headerIdempotency").hasNext()); + assertFalse(records.get(1).headers().allWithName("headerIdempotency").hasNext()); + assertFalse(records.get(2).headers().allWithName("headerIdempotency").hasNext()); + } finally { + sourceTask.stop(); + } + } }