This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 26709271f2 Move untar method to BaseTaskExecutor and untar with peerDownload segment if deepstore corrupted (#13964)
26709271f2 is described below

commit 26709271f2be1e82eb02478dc0a1bdaa1dd67970
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Thu Sep 12 10:52:05 2024 +0530

    Move untar method to BaseTaskExecutor and untar with peerDownload segment if deepstore corrupted (#13964)
---
 .../BaseMultipleSegmentsConversionExecutor.java     | 19 +++++--------------
 .../tasks/BaseSingleSegmentConversionExecutor.java  | 19 ++++++-------------
 .../pinot/plugin/minion/tasks/BaseTaskExecutor.java | 21 ++++++++++++++++++---
 3 files changed, 29 insertions(+), 30 deletions(-)

diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index a00883f34e..f16d93b151 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -200,30 +200,21 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       for (int i = 0; i < downloadURLs.length; i++) {
         String segmentName = segmentNames[i];
-        // Download the segment file
+        // Download and decompress the segment file
         _eventObserver.notifyProgress(_pinotTaskConfig,
-            String.format("Downloading segment from: %s (%d out of %d)", 
downloadURLs[i], (i + 1),
+            String.format("Downloading and decompressing segment from: %s (%d 
out of %d)", downloadURLs[i], (i + 1),
                 downloadURLs.length));
-        File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" + 
i);
+        File indexDir;
         try {
-          downloadSegmentToLocal(tableNameWithType, segmentName, 
downloadURLs[i], taskType, tarredSegmentFile);
+          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, 
segmentName, downloadURLs[i], taskType,
+              tempDataDir, "_" + i);
         } catch (Exception e) {
           LOGGER.error("Failed to download segment from download url: {}", 
downloadURLs[i], e);
           _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
           _eventObserver.notifyTaskError(_pinotTaskConfig, e);
           throw e;
         }
-
-        // Un-tar the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig,
-            String.format("Decompressing segment from: %s (%d out of %d)", 
downloadURLs[i], (i + 1),
-                downloadURLs.length));
-        File segmentDir = new File(tempDataDir, "segmentDir_" + i);
-        File indexDir = TarCompressionUtils.untar(tarredSegmentFile, 
segmentDir).get(0);
         inputSegmentDirs.add(indexDir);
-        if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
-          LOGGER.warn("Failed to delete tarred input segment: {}", 
tarredSegmentFile.getAbsolutePath());
-        }
 
         reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
         SegmentMetadataImpl segmentMetadata = new 
SegmentMetadataImpl(indexDir);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index c75fa99bf8..e35a9c5cd5 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -97,12 +97,13 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), 
taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs(), "Failed to create temporary 
directory: %s", tempDataDir);
     try {
-      // Download the tarred segment file
-      _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment 
from: " + downloadURL);
-      File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
-      LOGGER.info("Downloading segment from {} to {}", downloadURL, 
tarredSegmentFile.getAbsolutePath());
+      // Download and decompress the segment file
+      _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and 
decompressing segment from: "
+          + downloadURL);
+      File indexDir;
       try {
-        downloadSegmentToLocal(tableNameWithType, segmentName, downloadURL, 
taskType, tarredSegmentFile);
+        indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, 
segmentName, downloadURL, taskType,
+            tempDataDir, "");
       } catch (Exception e) {
         LOGGER.error("Failed to download segment from download url: {}", 
downloadURL, e);
         _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
@@ -110,14 +111,6 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
         throw e;
       }
 
-      // Un-tar the segment file
-      _eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment 
from: " + downloadURL);
-      File segmentDir = new File(tempDataDir, "segmentDir");
-      File indexDir = TarCompressionUtils.untar(tarredSegmentFile, 
segmentDir).get(0);
-      if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
-        LOGGER.warn("Failed to delete tarred input segment: {}", 
tarredSegmentFile.getAbsolutePath());
-      }
-
       // Publish metrics related to segment download
       reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index a33b2c4005..c61a5b58d9 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -29,6 +29,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.metrics.MinionMeter;
 import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -110,20 +111,27 @@ public abstract class BaseTaskExecutor implements 
PinotTaskExecutor {
     _minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount);
   }
 
-  protected void downloadSegmentToLocal(String tableNameWithType, String 
segmentName, String deepstoreURL,
-      String taskType, File tarredSegmentFile)
+  protected File downloadSegmentToLocalAndUntar(String tableNameWithType, 
String segmentName, String deepstoreURL,
+      String taskType, File tempDataDir, String suffix)
       throws Exception {
-    LOGGER.info("Downloading segment {} from {} to {}", segmentName, 
deepstoreURL, tarredSegmentFile.getAbsolutePath());
+    File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile" + 
suffix);
+    File segmentDir = new File(tempDataDir, "segmentDir" + suffix);
+    File indexDir;
     TableConfig tableConfig = getTableConfig(tableNameWithType);
     String crypterName = 
tableConfig.getValidationConfig().getCrypterClassName();
+    LOGGER.info("Downloading segment {} from {} to {}", segmentName, 
deepstoreURL, tarredSegmentFile.getAbsolutePath());
+
     try {
       // download from deepstore first
       SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(deepstoreURL, 
tarredSegmentFile, crypterName);
+      // untar the segment file
+      indexDir = TarCompressionUtils.untar(tarredSegmentFile, 
segmentDir).get(0);
     } catch (Exception e) {
       LOGGER.error("Segment download failed from deepstore for {}, 
crypter:{}", deepstoreURL, crypterName, e);
       String peerDownloadScheme = 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
       if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, 
taskType,
           MINION_CONTEXT.isAllowDownloadFromServer()) && peerDownloadScheme != 
null) {
+        // if allowDownloadFromServer is enabled, download the segment from a 
peer server as deepstore download failed
         LOGGER.info("Trying to download from servers for segment {} post 
deepstore download failed", segmentName);
         SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, 
peerDownloadScheme, () -> {
           List<URI> uris =
@@ -132,9 +140,16 @@ public abstract class BaseTaskExecutor implements 
PinotTaskExecutor {
           Collections.shuffle(uris);
           return uris;
         }, tarredSegmentFile, crypterName);
+        // untar the segment file
+        indexDir = TarCompressionUtils.untar(tarredSegmentFile, 
segmentDir).get(0);
       } else {
         throw e;
       }
+    } finally {
+      if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
+        LOGGER.warn("Failed to delete tarred input segment: {}", 
tarredSegmentFile.getAbsolutePath());
+      }
     }
+    return indexDir;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to