This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch sink-headers in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit fbed92f4d8c71658118ec9450b7d938045dfcfd1 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed May 20 17:46:02 2020 +0200 CamelSinkTask headers must be cleaned before sending them ahead --- .../apache/camel/kafkaconnector/CamelSinkTask.java | 24 ++--- .../camel/kafkaconnector/CamelSinkTaskTest.java | 104 ++++++++++----------- 2 files changed, 65 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 521c0ad..c32a330 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -31,6 +31,7 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kafkaconnector.utils.CamelMainSupport; import org.apache.camel.kafkaconnector.utils.TaskHelper; import org.apache.camel.support.DefaultExchange; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.errors.ConnectException; @@ -140,29 +141,30 @@ public class CamelSinkTask extends SinkTask { } private void addHeader(Map<String, Object> map, Header singleHeader) { + String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), HEADER_CAMEL_PREFIX); Schema schema = singleHeader.schema(); if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (String)singleHeader.value()); + map.put(camelHeaderKey, (String)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (Boolean)singleHeader.value()); + map.put(camelHeaderKey, (Boolean)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) { - map.put(singleHeader.key(), singleHeader.value()); + map.put(camelHeaderKey, singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (byte[])singleHeader.value()); + map.put(camelHeaderKey, (byte[])singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (float)singleHeader.value()); + map.put(camelHeaderKey, (float)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (double)singleHeader.value()); + map.put(camelHeaderKey, (double)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (short)singleHeader.value()); + map.put(camelHeaderKey, (short)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (long)singleHeader.value()); + map.put(camelHeaderKey, (long)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) { - map.put(singleHeader.key(), (byte)singleHeader.value()); + map.put(camelHeaderKey, (byte)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) { - map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value()); + map.put(camelHeaderKey, (Map<?, ?>)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) { - map.put(singleHeader.key(), (List<?>)singleHeader.value()); + map.put(camelHeaderKey, (List<?>)singleHeader.value()); } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 32e3bc6..ba147b8 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -91,13 +91,13 @@ public class CamelSinkTaskTest { Exchange exchange = c.receive("seda:test", 1000L); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class)); - assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class)); - assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class)); - assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class)); - assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class)); - assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger")); - assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class)); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); camelSinkTask.stop(); } @@ -193,13 +193,13 @@ public class CamelSinkTaskTest { assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble")); assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger")); assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); - assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class)); - assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class)); - assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class)); - assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class)); - assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class)); - assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger")); - assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class)); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); camelSinkTask.stop(); } @@ -246,16 +246,16 @@ public class CamelSinkTaskTest { Exchange exchange = c.receive("seda:test", 1000L); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class)); - assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class)); - assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class)); - assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class)); - assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class)); - assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger")); - assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class)); - assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class)); - assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class)); - assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class)); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); + assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class)); + assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class)); + assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class)); camelSinkTask.stop(); } @@ -321,16 +321,16 @@ public class CamelSinkTaskTest { assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap")); assertEquals(map1, exchange.getProperties().get("CamelPropertyMyMap1")); assertEquals(map2, exchange.getProperties().get("CamelPropertyMyMap2")); - assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class)); - assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class)); - assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class)); - assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class)); - assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class)); - assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger")); - assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class)); - assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class)); - assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class)); - assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class)); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); + assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class)); + assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class)); + assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class)); camelSinkTask.stop(); } @@ -374,15 +374,15 @@ public class CamelSinkTaskTest { Exchange exchange = c.receive("seda:test", 1000L); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class)); - assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class)); - assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class)); - assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class)); - assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class)); - assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger")); - assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class)); - assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class)); - assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class)); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); + assertEquals(list, exchange.getIn().getHeader("MyList", List.class)); + assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class)); camelSinkTask.stop(); } @@ -443,15 +443,15 @@ public class CamelSinkTaskTest { assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); assertEquals(list, exchange.getProperties().get("CamelPropertyMyList")); assertEquals(list1, exchange.getProperties().get("CamelPropertyMyList1")); - assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class)); - assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class)); - assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class)); - assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class)); - assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class)); - assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger")); - assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class)); - assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class)); - assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class)); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); + assertEquals(list, exchange.getIn().getHeader("MyList", List.class)); + assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class)); camelSinkTask.stop(); }