This is an automated email from the ASF dual-hosted git repository.

igarashitm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 8c33b7beb1a CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146) 8c33b7beb1a is described below commit 8c33b7beb1a544d2308090c48df3d75f34847de5 Author: Tomohisa Igarashi <tm.igara...@gmail.com> AuthorDate: Thu Aug 11 06:52:03 2022 -0400 CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146) --- components/camel-kafka/pom.xml | 6 ++++++ .../kafka/serde/DefaultKafkaHeaderSerializer.java | 22 +++++++++++++++++++++- .../serde/DefaultKafkaHeaderSerializerTest.java | 6 +++++- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml index b25d7acb729..2cf93c82245 100644 --- a/components/camel-kafka/pom.xml +++ b/components/camel-kafka/pom.xml @@ -76,6 +76,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jackson</artifactId> + <scope>test</scope> + </dependency> + <!-- Required by the admin client--> <dependency> <groupId>org.apache.camel</groupId> diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java index c9d6b5f0fb7..79846df5843 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java @@ -18,12 +18,15 @@ package org.apache.camel.component.kafka.serde; import java.nio.ByteBuffer; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer { +public class 
DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaHeaderSerializer.class); + private CamelContext camelContext; @Override public byte[] serialize(final String key, final Object value) { @@ -46,9 +49,26 @@ public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer { } else if (value instanceof byte[]) { return (byte[]) value; } + if (camelContext != null) { + byte[] converted = camelContext.getTypeConverter().tryConvertTo(byte[].class, value); + if (converted != null) { + return converted; + } + } + LOG.debug("Cannot propagate header value of type[{}], skipping... " + "Supported types: String, Integer, Long, Double, byte[].", value != null ? value.getClass() : "null"); return null; } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java index 69de631b0e3..9cfe65dc42f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java @@ -19,6 +19,8 @@ package org.apache.camel.component.kafka.serde; import java.util.Arrays; import java.util.Collection; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.camel.impl.DefaultCamelContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -26,11 +28,12 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; public class DefaultKafkaHeaderSerializerTest { - 
private KafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer(); + private DefaultKafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer(); @ParameterizedTest @MethodSource("primeNumbers") public void serialize(Object value, byte[] expectedResult) { + serializer.setCamelContext(new DefaultCamelContext()); byte[] result = serializer.serialize("someKey", value); assertArrayEquals(expectedResult, result); @@ -44,6 +47,7 @@ public class DefaultKafkaHeaderSerializerTest { { 22.0D, new byte[] { 64, 54, 0, 0, 0, 0, 0, 0 } }, // double { "someValue", "someValue".getBytes() }, // string { new byte[] { 0, 2, -43 }, new byte[] { 0, 2, -43 } }, // byte[] + { new TextNode("foo"), "foo".getBytes() }, // jackson TextNode { null, null }, // null { new Object(), null } // unknown // type