This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c4c448e7 Handle kafka backward incompatible props for AWS (#14446)
63c4c448e7 is described below

commit 63c4c448e7b7ec32a0a6a00156de8a85ad4bc85c
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Nov 15 08:54:37 2024 +0530

    Handle kafka backward incompatible props for AWS (#14446)
    
    * Handle kafka backward incompatible props
    
    * Fix refactoring
    
    ---------
    
    Co-authored-by: Kartik Khare 
<kharekar...@kartiks-macbook-pro.wyvern-sun.ts.net>
---
 .../kafka/KafkaConfigBackwardCompatibleUtils.java  | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java
index ede6b61369..65175d0af2 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaConfigBackwardCompatibleUtils.java
@@ -25,15 +25,18 @@ import java.io.InputStreamReader;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class KafkaConfigBackwardCompatibleUtils {
 
   private KafkaConfigBackwardCompatibleUtils() {
   }
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaConfigBackwardCompatibleUtils.class);
   public static final String KAFKA_COMMON_PACKAGE_PREFIX = 
"org.apache.kafka.common";
   public static final String PINOT_SHADED_PACKAGE_PREFIX = 
"org.apache.pinot.shaded.";
+  public static final String AWS_PROPS_PREFIX = "software.amazon";
   public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
 
   /**
@@ -62,11 +65,29 @@ public class KafkaConfigBackwardCompatibleUtils {
               // Do nothing, shaded class is not found as well, keep the 
original class
             }
           }
+        } else if (valueParts[i].startsWith(PINOT_SHADED_PACKAGE_PREFIX + 
AWS_PROPS_PREFIX)) {
+          // Replace the AWS SDK classes with the shaded version
+          try {
+            Class.forName(valueParts[i]);
+          } catch (ClassNotFoundException e1) {
+            // If not, replace the class with the unshaded version
+            try {
+              String unShadedClassName = 
valueParts[i].replace(PINOT_SHADED_PACKAGE_PREFIX, "");
+              Class.forName(unShadedClassName);
+              valueParts[i] = unShadedClassName;
+              updated = true;
+            } catch (ClassNotFoundException e2) {
+              // Do nothing, shaded class is not found as well, keep the 
original class
+            }
+          }
         }
       }
 
       if (updated) {
+        String originalValue = entry.getValue();
         entry.setValue(String.join(" ", valueParts));
+        LOGGER.info("Updated stream config key: {} fromValue: {} toValue: {}", 
entry.getKey(), originalValue,
+            entry.getValue());
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to