This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f658d0dc Improve async temp segment delete in validation manager (#14339)
65f658d0dc is described below

commit 65f658d0dc38b73d65a9b22a4f7a206808f4c18f
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Oct 30 20:46:54 2024 -0700

    Improve async temp segment delete in validation manager (#14339)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 87 ++++++++++++----------
 .../RealtimeSegmentValidationManager.java          | 18 +++--
 .../PinotLLCRealtimeSegmentManagerTest.java        | 13 ++--
 .../manager/realtime/SegmentCompletionUtils.java   |  3 -
 4 files changed, 64 insertions(+), 57 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7a459d7ddb..91e4bdab2e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.realtime;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.net.URI;
 import java.sql.Timestamp;
@@ -1592,67 +1593,71 @@ public class PinotLLCRealtimeSegmentManager {
 
   /**
    * Delete tmp segments for realtime table with low level consumer, split 
commit and async deletion is enabled.
-   * @param tableNameWithType
-   * @param segmentsZKMetadata
    * @return number of deleted orphan temporary segments
-   *
    */
-  public long deleteTmpSegments(String tableNameWithType, 
List<SegmentZKMetadata> segmentsZKMetadata) {
+  public int deleteTmpSegments(String realtimeTableName, 
List<SegmentZKMetadata> segmentsZKMetadata)
+      throws IOException {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
-    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      return 0L;
-    }
-
-    TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
-    if (tableConfig == null) {
-      LOGGER.warn("Failed to find table config for table: {}, skipping 
deletion of tmp segments", tableNameWithType);
-      return 0L;
-    }
-
-    if (!isTmpSegmentAsyncDeletionEnabled()) {
-      return 0L;
+    // NOTE: Do not delete the file if it is used as download URL. This could 
happen when user uses temporary file to
+    // backfill segment.
+    Set<String> downloadUrls = 
Sets.newHashSetWithExpectedSize(segmentsZKMetadata.size());
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      if (segmentZKMetadata.getStatus() == Status.DONE) {
+        downloadUrls.add(segmentZKMetadata.getDownloadUrl());
+      }
     }
 
-    Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta -> 
meta.getStatus() == Status.DONE
-        && 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
-        SegmentZKMetadata::getDownloadUrl).collect(
-        Collectors.toSet());
-
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
     URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), 
rawTableName);
     PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
-    long deletedTmpSegments = 0;
-    try {
-      for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
-        // prepend scheme
+    int numDeletedTmpSegments = 0;
+    for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+      if (isTmpAndCanDelete(filePath, downloadUrls, pinotFS)) {
         URI uri = URIUtils.getUri(filePath);
-        if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
-          LOGGER.info("Deleting temporary segment file: {}", uri);
+        String canonicalPath = uri.toString();
+        LOGGER.info("Deleting temporary segment file: {}", canonicalPath);
+        try {
           if (pinotFS.delete(uri, true)) {
-            LOGGER.info("Succeed to delete file: {}", uri);
-            deletedTmpSegments++;
+            LOGGER.info("Deleted temporary segment file: {}", canonicalPath);
+            numDeletedTmpSegments++;
           } else {
-            LOGGER.warn("Failed to delete file: {}", uri);
+            LOGGER.warn("Failed to delete temporary segment file: {}", 
canonicalPath);
           }
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting temporary segment 
file: {}", canonicalPath, e);
         }
       }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while deleting temporary files for table: 
{}", rawTableName, e);
     }
-    return deletedTmpSegments;
+    return numDeletedTmpSegments;
   }
 
-  private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS 
pinotFS)
-      throws Exception {
-    long lastModified = pinotFS.lastModified(uri);
+  private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, 
PinotFS pinotFS) {
+    if (!SegmentCompletionUtils.isTmpFile(filePath)) {
+      return false;
+    }
+    // Prepend scheme
+    URI uri = URIUtils.getUri(filePath);
+    String canonicalPath = uri.toString();
+    // NOTE: Do not delete the file if it is used as download URL. This could 
happen when user uses temporary file to
+    // backfill segment.
+    if (downloadUrls.contains(canonicalPath)) {
+      return false;
+    }
+    long lastModified;
+    try {
+      lastModified = pinotFS.lastModified(uri);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while getting last modified time for 
file: {}, ineligible for delete",
+          canonicalPath, e);
+      return false;
+    }
     if (lastModified <= 0) {
-      LOGGER.warn("file {} modification time {} is not positive, ineligible 
for delete", uri.toString(), lastModified);
+      LOGGER.warn("Last modified time for file: {} is not positive: {}, 
ineligible for delete", canonicalPath,
+          lastModified);
       return false;
     }
-    String uriString = uri.toString();
-    return SegmentCompletionUtils.isTmpFile(uriString) && 
!deepURIs.contains(uriString)
-        && getCurrentTimeMs() - lastModified > 
_controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
+    return getCurrentTimeMs() - lastModified > 
_controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index b8460a406a..88f1bc6ee6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -153,13 +153,17 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     List<SegmentZKMetadata> segmentsZKMetadata = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
 
     // Delete tmp segments
-    try {
-      long numDeleteTmpSegments = 
_llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, 
segmentsZKMetadata);
-      LOGGER.info("Deleted {} tmp segments for table: {}", 
numDeleteTmpSegments, realtimeTableName);
-      _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
-          numDeleteTmpSegments);
-    } catch (Exception e) {
-      LOGGER.error("Failed to delete tmp segments for table: {}", 
realtimeTableName, e);
+    if (_llcRealtimeSegmentManager.isTmpSegmentAsyncDeletionEnabled()) {
+      try {
+        long startTimeMs = System.currentTimeMillis();
+        int numDeletedTmpSegments = 
_llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, 
segmentsZKMetadata);
+        LOGGER.info("Deleted {} tmp segments for table: {} in {}ms", 
numDeletedTmpSegments, realtimeTableName,
+            System.currentTimeMillis() - startTimeMs);
+        _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
+            numDeletedTmpSegments);
+      } catch (Exception e) {
+        LOGGER.error("Failed to delete tmp segments for table: {}", 
realtimeTableName, e);
+      }
     }
 
     // Update the total document count gauge
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 435303f5e9..bed25bb16c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1046,7 +1046,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   }
 
   @Test
