This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d9c0790a46 Periodically delete Tmp Segment file brought by the SplitCommit End phase (#10815)
d9c0790a46 is described below

commit d9c0790a464eaf2575a2b7225827af36f4b0cd25
Author: Xuanyi Li <xuany...@uber.com>
AuthorDate: Mon Sep 18 19:08:07 2023 -0700

    Periodically delete Tmp Segment file brought by the SplitCommit End phase (#10815)
---
 .../pinot/common/metrics/AbstractMetrics.java      |  20 +++--
 .../pinot/common/metrics/ControllerMeter.java      |   1 +
 .../pinot/controller/BaseControllerStarter.java    |   4 +-
 .../apache/pinot/controller/ControllerConf.java    |  15 ++++
 .../resources/LLCSegmentCompletionHandlers.java    |   4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 100 ++++++++++++++++++---
 .../RealtimeSegmentValidationManager.java          |  15 ++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |  65 +++++++++++---
 .../manager/realtime/PinotFSSegmentUploader.java   |   3 +-
 .../manager/realtime}/SegmentCompletionUtils.java  |  22 ++++-
 .../realtime}/SegmentCompletionUtilsTest.java      |  19 ++--
 11 files changed, 224 insertions(+), 44 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index ff41b12abf..98f65a4be9 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -285,9 +285,7 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
    */
   public PinotMeter addMeteredTableValue(final String tableName, final M 
meter, final long unitCount,
       PinotMeter reusedMeter) {
-    String meterName = meter.getMeterName();
-    final String fullMeterName = _metricPrefix + getTableName(tableName) + "." 
+ meterName;
-    return addValueToMeter(fullMeterName, meter.getUnit(), unitCount, 
reusedMeter);
+    return addValueToMeter(getTableFullMeterName(tableName, meter), 
meter.getUnit(), unitCount, reusedMeter);
   }
 
   /**
@@ -331,14 +329,17 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
   }
 
   public PinotMeter getMeteredTableValue(final String tableName, final M 
meter) {
-    final String fullMeterName;
-    String meterName = meter.getMeterName();
-    fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
-    final PinotMetricName metricName = 
PinotMetricUtils.makePinotMetricName(_clazz, fullMeterName);
+    final PinotMetricName metricName = 
PinotMetricUtils.makePinotMetricName(_clazz,
+        getTableFullMeterName(tableName, meter));
 
     return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, 
meter.getUnit(), TimeUnit.SECONDS);
   }
 
+  private String getTableFullMeterName(final String tableName, final M meter) {
+    String meterName = meter.getMeterName();
+    return _metricPrefix + getTableName(tableName) + "." + meterName;
+  }
+
   /**
    * @deprecated Please use addMeteredTableValue(final String tableName, final 
M meter, final long unitCount), which is
    * designed for tracking count and rates.
@@ -723,6 +724,11 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
     removeGaugeFromMetricRegistry(gaugeName);
   }
 
+  public void removeTableMeter(final String tableName, final M meter) {
+    PinotMetricUtils.removeMetric(_metricsRegistry,
+        PinotMetricUtils.makePinotMetricName(_clazz, 
getTableFullMeterName(tableName, meter)));
+  }
+
   /**
    * Remove callback gauge.
    * @param metricName metric name
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 8de33d0e41..a3e6da0086 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -59,6 +59,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS("LLCSegmentDeepStoreUploadRetrySuccess",
 false),
   
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError",
 false),
   SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink", 
false),
+  DELETED_TMP_SEGMENT_COUNT("DeletedTmpSegmentCount", false),
   NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
 
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index e983fd80ff..e7f02b57df 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -156,6 +156,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected HelixManager _helixParticipantManager;
   protected PinotMetricsRegistry _metricsRegistry;
   protected ControllerMetrics _controllerMetrics;
+  protected ValidationMetrics _validationMetrics;
   protected SqlQueryExecutor _sqlQueryExecutor;
   // Can only be constructed after resource manager getting started
   protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
@@ -586,6 +587,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _controllerMetrics = new ControllerMetrics(_config.getMetricsPrefix(), 
_metricsRegistry);
     _controllerMetrics.initializeGlobalMeters();
     _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.VERSION, 
PinotVersion.VERSION_METRIC_NAME, 1);
+    _validationMetrics = new ValidationMetrics(_metricsRegistry);
   }
 
   private void initPinotFSFactory() {
@@ -705,7 +707,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
         new RealtimeSegmentValidationManager(_config, _helixResourceManager, 
_leadControllerManager,
-            _pinotLLCRealtimeSegmentManager, new 
ValidationMetrics(_metricsRegistry), _controllerMetrics);
+            _pinotLLCRealtimeSegmentManager, _validationMetrics, 
_controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
         new BrokerResourceValidationManager(_config, _helixResourceManager, 
_leadControllerManager, _controllerMetrics);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 73abaa14a4..22a838faf1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -216,9 +216,15 @@ public class ControllerConf extends PinotConfiguration {
         "controller.realtime.segment.deepStoreUploadRetryEnabled";
     public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS =
         "controller.realtime.segment.deepStoreUploadRetry.timeoutMs";
+    public static final String ENABLE_TMP_SEGMENT_ASYNC_DELETION =
+        "controller.realtime.segment.tmpFileAsyncDeletionEnabled";
+    // temporary segments within expiration won't be deleted so that ongoing 
split commit won't be impacted.
+    public static final String TMP_SEGMENT_RETENTION_IN_SECONDS =
+        "controller.realtime.segment.tmpFileRetentionInSeconds";
 
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+    public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 
60 * 60; // 1 Hour.
 
     private static final Random RANDOM = new Random();
 
@@ -924,10 +930,19 @@ public class ControllerConf extends PinotConfiguration {
     return 
getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
 false);
   }
 
+  public boolean isTmpSegmentAsyncDeletionEnabled() {
+    return 
getProperty(ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION, 
false);
+  }
+
   public int getDeepStoreRetryUploadTimeoutMs() {
     return 
getProperty(ControllerPeriodicTasksConf.DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS, -1);
   }
 
+  public int getTmpSegmentRetentionInSeconds() {
+    return 
getProperty(ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS,
+        
ControllerPeriodicTasksConf.DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND);
+  }
+
   public long getPinotTaskManagerInitialDelaySeconds() {
     return getPeriodicTaskInitialDelayInSeconds();
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index bf852f582e..836f58052d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -47,10 +47,10 @@ import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -378,7 +378,7 @@ public class LLCSegmentCompletionHandlers {
       String rawTableName = new LLCSegmentName(segmentName).getTableName();
       URI segmentFileURI = URIUtils
           
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), 
rawTableName,
-              
URIUtils.encode(SegmentCompletionUtils.generateSegmentFileName(segmentName)));
+              
URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)));
       
PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile,
 segmentFileURI);
       SegmentCompletionProtocol.Response.Params responseParams = new 
SegmentCompletionProtocol.Response.Params()
           
.withStreamPartitionMsgOffset(requestParams.getStreamPartitionMsgOffset())
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e55f0e6463..971dd4333d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -72,8 +72,8 @@ import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -165,6 +165,7 @@ public class PinotLLCRealtimeSegmentManager {
   private final Lock[] _idealStateUpdateLocks;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
   private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
+  private final boolean _isTmpSegmentAsyncDeletionEnabled;
   private final int _deepstoreUploadRetryTimeoutMs;
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -190,6 +191,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
     _isDeepStoreLLCSegmentUploadRetryEnabled = 
controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+    _isTmpSegmentAsyncDeletionEnabled = 
controllerConf.isTmpSegmentAsyncDeletionEnabled();
     _deepstoreUploadRetryTimeoutMs = 
controllerConf.getDeepStoreRetryUploadTimeoutMs();
     _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? 
initFileUploadDownloadClient() : null;
   }
@@ -198,6 +200,10 @@ public class PinotLLCRealtimeSegmentManager {
     return _isDeepStoreLLCSegmentUploadRetryEnabled;
   }
 
+  public boolean isTmpSegmentAsyncDeletionEnabled() {
+    return _isTmpSegmentAsyncDeletionEnabled;
+  }
+
   @VisibleForTesting
   FileUploadDownloadClient initFileUploadDownloadClient() {
     return new FileUploadDownloadClient();
@@ -460,19 +466,17 @@ public class PinotLLCRealtimeSegmentManager {
     PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
     String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, 
segmentLocation, pinotFS);
 
-    // Cleans up tmp segment files under table dir.
-    // We only clean up tmp segment files in table level dir, so there's no 
need to list recursively.
-    // See LLCSegmentCompletionHandlers.uploadSegment().
-    // TODO: move tmp file logic into SegmentCompletionUtils.
-    try {
-      for (String uri : pinotFS.listFiles(tableDirURI, false)) {
-        if 
(uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
-          LOGGER.warn("Deleting temporary segment file: {}", uri);
-          Preconditions.checkState(pinotFS.delete(new URI(uri), true), "Failed 
to delete file: %s", uri);
+    if (!isTmpSegmentAsyncDeletionEnabled()) {
+      try {
+        for (String uri : pinotFS.listFiles(tableDirURI, false)) {
+          if 
(uri.contains(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName))) {
+            LOGGER.warn("Deleting temporary segment file: {}", uri);
+            Preconditions.checkState(pinotFS.delete(new URI(uri), true), 
"Failed to delete file: %s", uri);
+          }
         }
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while deleting temporary segment files 
for segment: {}", segmentName, e);
       }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while deleting temporary segment files for 
segment: {}", segmentName, e);
     }
     committingSegmentDescriptor.setSegmentLocation(uriToMoveTo);
   }
@@ -1444,6 +1448,78 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  /**
+   * Delete tmp segments for realtime table with low level consumer, split 
commit and async deletion is enabled.
+   * @param tableNameWithType
+   * @param segmentsZKMetadata
+   * @return number of deleted orphan temporary segments
+   *
+   */
+  public long deleteTmpSegments(String tableNameWithType, 
List<SegmentZKMetadata> segmentsZKMetadata) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return 0L;
+    }
+
+    TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Failed to find table config for table: {}, skipping 
deletion of tmp segments", tableNameWithType);
+      return 0L;
+    }
+
+    if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+        || !getIsSplitCommitEnabled()
+        || !isTmpSegmentAsyncDeletionEnabled()) {
+      return 0L;
+    }
+
+    Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta -> 
meta.getStatus() == Status.DONE
+        && 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
+        SegmentZKMetadata::getDownloadUrl).collect(
+        Collectors.toSet());
+
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), 
rawTableName);
+    PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+    long deletedTmpSegments = 0;
+    try {
+      for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+        // prepend scheme
+        URI uri = URIUtils.getUri(filePath);
+        if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
+          LOGGER.info("Deleting temporary segment file: {}", uri);
+          if (pinotFS.delete(uri, true)) {
+            LOGGER.info("Succeed to delete file: {}", uri);
+            deletedTmpSegments++;
+          } else {
+            LOGGER.warn("Failed to delete file: {}", uri);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while deleting temporary files for table: 
{}", rawTableName, e);
+    }
+    return deletedTmpSegments;
+  }
+
+  private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS 
pinotFS) throws Exception {
+    long lastModified = pinotFS.lastModified(uri);
+    if (lastModified <= 0) {
+      LOGGER.warn("file {} modification time {} is not positive, ineligible 
for delete", uri.toString(), lastModified);
+      return false;
+    }
+    String uriString = uri.toString();
+    return SegmentCompletionUtils.isTmpFile(uriString) && 
!deepURIs.contains(uriString)
+        && getCurrentTimeMs() - lastModified > 
_controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
+  }
+
+  private boolean isLowLevelConsumer(String tableNameWithType, TableConfig 
tableConfig) {
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableNameWithType,
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    return streamConfig.hasLowLevelConsumerType();
+  }
+
   /**
    * Force commit the current segments in consuming state and restart 
consumption
    */
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index d0a7bb1a37..42ed189bbf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.controller.ControllerConf;
@@ -49,6 +50,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
   private final ValidationMetrics _validationMetrics;
+  private final ControllerMetrics _controllerMetrics;
 
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
@@ -64,6 +66,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
         leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
+    _controllerMetrics = controllerMetrics;
 
     _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
@@ -111,8 +114,19 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig 
streamConfig) {
     String realtimeTableName = tableConfig.getTableName();
+
     List<SegmentZKMetadata> segmentsZKMetadata = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
 
+    // Delete tmp segments
+    try {
+      long numDeleteTmpSegments = 
_llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, 
segmentsZKMetadata);
+      LOGGER.info("Deleted {} tmp segments for table: {}", 
numDeleteTmpSegments, realtimeTableName);
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
+          numDeleteTmpSegments);
+    } catch (Exception e) {
+      LOGGER.error("Failed to delete tmp segments for table: {}", 
realtimeTableName, e);
+    }
+
     // Update the total document count gauge
     _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, 
