This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 44b53c27e881475af8cc710498371371f40930b5
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Dec 4 18:06:53 2020 +0100

    Add a removeHeaders option on both source and sink
---
 .../camel/kafkaconnector/CamelConnectorConfig.java |  4 ++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  3 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  2 +
 .../kafkaconnector/CamelSourceConnectorConfig.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  2 +
 .../utils/CamelKafkaConnectMain.java               | 45 ++++++++++++++++----
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 49 ++++++++++++++++++++++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 34 +++++++++++++++
 8 files changed, 133 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 1b5e014..00c3ce4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -39,6 +39,10 @@ public abstract class CamelConnectorConfig extends 
AbstractConfig {
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_CONF = 
"camel.error.handler";
     public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DOC = "The error 
handler to use: possible value are 'no' or 'default'";
     
+    public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT 
= "";
+    public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF = 
"camel.remove.headers.pattern";
+    public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC = 
"The pattern of the headers we want to exclude from the exchange";
+    
     public static final int 
CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
     public static final String 
CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = 
"camel.error.handler.max.redeliveries";
     public static final String 
CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum 
redeliveries to be use in case of Default Error Handler";
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index bf766b7..6350fe9 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -66,7 +66,8 @@ public class CamelSinkConnectorConfig extends 
CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, 
Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
+        .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, 
CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, 
CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index e4c9341..5b5e0d5 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -94,6 +94,7 @@ public class CamelSinkTask extends SinkTask {
             final String idempotentRepositoryBootstrapServers = 
config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
             final int idempotentRepositoryKafkaMaxCacheSize = 
config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
             final int idempotentRepositoryKafkaPollDuration = 
config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
+            final String headersRemovePattern = 
config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -122,6 +123,7 @@ public class CamelSinkTask extends SinkTask {
                 
.withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
                 
.withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
                 
.withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
+                .withHeadersExcludePattern(headersRemovePattern)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 5d93e50..a703b18 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -102,7 +102,8 @@ public class CamelSourceConnectorConfig extends 
CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, 
CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, 
Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, 
Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, 
Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC)
+        .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, 
CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, 
CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 7848af4..94125d5 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -100,6 +100,7 @@ public class CamelSourceTask extends SourceTask {
             final String idempotentRepositoryBootstrapServers = 
config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
             final int idempotentRepositoryKafkaMaxCacheSize = 
config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
             final int idempotentRepositoryKafkaPollDuration = 
config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
+            final String headersRemovePattern = 
config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             
             topics = 
config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -131,6 +132,7 @@ public class CamelSourceTask extends SourceTask {
                 
.withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
                 
.withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
                 
.withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
+                .withHeadersExcludePattern(headersRemovePattern)
                 .build(camelContext);
 
             consumer = 
cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 982d7d7..7d3e7d7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -106,6 +106,7 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String idempotentRepositoryKafkaServers;
         private int idempotentRepositoryKafkaMaxCacheSize;
         private int idempotentRepositoryKafkaPollDuration;
+        private String headersExcludePattern;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -196,6 +197,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.idempotentRepositoryKafkaPollDuration = 
idempotentRepositoryKafkaPollDuration;
             return this;
         }
+        
+        public Builder withHeadersExcludePattern(String headersExcludePattern) 
{
+            this.headersExcludePattern = headersExcludePattern;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new 
CamelKafkaConnectMain(camelContext);
@@ -261,14 +267,23 @@ public class CamelKafkaConnectMain extends SimpleMain {
                                     
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(),
 + "
                                            + 
"MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, 
aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    if 
(ObjectHelper.isEmpty(headersExcludePattern)) {
+                                        
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    } else {
+                                       
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
+                                    }
                                     break;
                                 case "header":
                                     
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader),
 + "
                                            + 
"MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, 
aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                        
.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    if 
(ObjectHelper.isEmpty(headersExcludePattern)) {
+                                        
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
+                                            
.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    } else {
+                                        
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
+                                            
.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
 
+                                    }
                                     break;
                                 default:
                                     break;
@@ -276,18 +291,30 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         } else {
                             
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})",
 s, aggregationSize, aggregationTimeout);
                             LOG.info(".to({})", to);
-                            
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
+                                
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                            } else {
+                               
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to);
+                            }
                         }
                     } else {
                         if (idempotencyEnabled) {
                             switch (expressionType) {
                                 case "body":
                                     LOG.info("idempotentConsumer(body(), 
MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", 
memoryDimension, to);
-                                    
rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    if 
(ObjectHelper.isEmpty(headersExcludePattern)) {
+                                        
rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    } else {
+                                       
rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
      
+                                    }
                                     break;
                                 case "header":
                                     
LOG.info("idempotentConsumer(header(expressionHeader), 
MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", 
memoryDimension, to);
-                                    
rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    if 
(ObjectHelper.isEmpty(headersExcludePattern)) {
+                                        
rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
+                                    } else {
+                                       
rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
+                                    }
                                     break;
                                 default:
                                     break;
@@ -295,7 +322,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         } else {
                             //to
                             LOG.info(".to({})", to);
-                            rd.toD(to);
+                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
+                                rd.toD(to);
+                            } else {
+                               rd.removeHeaders(headersExcludePattern).toD(to);
+                            }
                         }
                     }
                 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 96e7eb6..74ebd32 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -146,6 +147,54 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    @Test
+    public void testBodyAndHeadersExclusions() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, 
"MyBoolean" + "|" + "MyShort");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        BigDecimal myBigDecimal = new BigDecimal(1234567890);
+        Schema schema = Decimal.schema(myBigDecimal.scale());
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
+        record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", 
myByte);
+        record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", 
myLong);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertNull(exchange.getIn().getHeader("MyShort", Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
+        assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", 
BigDecimal.class));
+
+        sinkTask.stop();
+    }
 
     @Test
     public void testBodyAndProperties() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 2a85664..de611a0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -31,12 +32,14 @@ import 
org.apache.camel.kafkaconnector.utils.StringJoinerAggregator;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.camel.util.CollectionHelper.mapOf;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -526,4 +529,35 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+    
+    @Test
+    public void testSourcePollingWithIdempotencyEnabledAndHeaderExclusion() {
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask
+            .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, 
CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI,
+                         
CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, 
CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF,
+                         "header", 
CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, 
"headerIdempotency",
+                       
CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, 
"headerIdempotency"));
+
+        try {
+
+            
sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "Test", 
"headerIdempotency", "Test");
+            
sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, 
"Test1", "headerIdempotency", "Test1");
+            
sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, 
"TestTest", "headerIdempotency", "Test");
+            
sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, 
"Test2", "headerIdempotency", "Test2");
+
+            List<SourceRecord> records = sourceTask.poll();
+
+            assertThat(records).hasSize(3);
+            
assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "Test");
+            
assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "Test1");
+            
assertThat(records).element(2).hasFieldOrPropertyWithValue("value", "Test2");
+            
assertFalse(records.get(0).headers().allWithName("headerIdempotency").hasNext());
+            
assertFalse(records.get(1).headers().allWithName("headerIdempotency").hasNext());
+            
assertFalse(records.get(2).headers().allWithName("headerIdempotency").hasNext());
+        } finally {
+            sourceTask.stop();
+        }
+    }
 }

Reply via email to