This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 63c4c448e7 Handle kafka backward incompatible props for AWS (#14446) 63c4c448e7 is described below commit 63c4c448e7b7ec32a0a6a00156de8a85ad4bc85c Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Fri Nov 15 08:54:37 2024 +0530 Handle kafka backward incompatible props for AWS (#14446) * Handle kafka backward incompatible props * Fix refactoring --------- Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.wyvern-sun.ts.net> --- .../kafka/KafkaConfigBackwardCompatibleUtils.java | 23 +++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java index ede6b61369..65175d0af2 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java @@ -25,15 +25,18 @@ import java.io.InputStreamReader; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.stream.StreamConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaConfigBackwardCompatibleUtils { private KafkaConfigBackwardCompatibleUtils() { } - + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfigBackwardCompatibleUtils.class); public static final String KAFKA_COMMON_PACKAGE_PREFIX = "org.apache.kafka.common"; public static final String PINOT_SHADED_PACKAGE_PREFIX = "org.apache.pinot.shaded."; + public static final String AWS_PROPS_PREFIX = "software.amazon"; public static final String 
SASL_JAAS_CONFIG = "sasl.jaas.config"; /** @@ -62,11 +65,29 @@ public class KafkaConfigBackwardCompatibleUtils { // Do nothing, shaded class is not found as well, keep the original class } } + } else if (valueParts[i].startsWith(PINOT_SHADED_PACKAGE_PREFIX + AWS_PROPS_PREFIX)) { + // Replace the AWS SDK classes with the shaded version + try { + Class.forName(valueParts[i]); + } catch (ClassNotFoundException e1) { + // If not, replace the class with the unshaded version + try { + String unShadedClassName = valueParts[i].replace(PINOT_SHADED_PACKAGE_PREFIX, ""); + Class.forName(unShadedClassName); + valueParts[i] = unShadedClassName; + updated = true; + } catch (ClassNotFoundException e2) { + // Do nothing, shaded class is not found as well, keep the original class + } + } } } if (updated) { + String originalValue = entry.getValue(); entry.setValue(String.join(" ", valueParts)); + LOGGER.info("Updated stream config key: {} fromValue: {} toValue: {}", entry.getKey(), originalValue, + entry.getValue()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org