computeTotalDocumentCount(segmentsZKMetadata));
 
@@ -127,6 +141,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     for (String tableNameWithType : tableNamesWithType) {
       if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
         _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+        _controllerMetrics.removeTableMeter(tableNameWithType, 
ControllerMeter.DELETED_TMP_SEGMENT_COUNT);
       }
     }
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d367741080..9a53ad839e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,7 +58,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -85,6 +86,10 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
+import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
+import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
+import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -787,7 +792,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     PinotFSFactory.init(new PinotConfiguration());
     File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
     String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
-    String segmentFileName = 
SegmentCompletionUtils.generateSegmentFileName(segmentName);
+    String segmentFileName = 
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
     File segmentFile = new File(tableDir, segmentFileName);
     FileUtils.write(segmentFile, "temporary file contents");
 
@@ -808,9 +813,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
     File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
     String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
     String otherSegmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, 
CURRENT_TIME_MS).getSegmentName();
-    String segmentFileName = 
SegmentCompletionUtils.generateSegmentFileName(segmentName);
-    String extraSegmentFileName = 
SegmentCompletionUtils.generateSegmentFileName(segmentName);
-    String otherSegmentFileName = 
SegmentCompletionUtils.generateSegmentFileName(otherSegmentName);
+    String segmentFileName = 
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
+    String extraSegmentFileName = 
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
+    String otherSegmentFileName = 
SegmentCompletionUtils.generateTmpSegmentFileName(otherSegmentName);
     File segmentFile = new File(tableDir, segmentFileName);
     File extraSegmentFile = new File(tableDir, extraSegmentFileName);
     File otherSegmentFile = new File(tableDir, otherSegmentFileName);
