This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9046af27a1 Return interfaces in Segment processor framework classes instead of Implementations (#14252)
9046af27a1 is described below

commit 9046af27a1917f75ce3c0dfa4a140a959112fb8e
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Oct 18 16:55:55 2024 +0530

    Return interfaces in Segment processor framework classes instead of Implementations (#14252)
    
    * change abstraction for writer.
    
    * fix segment fetcher for localfs.
    
    ---------
    
    Co-authored-by: Aishik <ais...@startree.ai>
---
 .../apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java    | 6 +++++-
 .../apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java    | 3 +++
 .../core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java | 4 ++--
 .../apache/pinot/core/segment/processing/mapper/SegmentMapper.java  | 4 ++--
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
index b0bb4706a3..c11a0239c1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
@@ -28,6 +28,10 @@ public class PinotFSSegmentFetcher extends BaseSegmentFetcher {
   @Override
   protected void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
       throws Exception {
-    PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
+    if (uri.getScheme() == null) {
+      PinotFSFactory.create(PinotFSFactory.LOCAL_PINOT_FS_SCHEME).copyToLocalFile(uri, dest);
+    } else {
+      PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
+    }
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 543db8c403..235c63bf0c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -106,6 +106,9 @@ public class SegmentFetcherFactory {
       return segmentFetcher;
     } else {
       LOGGER.info("Segment fetcher is not configured for protocol: {}, using default", protocol);
+      if (protocol == null) {
+        return PINOT_FS_SEGMENT_FETCHER;
+      }
       switch (protocol) {
         case CommonConstants.HTTP_PROTOCOL:
         case CommonConstants.HTTPS_PROTOCOL:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
index 541bd14e26..d7db6509f3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
-public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
+public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<FileWriter<GenericRow>, GenericRow> {
 
   private final long _bytesLimit;
   private long _numBytesWritten;
@@ -45,7 +45,7 @@ public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<Generi
   }
 
   @Override
-  public void write(GenericRowFileWriter writer, GenericRow row) throws IOException {
+  public void write(FileWriter<GenericRow> writer, GenericRow row) throws IOException {
     _numBytesWritten += writer.writeData(row);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 49ae88b19f..cca0a0ef58 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -31,8 +31,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
 import org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter;
+import org.apache.pinot.core.segment.processing.genericrow.FileWriter;
 import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
 import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
@@ -245,7 +245,7 @@ public class SegmentMapper {
     }
 
     // Get the file writer.
-    GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+    FileWriter<GenericRow> fileWriter = fileManager.getFileWriter();
 
     // Write the row.
     _adaptiveSizeBasedWriter.write(fileWriter, row);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to