This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 65f658d0dc Improve async temp segment delete in validation manager
(#14339)
65f658d0dc is described below
commit 65f658d0dc38b73d65a9b22a4f7a206808f4c18f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 30 20:46:54 2024 -0700
Improve async temp segment delete in validation manager (#14339)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 87 ++++++++++++----------
.../RealtimeSegmentValidationManager.java | 18 +++--
.../PinotLLCRealtimeSegmentManagerTest.java | 13 ++--
.../manager/realtime/SegmentCompletionUtils.java | 3 -
4 files changed, 64 insertions(+), 57 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7a459d7ddb..91e4bdab2e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.realtime;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URI;
import java.sql.Timestamp;
@@ -1592,67 +1593,71 @@ public class PinotLLCRealtimeSegmentManager {
/**
* Delete tmp segments for realtime table with low level consumer, split
commit and async deletion is enabled.
- * @param tableNameWithType
- * @param segmentsZKMetadata
* @return number of deleted orphan temporary segments
- *
*/
- public long deleteTmpSegments(String tableNameWithType,
List<SegmentZKMetadata> segmentsZKMetadata) {
+ public int deleteTmpSegments(String realtimeTableName,
List<SegmentZKMetadata> segmentsZKMetadata)
+ throws IOException {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
- if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- return 0L;
- }
-
- TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
deletion of tmp segments", tableNameWithType);
- return 0L;
- }
-
- if (!isTmpSegmentAsyncDeletionEnabled()) {
- return 0L;
+ // NOTE: Do not delete the file if it is used as download URL. This could
happen when user uses temporary file to
+ // backfill segment.
+ Set<String> downloadUrls =
Sets.newHashSetWithExpectedSize(segmentsZKMetadata.size());
+ for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+ if (segmentZKMetadata.getStatus() == Status.DONE) {
+ downloadUrls.add(segmentZKMetadata.getDownloadUrl());
+ }
}
- Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta ->
meta.getStatus() == Status.DONE
- &&
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
- SegmentZKMetadata::getDownloadUrl).collect(
- Collectors.toSet());
-
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(),
rawTableName);
PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
- long deletedTmpSegments = 0;
- try {
- for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
- // prepend scheme
+ int numDeletedTmpSegments = 0;
+ for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+ if (isTmpAndCanDelete(filePath, downloadUrls, pinotFS)) {
URI uri = URIUtils.getUri(filePath);
- if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
- LOGGER.info("Deleting temporary segment file: {}", uri);
+ String canonicalPath = uri.toString();
+ LOGGER.info("Deleting temporary segment file: {}", canonicalPath);
+ try {
if (pinotFS.delete(uri, true)) {
- LOGGER.info("Succeed to delete file: {}", uri);
- deletedTmpSegments++;
+ LOGGER.info("Deleted temporary segment file: {}", canonicalPath);
+ numDeletedTmpSegments++;
} else {
- LOGGER.warn("Failed to delete file: {}", uri);
+ LOGGER.warn("Failed to delete temporary segment file: {}",
canonicalPath);
}
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deleting temporary segment
file: {}", canonicalPath, e);
}
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while deleting temporary files for table:
{}", rawTableName, e);
}
- return deletedTmpSegments;
+ return numDeletedTmpSegments;
}
- private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS
pinotFS)
- throws Exception {
- long lastModified = pinotFS.lastModified(uri);
+ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls,
PinotFS pinotFS) {
+ if (!SegmentCompletionUtils.isTmpFile(filePath)) {
+ return false;
+ }
+ // Prepend scheme
+ URI uri = URIUtils.getUri(filePath);
+ String canonicalPath = uri.toString();
+ // NOTE: Do not delete the file if it is used as download URL. This could
happen when user uses temporary file to
+ // backfill segment.
+ if (downloadUrls.contains(canonicalPath)) {
+ return false;
+ }
+ long lastModified;
+ try {
+ lastModified = pinotFS.lastModified(uri);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting last modified time for
file: {}, ineligible for delete",
+ canonicalPath, e);
+ return false;
+ }
if (lastModified <= 0) {
- LOGGER.warn("file {} modification time {} is not positive, ineligible
for delete", uri.toString(), lastModified);
+ LOGGER.warn("Last modified time for file: {} is not positive: {},
ineligible for delete", canonicalPath,
+ lastModified);
return false;
}
- String uriString = uri.toString();
- return SegmentCompletionUtils.isTmpFile(uriString) &&
!deepURIs.contains(uriString)
- && getCurrentTimeMs() - lastModified >
_controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
+ return getCurrentTimeMs() - lastModified >
_controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index b8460a406a..88f1bc6ee6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -153,13 +153,17 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
List<SegmentZKMetadata> segmentsZKMetadata =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
// Delete tmp segments
- try {
- long numDeleteTmpSegments =
_llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName,
segmentsZKMetadata);
- LOGGER.info("Deleted {} tmp segments for table: {}",
numDeleteTmpSegments, realtimeTableName);
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
- numDeleteTmpSegments);
- } catch (Exception e) {
- LOGGER.error("Failed to delete tmp segments for table: {}",
realtimeTableName, e);
+ if (_llcRealtimeSegmentManager.isTmpSegmentAsyncDeletionEnabled()) {
+ try {
+ long startTimeMs = System.currentTimeMillis();
+ int numDeletedTmpSegments =
_llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName,
segmentsZKMetadata);
+ LOGGER.info("Deleted {} tmp segments for table: {} in {}ms",
numDeletedTmpSegments, realtimeTableName,
+ System.currentTimeMillis() - startTimeMs);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
+ numDeletedTmpSegments);
+ } catch (Exception e) {
+ LOGGER.error("Failed to delete tmp segments for table: {}",
realtimeTableName, e);
+ }
}
// Update the total document count gauge
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 435303f5e9..bed25bb16c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1046,7 +1046,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Test
- public void testDeleteTmpSegmentFiles() throws Exception {
+ public void testDeleteTmpSegmentFiles()
+ throws Exception {
// turn on knobs for async deletion of tmp files
ControllerConf config = new ControllerConf();
config.setDataDir(TEMP_DIR.toString());
@@ -1069,19 +1070,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
PinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager(
helixResourceManager, config);
- long deletedTmpSegCount;
+ int numDeletedTmpSegments;
// case 1: the segmentMetadata download uri is identical to the uri of the
tmp segment. Should not delete
when(segZKMeta.getStatus()).thenReturn(Status.DONE);
when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" +
segmentFileName);
- deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME,
Collections.singletonList(segZKMeta));
+ numDeletedTmpSegments =
segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME,
Collections.singletonList(segZKMeta));
assertTrue(segmentFile.exists());
- assertEquals(0L, deletedTmpSegCount);
+ assertEquals(numDeletedTmpSegments, 0);
// case 2: download url is empty, indicating the tmp segment is absolutely
orphan. Delete the file
when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD);
- deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME,
Collections.singletonList(segZKMeta));
+ numDeletedTmpSegments =
segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME,
Collections.singletonList(segZKMeta));
assertFalse(segmentFile.exists());
- assertEquals(1L, deletedTmpSegCount);
+ assertEquals(numDeletedTmpSegments, 1);
}
//////////////////////////////////////////////////////////////////////////////////
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
index c7198eded7..b47bf147fc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
@@ -20,15 +20,12 @@ package org.apache.pinot.core.data.manager.realtime;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SegmentCompletionUtils {
private SegmentCompletionUtils() {
}
- private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentCompletionUtils.class);
// Used to create temporary segment file names
private static final String TMP = ".tmp.";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]