@@ -957,7 +962,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // Change 1st segment status to be DONE, but with default peer download 
url.
     // Verify later the download url is fixed after upload success.
     segmentsZKMetadata.get(0).setStatus(Status.DONE);
-    
segmentsZKMetadata.get(0).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
     // set up the external view for 1st segment
     String instance0 = "instance0";
     externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), 
instance0, "ONLINE");
@@ -986,7 +991,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // Change 2nd segment status to be DONE, but with default peer download 
url.
     // Verify later the download url isn't fixed after upload failure.
     segmentsZKMetadata.get(1).setStatus(Status.DONE);
-    
segmentsZKMetadata.get(1).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+    segmentsZKMetadata.get(1).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
     // set up the external view for 2nd segment
     String instance1 = "instance1";
     externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), 
instance1, "ONLINE");
@@ -1010,7 +1015,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // Verify later the download url isn't fixed because no ONLINE replica 
found in any server.
     segmentsZKMetadata.get(2).setStatus(Status.DONE);
     segmentsZKMetadata.get(2).setDownloadUrl(
-        CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+        METADATA_URI_FOR_PEER_DOWNLOAD);
     // set up the external view for 3rd segment
     String instance2 = "instance2";
     externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), 
instance2, "OFFLINE");
@@ -1039,10 +1044,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     assertEquals(
         segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(1), null).getDownloadUrl(),
