This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ecdfbbfe58c CAMEL-20701: added a custom deserializer to be used when 
interoperating with JMS
ecdfbbfe58c is described below

commit ecdfbbfe58cd648c2efada77d1536aec6f8d929f
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Fri Apr 19 10:06:01 2024 +0200

    CAMEL-20701: added a custom deserializer to be used when interoperating 
with JMS
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 30 ++++++++-
 .../consumer/support/interop/JMSDeserializer.java  | 78 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ae166242489..a00f0e78863 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -656,13 +656,41 @@ public class CustomSubscribeAdapter implements 
SubscribeAdapter {
 
 Then, it is necessary to add it as named bean instance to the registry:
 
-
 [source,java]
 .Add to registry example
 ----
 context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new 
CustomSubscribeAdapter());
 ----
 
+== Interoperability
+
+=== JMS
+
+When interoperating Kafka and JMS, it may be necessary to coerce the JMS 
headers into their expected type.
+
+For instance, when consuming messages from Kafka carrying JMS headers and then 
sending them to a JMS broker, those headers are
+first deserialized into a byte array. Then, the `camel-jms` component tries to 
coerce this byte array into the
+specific type used by.
+However, both the origin endpoint as well as how this was setup on the code 
itsef may affect how the data is serialized and
+deserialized. As such, it is not feasible to naively assume the data type of 
the byte array.
+
+To address this issue, we provide a custom header deserializer to force Kafka 
to de-serialize the JMS data according to
+the JMS specification. This approach ensures that the headers are properly 
interpreted and processed by the camel-jms component.
+
+Due to the inherent complexity of each possible system and endpoint, it may 
not be possible for this deserializer to cover all
+possible scenarios. As such, it is provided as model that can be modified 
and/or adapted for the specific needs of each application.
+
+To utilize this solution, you need to modify the route URI on the consumer end 
of the pipeline by including the
+`headerDeserializer` option.
+For example:
+
+[source,java]
+.Route snippet
+----
+from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
+    .to("...");
+----
+
 include::spring-boot:partial$starter.adoc[]
 
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
new file mode 100644
index 00000000000..61a489d81af
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.interop;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+
+public class JMSDeserializer implements KafkaHeaderDeserializer {
+
+    public boolean isLong(byte[] bytes) {
+        return bytes.length == Long.BYTES;
+    }
+
+    private static long bytesToLong(byte[] bytes) {
+        final ByteBuffer buffer = toByteBuffer(bytes, Long.BYTES);
+        return buffer.getLong();
+    }
+
+    private static int bytesToInt(byte[] bytes) {
+        final ByteBuffer buffer = toByteBuffer(bytes, Integer.BYTES);
+        return buffer.getInt();
+    }
+
+    private static ByteBuffer toByteBuffer(byte[] bytes, int size) {
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        buffer.put(bytes);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
+    public Object deserialize(String key, byte[] value) {
+        if (key.startsWith("JMS")) {
+            switch (key) {
+                case "JMSDestination":
+                    return new String(value);
+                case "JMSDeliveryMode":
+                    return bytesToInt(value);
+                case "JMSTimestamp":
+                    return bytesToLong(value);
+                case "JMSCorrelationID":
+                    return value;
+                case "JMSReplyTo":
+                    return new String(value);
+                case "JMSRedelivered":
+                    return bytesToInt(value);
+                case "JMSType":
+                    return new String(value);
+                case "JMSExpiration":
+                    return isLong(value) ? bytesToLong(value) : 
bytesToInt(value);
+                case "JMSPriority":
+                    return bytesToInt(value);
+                case "JMSMessageID":
+                    return new String(value);
+                default:
+                    return value;
+            }
+        }
+
+        return value;
+    }
+}

Reply via email to