This is an automated email from the ASF dual-hosted git repository.

cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


The following commit(s) were added to refs/heads/main by this push:
     new 791443d1 fix: Avoid Class cast errors in TimestampRouter
791443d1 is described below

commit 791443d1fbf9a237d0c53e91da735f9790d8d4c0
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Thu Jun 13 15:15:06 2024 +0200

    fix: Avoid Class cast errors in TimestampRouter
    
    - Avoids String to Long Class cast errors in TimestampRouter utility
    - Add test for timestamp-router-action Kamelet
---
 .../utils/transform/MessageTimestampRouter.java    | 22 ++++----
 .../kamelets/utils/transform/TimestampRouter.java  | 16 +++---
 .../resources/kafka/timestamp-router-pipe.yaml     | 60 ++++++++++++++++++++++
 .../test/resources/kafka/timestamp-router.feature  | 55 ++++++++++++++++++++
 .../src/test/resources/kafka/yaks-config.yaml      |  1 +
 5 files changed, 135 insertions(+), 19 deletions(-)

diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
index 55c81829..aac0c7f0 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
@@ -16,14 +16,6 @@
  */
 package org.apache.camel.kamelets.utils.transform;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.util.ObjectHelper;
-
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -36,6 +28,14 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
 public class MessageTimestampRouter {
 
     public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampKeys") String timestampKeys, @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex) throws ParseException {
@@ -63,13 +63,13 @@ public class MessageTimestampRouter {
                break;
              }
         }
-        long timestamp;
+        Long timestamp = null;
         if (ObjectHelper.isNotEmpty(timestampKeyFormat) && ObjectHelper.isNotEmpty(rawTimestamp) && !timestampKeyFormat.equalsIgnoreCase("timestamp")) {
             final SimpleDateFormat timestampKeyFmt = new SimpleDateFormat(timestampKeyFormat);
             timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
             timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime();
-        } else {
-            timestamp = Long.valueOf((String) rawTimestamp);
+        } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+            timestamp = Long.parseLong(rawTimestamp.toString());
         }
         if (ObjectHelper.isNotEmpty(timestamp)) {
             final String formattedTimestamp = fmt.format(new Date(timestamp));
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
index c0ac9b65..f102bc3d 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.kamelets.utils.transform;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.util.ObjectHelper;
-
 import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.Date;
@@ -28,6 +23,11 @@ import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
 public class TimestampRouter {
 
     public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) {
@@ -38,15 +38,15 @@ public class TimestampRouter {
         final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
         fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
 
-        long timestamp;
+        Long timestamp = null;
         String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
         Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
         if (rawTimestamp instanceof Long) {
             timestamp = (Long) rawTimestamp;
         } else if (rawTimestamp instanceof Instant) {
             timestamp = ((Instant) rawTimestamp).toEpochMilli();
-        } else {
-            timestamp = (Long) rawTimestamp;
+        } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+            timestamp = Long.parseLong(rawTimestamp.toString());
         }
         if (ObjectHelper.isNotEmpty(timestamp)) {
             final String formattedTimestamp = fmt.format(new Date(timestamp));
diff --git a/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml
new file mode 100644
index 00000000..c47dde10
--- /dev/null
+++ b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml
@@ -0,0 +1,60 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: timestamp-router-pipe
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: webhook-source
+    properties:
+      subpath: messages
+  steps:
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1
+        name: timestamp-router-action
+      properties:
+        topicFormat: $[topic]_$[timestamp]
+        timestampFormat: YYYY-MM-dd
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1
+        name: set-body-action
+      properties:
+        value: $simple{header[message]}
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1
+        name: log-action
+      properties:
+        showHeaders: true
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: kafka-sink
+    properties:
+      bootstrapServers: ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS}
+      user: ${user}
+      password: ${password}
+      topic: dummy
+      securityProtocol: ${securityProtocol}
diff --git a/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature
new file mode 100644
index 00000000..f54c0c45
--- /dev/null
+++ b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature
@@ -0,0 +1,55 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+Feature: Kafka Timestamp Router
+
+  Background:
+    Given variable user is ""
+    Given variable password is ""
+    Given variables
+      | securityProtocol          | PLAINTEXT |
+      | topicName                 | my-topic |
+      | timestamp                 | yaks:unixTimestamp()000 |
+      | topic                     | ${topicName}_yaks:currentDate('YYYY-MM-dd') |
+      | message                   | Camel K rocks! |
+    Given Kafka topic: ${topic}
+    Given Kafka topic partition: 0
+
+  Scenario: Create infrastructure
+    Given start Redpanda container
+
+  Scenario: Create Pipe
+    When load Pipe timestamp-router-pipe.yaml
+    Then Camel K integration timestamp-router-pipe should be running
+    Then Camel K integration timestamp-router-pipe should print Routes startup
+
+  Scenario: Receive message on Kafka topic and verify sink output
+    Given new Kafka connection
+      | url           | ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS} |
+      | consumerGroup | consumer-1                                              |
+    Given URL: yaks:resolveURL('timestamp-router-pipe',8080)
+    Given HTTP request query parameter kafka.TOPIC="${topicName}"
+    Given HTTP request query parameter kafka.TIMESTAMP="${timestamp}"
+    Given HTTP request query parameter message="yaks:urlEncode(${message})"
+    Given HTTP request fork mode is enabled
+    When send GET /messages
+    Then receive Kafka message with body: ${message}
+    And receive HTTP 200 OK
+
+  Scenario: Remove resources
+    Given delete Pipe timestamp-router-pipe
+    And stop Redpanda container
diff --git a/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml b/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
index 63ba6c1d..df55ae63 100644
--- a/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
+++ b/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
@@ -39,6 +39,7 @@ config:
     resources:
       - kafka-source-pipe.yaml
       - kafka-sink-pipe.yaml
+      - timestamp-router-pipe.yaml
   dump:
     enabled: true
     failedOnly: true

Reply via email to