-        CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+        METADATA_URI_FOR_PEER_DOWNLOAD);
     assertEquals(
         segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(2), null).getDownloadUrl(),
-        CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+        METADATA_URI_FOR_PEER_DOWNLOAD);
     assertEquals(
         segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(3), null).getDownloadUrl(),
         defaultDownloadUrl);
@@ -1050,6 +1055,46 @@ public class PinotLLCRealtimeSegmentManagerTest {
         segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(4), null).getDownloadUrl());
   }
 
+  @Test
+  public void testDeleteTmpSegmentFiles() throws Exception {
+    // turn on knobs for async deletion of tmp files
+    ControllerConf config = new ControllerConf();
+    config.setDataDir(TEMP_DIR.toString());
+    config.setProperty(ENABLE_SPLIT_COMMIT, true);
+    config.setProperty(TMP_SEGMENT_RETENTION_IN_SECONDS, Integer.MIN_VALUE);
+    config.setProperty(ENABLE_TMP_SEGMENT_ASYNC_DELETION, true);
+
+    // simulate there's an orphan tmp file in localFS
+    PinotFSFactory.init(new PinotConfiguration());
+    File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
+    String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+    String segmentFileName = 
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
+    File segmentFile = new File(tableDir, segmentFileName);
+    FileUtils.write(segmentFile, "temporary file contents", 
Charset.defaultCharset());
+
+    SegmentZKMetadata segZKMeta = mock(SegmentZKMetadata.class);
+    PinotHelixResourceManager helixResourceManager = 
mock(PinotHelixResourceManager.class);
+    when(helixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
+        .thenReturn(new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build());
+    PinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager(
+        helixResourceManager, config);
+
+    long deletedTmpSegCount;
+    // case 1: the segmentMetadata download uri is identical to the uri of the 
tmp segment. Should not delete
+    when(segZKMeta.getStatus()).thenReturn(Status.DONE);
+    when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" + 
segmentFileName);
+    deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, 
Collections.singletonList(segZKMeta));
+    assertTrue(segmentFile.exists());
+    assertEquals(0L, deletedTmpSegCount);
+
+    // case 2: download url is empty, indicating the tmp segment is absolutely 
orphan. Delete the file
+    
when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD);
+    deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, 
Collections.singletonList(segZKMeta));
+    assertFalse(segmentFile.exists());
+    assertEquals(1L, deletedTmpSegCount);
+  }
+
   
