This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch sink-headers
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fbed92f4d8c71658118ec9450b7d938045dfcfd1
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed May 20 17:46:02 2020 +0200

    CamelSinkTask headers must be cleaned before sending them ahead
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  24 ++---
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 104 ++++++++++-----------
 2 files changed, 65 insertions(+), 63 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 521c0ad..c32a330 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -31,6 +31,7 @@ import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -140,29 +141,30 @@ public class CamelSinkTask extends SinkTask {
     }
 
     private void addHeader(Map<String, Object> map, Header singleHeader) {
+       String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), 
HEADER_CAMEL_PREFIX);
         Schema schema = singleHeader.schema();
         if 
(schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
-            map.put(singleHeader.key(), (String)singleHeader.value());
+            map.put(camelHeaderKey, (String)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), (Boolean)singleHeader.value());
+            map.put(camelHeaderKey, (Boolean)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), singleHeader.value());
+            map.put(camelHeaderKey, singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), (byte[])singleHeader.value());
+            map.put(camelHeaderKey, (byte[])singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), (float)singleHeader.value());
+            map.put(camelHeaderKey, (float)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), (double)singleHeader.value());
+            map.put(camelHeaderKey, (double)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), (short)singleHeader.value());
+            map.put(camelHeaderKey, (short)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName()))
 {
-            map.put(singleHeader.key(), (long)singleHeader.value());
+            map.put(camelHeaderKey, (long)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) 
{
-            map.put(singleHeader.key(), (byte)singleHeader.value());
+            map.put(camelHeaderKey, (byte)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA,
 Schema.STRING_SCHEMA).type().getName())) {
-            map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
+            map.put(camelHeaderKey, (Map<?, ?>)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
 {
-            map.put(singleHeader.key(), (List<?>)singleHeader.value());
+            map.put(camelHeaderKey, (List<?>)singleHeader.value());
         }
     }
 
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 32e3bc6..ba147b8 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -91,13 +91,13 @@ public class CamelSinkTaskTest {
         Exchange exchange = c.receive("seda:test", 1000L);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", 
Boolean.class));
-        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", 
Byte.class));
-        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", 
Float.class));
-        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", 
Short.class));
-        assertEquals(myDouble, 
exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
-        assertEquals(myInteger, 
exchange.getIn().getHeader("CamelHeaderMyInteger"));
-        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", 
Long.class));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", 
Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
 
         camelSinkTask.stop();
     }
@@ -193,13 +193,13 @@ public class CamelSinkTaskTest {
         assertEquals(myDouble, (Double) 
exchange.getProperties().get("CamelPropertyMyDouble"));
         assertEquals(myInteger, 
exchange.getProperties().get("CamelPropertyMyInteger"));
         assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
-        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", 
Boolean.class));
-        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", 
Byte.class));
-        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", 
Float.class));
-        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", 
Short.class));
-        assertEquals(myDouble, 
exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
-        assertEquals(myInteger, 
exchange.getIn().getHeader("CamelHeaderMyInteger"));
-        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", 
Long.class));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", 
Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
 
         camelSinkTask.stop();
     }
@@ -246,16 +246,16 @@ public class CamelSinkTaskTest {
         Exchange exchange = c.receive("seda:test", 1000L);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", 
Boolean.class));
-        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", 
Byte.class));
-        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", 
Float.class));
-        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", 
Short.class));
-        assertEquals(myDouble, 
exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
-        assertEquals(myInteger, 
exchange.getIn().getHeader("CamelHeaderMyInteger"));
-        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", 
Long.class));
-        assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", 
Map.class));
-        assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", 
Map.class));
-        assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", 
Map.class));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", 
Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
+        assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
+        assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
+        assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
         camelSinkTask.stop();
     }
     
@@ -321,16 +321,16 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap"));
         assertEquals(map1, 
exchange.getProperties().get("CamelPropertyMyMap1"));
         assertEquals(map2, 
exchange.getProperties().get("CamelPropertyMyMap2"));
-        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", 
Boolean.class));
-        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", 
Byte.class));
-        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", 
Float.class));
-        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", 
Short.class));
-        assertEquals(myDouble, 
exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
-        assertEquals(myInteger, 
exchange.getIn().getHeader("CamelHeaderMyInteger"));
-        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", 
Long.class));
-        assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", 
Map.class));
-        assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", 
Map.class));
-        assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", 
Map.class));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", 
Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
+        assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
+        assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
+        assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
 
         camelSinkTask.stop();
     }
@@ -374,15 +374,15 @@ public class CamelSinkTaskTest {
         Exchange exchange = c.receive("seda:test", 1000L);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", 
Boolean.class));
-        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", 
Byte.class));
-        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", 
Float.class));
-        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", 
Short.class));
-        assertEquals(myDouble, 
exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
-        assertEquals(myInteger, 
exchange.getIn().getHeader("CamelHeaderMyInteger"));
-        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", 
Long.class));
-        assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", 
List.class));
-        assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", 
List.class));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", 
Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
+        assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
+        assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
         camelSinkTask.stop();
     }
     
@@ -443,15 +443,15 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
         assertEquals(list, 
exchange.getProperties().get("CamelPropertyMyList"));
         assertEquals(list1, 
exchange.getProperties().get("CamelPropertyMyList1"));
-        assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", 
Boolean.class));
-        assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", 
Byte.class));
-        assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", 
Float.class));
-        assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", 
Short.class));
-        assertEquals(myDouble, 
exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
-        assertEquals(myInteger, 
exchange.getIn().getHeader("CamelHeaderMyInteger"));
-        assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", 
Long.class));
-        assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", 
List.class));
-        assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", 
List.class));
+        assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
+        assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
+        assertEquals(myShort, exchange.getIn().getHeader("MyShort", 
Short.class));
+        assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", 
Double.class));
+        assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
+        assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
+        assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
+        assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
 
         camelSinkTask.stop();
     }

Reply via email to