-  public void testDeleteTmpSegmentFiles() throws Exception {
+  public void testDeleteTmpSegmentFiles()
+      throws Exception {
     // turn on knobs for async deletion of tmp files
     ControllerConf config = new ControllerConf();
     config.setDataDir(TEMP_DIR.toString());
@@ -1069,19 +1070,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
     PinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager(
         helixResourceManager, config);
 
-    long deletedTmpSegCount;
+    int numDeletedTmpSegments;
     // case 1: the segmentMetadata download uri is identical to the uri of the 
tmp segment. Should not delete
     when(segZKMeta.getStatus()).thenReturn(Status.DONE);
     when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" + 
segmentFileName);
-    deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, 
Collections.singletonList(segZKMeta));
+    numDeletedTmpSegments = 
segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, 
Collections.singletonList(segZKMeta));
     assertTrue(segmentFile.exists());
-    assertEquals(0L, deletedTmpSegCount);
+    assertEquals(numDeletedTmpSegments, 0);
 
     // case 2: download url is empty, indicating the tmp segment is absolutely 
orphan. Delete the file
     
when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD);
-    deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, 
Collections.singletonList(segZKMeta));
+    numDeletedTmpSegments = 
segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, 
Collections.singletonList(segZKMeta));
     assertFalse(segmentFile.exists());
-    assertEquals(1L, deletedTmpSegCount);
+    assertEquals(numDeletedTmpSegments, 1);
   }
 
   
//////////////////////////////////////////////////////////////////////////////////
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
index c7198eded7..b47bf147fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
@@ -20,15 +20,12 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import java.util.UUID;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class SegmentCompletionUtils {
   private SegmentCompletionUtils() {
   }
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentCompletionUtils.class);
   // Used to create temporary segment file names
   private static final String TMP = ".tmp.";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to