//////////////////////////////////////////////////////////////////////////////////
   // Fake classes
   
/////////////////////////////////////////////////////////////////////////////////
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 4f173521c9..949fc7a0b7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import java.io.File;
 import java.net.URI;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -74,7 +73,7 @@ public class PinotFSSegmentUploader implements 
SegmentUploader {
     final String rawTableName = 
TableNameBuilder.extractRawTableName(segmentName.getTableName());
     Callable<URI> uploadTask = () -> {
       URI destUri = new URI(StringUtil.join(File.separator, 
_segmentStoreUriStr, segmentName.getTableName(),
-          segmentName.getSegmentName() + UUID.randomUUID().toString()));
+          
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName.getSegmentName())));
       long startTime = System.currentTimeMillis();
       try {
         PinotFS pinotFS = PinotFSFactory.create(new 
URI(_segmentStoreUriStr).getScheme());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
similarity index 69%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
index 9feb6cdbc6..61f270c3bf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.controller.util;
+package org.apache.pinot.core.data.manager.realtime;
 
 import java.util.UUID;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,11 +38,24 @@ public class SegmentCompletionUtils {
    * @param segmentName segment name
    * @return
    */
-  public static String getSegmentNamePrefix(String segmentName) {
+  public static String getTmpSegmentNamePrefix(String segmentName) {
     return segmentName + TMP;
   }
 
-  public static String generateSegmentFileName(String segmentNameStr) {
-    return getSegmentNamePrefix(segmentNameStr) + UUID.randomUUID().toString();
+  public static String generateTmpSegmentFileName(String segmentNameStr) {
+    return getTmpSegmentNamePrefix(segmentNameStr) + UUID.randomUUID();
+  }
+
+  public static boolean isTmpFile(String uri) {
+    String[] splits = StringUtils.splitByWholeSeparator(uri, TMP);
+    if (splits.length < 2) {
+      return false;
+    }
+    try {
+      UUID.fromString(splits[splits.length - 1]);
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java
similarity index 60%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java
index 06f9cd43c0..270f763dc3 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java
@@ -16,27 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.controller.api;
+package org.apache.pinot.core.data.manager.realtime;
 
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
-
 public class SegmentCompletionUtilsTest {
 
   @Test
   public void testGenerateSegmentFilePrefix() {
     String segmentName = "segment";
-    assertEquals(SegmentCompletionUtils.getSegmentNamePrefix(segmentName), 
"segment.tmp.");
+    assertEquals(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName), 
"segment.tmp.");
   }
 
   @Test
   public void testGenerateSegmentLocation() {
     String segmentName = "segment";
-    String segmentNamePrefix = 
SegmentCompletionUtils.getSegmentNamePrefix(segmentName);
-    
assertTrue(SegmentCompletionUtils.generateSegmentFileName(segmentName).startsWith(segmentNamePrefix));
+    String segmentNamePrefix = 
SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName);
+    
assertTrue(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName).startsWith(segmentNamePrefix));
+  }
+
+  @Test
+  public void testIsTmpFile() {
+    
assertTrue(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.550e8400-e29b-41d4-a716-446655440000"));
+    assertFalse(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp."));
+    
assertFalse(SegmentCompletionUtils.isTmpFile(".tmp.550e8400-e29b-41d4-a716-446655440000"));
+    assertFalse(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.55"));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to