This is an automated email from the ASF dual-hosted git repository. cdeppisch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push: new 791443d1 fix: Avoid Class cast errors in TimestampRouter 791443d1 is described below commit 791443d1fbf9a237d0c53e91da735f9790d8d4c0 Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Thu Jun 13 15:15:06 2024 +0200 fix: Avoid Class cast errors in TimestampRouter - Avoids String to Long Class cast errors in TimestampRouter utility - Add test for timestamp-router-action Kamelet --- .../utils/transform/MessageTimestampRouter.java | 22 ++++---- .../kamelets/utils/transform/TimestampRouter.java | 16 +++--- .../resources/kafka/timestamp-router-pipe.yaml | 60 ++++++++++++++++++++++ .../test/resources/kafka/timestamp-router.feature | 55 ++++++++++++++++++++ .../src/test/resources/kafka/yaks-config.yaml | 1 + 5 files changed, 135 insertions(+), 19 deletions(-) diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java index 55c81829..aac0c7f0 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java @@ -16,14 +16,6 @@ */ package org.apache.camel.kamelets.utils.transform; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangeProperty; -import org.apache.camel.component.kafka.KafkaConstants; -import org.apache.camel.util.ObjectHelper; - import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -36,6 +28,14 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.util.ObjectHelper; + public class MessageTimestampRouter { public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampKeys") String timestampKeys, @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex) throws ParseException { @@ -63,13 +63,13 @@ public class MessageTimestampRouter { break; } } - long timestamp; + Long timestamp = null; if (ObjectHelper.isNotEmpty(timestampKeyFormat) && ObjectHelper.isNotEmpty(rawTimestamp) && !timestampKeyFormat.equalsIgnoreCase("timestamp")) { final SimpleDateFormat timestampKeyFmt = new SimpleDateFormat(timestampKeyFormat); timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC")); timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime(); - } else { - timestamp = Long.valueOf((String) rawTimestamp); + } else if (ObjectHelper.isNotEmpty(rawTimestamp)) { + timestamp = Long.parseLong(rawTimestamp.toString()); } if (ObjectHelper.isNotEmpty(timestamp)) { final String formattedTimestamp = fmt.format(new Date(timestamp)); diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java index c0ac9b65..f102bc3d 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java @@ -16,11 +16,6 @@ */ package org.apache.camel.kamelets.utils.transform; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangeProperty; -import org.apache.camel.component.kafka.KafkaConstants; -import org.apache.camel.util.ObjectHelper; - import java.text.SimpleDateFormat; import java.time.Instant; import java.util.Date; @@ -28,6 +23,11 @@ import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.util.ObjectHelper; + public class TimestampRouter { public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) { @@ -38,15 +38,15 @@ public class TimestampRouter { final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat); fmt.setTimeZone(TimeZone.getTimeZone("UTC")); - long timestamp; + Long timestamp = null; String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName); if (rawTimestamp instanceof Long) { timestamp = (Long) rawTimestamp; } else if (rawTimestamp instanceof Instant) { timestamp = ((Instant) rawTimestamp).toEpochMilli(); - } else { - timestamp = (Long) rawTimestamp; + } else if (ObjectHelper.isNotEmpty(rawTimestamp)) { + timestamp = Long.parseLong(rawTimestamp.toString()); } if (ObjectHelper.isNotEmpty(timestamp)) { final String formattedTimestamp = fmt.format(new Date(timestamp)); diff --git a/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml new file mode 100644 index 00000000..c47dde10 --- /dev/null +++ b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml @@ -0,0 +1,60 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: timestamp-router-pipe +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: webhook-source + properties: + subpath: messages + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timestamp-router-action + properties: + topicFormat: $[topic]_$[timestamp] + timestampFormat: YYYY-MM-dd + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: set-body-action + properties: + value: $simple{header[message]} + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-action + properties: + showHeaders: true + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: kafka-sink + properties: + bootstrapServers: ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS} + user: ${user} + password: ${password} + topic: dummy + securityProtocol: ${securityProtocol} diff --git a/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature new file mode 100644 index 00000000..f54c0c45 --- /dev/null +++ b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature @@ -0,0 +1,55 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +Feature: Kafka Timestamp Router + + Background: + Given variable user is "" + Given variable password is "" + Given variables + | securityProtocol | PLAINTEXT | + | topicName | my-topic | + | timestamp | yaks:unixTimestamp()000 | + | topic | ${topicName}_yaks:currentDate('YYYY-MM-dd') | + | message | Camel K rocks! | + Given Kafka topic: ${topic} + Given Kafka topic partition: 0 + + Scenario: Create infrastructure + Given start Redpanda container + + Scenario: Create Pipe + When load Pipe timestamp-router-pipe.yaml + Then Camel K integration timestamp-router-pipe should be running + Then Camel K integration timestamp-router-pipe should print Routes startup + + Scenario: Receive message on Kafka topic and verify sink output + Given new Kafka connection + | url | ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS} | + | consumerGroup | consumer-1 | + Given URL: yaks:resolveURL('timestamp-router-pipe',8080) + Given HTTP request query parameter kafka.TOPIC="${topicName}" + Given HTTP request query parameter kafka.TIMESTAMP="${timestamp}" + Given HTTP request query parameter message="yaks:urlEncode(${message})" + Given HTTP request fork mode is enabled + When send GET /messages + Then receive Kafka message with body: ${message} + And receive HTTP 200 OK + + Scenario: Remove resources + Given delete Pipe timestamp-router-pipe + And stop Redpanda container diff --git a/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml b/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml index 63ba6c1d..df55ae63 100644 --- a/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml +++ b/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml @@ -39,6 +39,7 @@ config: resources: - kafka-source-pipe.yaml - kafka-sink-pipe.yaml + - timestamp-router-pipe.yaml dump: enabled: true failedOnly: true