This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e07b576797 Handle shaded classes in all methods of kafka factory (#13087)
e07b576797 is described below

commit e07b576797e2dc834a60d8ee0da5d26a0b597e23
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Mon May 6 18:12:07 2024 +0530

    Handle shaded classes in all methods of kafka factory (#13087)
    
    * Handle shaded classes in all methods of kafka factory
    
    * handle most ridiculous maven shading error
    
    ---------
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java | 4 +++-
 .../apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java  | 8 +++++++-
 .../java/org/apache/pinot/spi/stream/StreamConsumerFactory.java   | 2 +-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
index 820984439a..14bc6dcb06 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
@@ -35,11 +35,13 @@ public class KafkaConfigBackwardCompatibleUtils {
    */
   public static void handleStreamConfig(StreamConfig streamConfig) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+    //FIXME: This needs to be done because maven shade plugin also overwrites the constants in the classes
+    String prefixToReplace = KAFKA_COMMON_PACKAGE_PREFIX.replace(PINOT_SHADED_PACKAGE_PREFIX, "");
     for (Map.Entry<String, String> entry : streamConfigMap.entrySet()) {
       String[] valueParts = StringUtils.split(entry.getValue(), ' ');
       boolean updated = false;
       for (int i = 0; i < valueParts.length; i++) {
-        if (valueParts[i].startsWith(KAFKA_COMMON_PACKAGE_PREFIX)) {
+        if (valueParts[i].startsWith(prefixToReplace)) {
           try {
             Class.forName(valueParts[i]);
           } catch (ClassNotFoundException e1) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 0ca63f856d..0e3bca3f47 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -20,12 +20,19 @@ package org.apache.pinot.plugin.stream.kafka20;
 
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
 public class KafkaConsumerFactory extends StreamConsumerFactory {
 
+  @Override
+  protected void init(StreamConfig streamConfig) {
+    KafkaConfigBackwardCompatibleUtils.handleStreamConfig(streamConfig);
+    super.init(streamConfig);
+  }
+
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig, partition);
@@ -39,7 +46,6 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   @Override
   public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
-    KafkaConfigBackwardCompatibleUtils.handleStreamConfig(_streamConfig);
     return new KafkaPartitionLevelConsumer(clientId, _streamConfig,
         partitionGroupConsumptionStatus.getPartitionGroupId());
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 5729dc2814..812b7b8e0f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -31,7 +31,7 @@ public abstract class StreamConsumerFactory {
    * Initializes the stream consumer factory with the stream metadata for the table
    * @param streamConfig the stream config object from the table config
    */
-  void init(StreamConfig streamConfig) {
+  protected void init(StreamConfig streamConfig) {
     _streamConfig = streamConfig;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to