This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d530695c57 Use ideal state as source of truth for segment existence (#9735) d530695c57 is described below commit d530695c5759ae042896d40c989bed1b4ea872ec Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Nov 4 21:51:01 2022 -0700 Use ideal state as source of truth for segment existence (#9735) --- .../api/resources/PinotSegmentRestletResource.java | 4 +- .../helix/core/PinotHelixResourceManager.java | 56 ++++++++++------------ .../PinotHelixResourceManagerStatelessTest.java | 3 +- 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index ea77b9caec..c467c3f816 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -829,8 +829,8 @@ public class PinotSegmentRestletResource { } String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false), - retentionPeriod); + deleteSegmentsInternal(tableNameWithType, + _pinotHelixResourceManager.getSegmentsFromPropertyStore(tableNameWithType), retentionPeriod); return new SuccessResponse("All segments of table " + tableNameWithType + " deleted"); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 3e7ed24d5d..01b778a5f4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -730,22 +730,29 @@ public class PinotHelixResourceManager { */ /** - * Returns the segments for the given table. + * Returns the segments for the given table from the ideal state. * * @param tableNameWithType Table name with type suffix * @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments. * @return List of segment names */ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) { - List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType); - if (shouldExcludeReplacedSegments) { - return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore); - } - return segmentsFromPropertiesStore; + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType); + List<String> segments = new ArrayList<>(idealState.getPartitionSet()); + return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, segments) : segments; } /** - * Returns the segments for the given table based on the start and end timestamp. + * Returns the segments for the given table from the property store. This API is useful to track the orphan segments + * that are removed from the ideal state but not the property store. + */ + public List<String> getSegmentsFromPropertyStore(String tableNameWithType) { + return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType); + } + + /** + * Returns the segments for the given table based on the start and end timestamp from the ideal state. * * @param tableNameWithType Table name with type suffix * @param startTimestamp start timestamp in milliseconds (inclusive) @@ -754,21 +761,24 @@ public class PinotHelixResourceManager { */ public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp, long endTimestamp, boolean excludeOverlapping) { - List<String> selectedSegments; + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType); + Set<String> segments = idealState.getPartitionSet(); // If no start and end timestamp specified, just select all the segments. if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) { - selectedSegments = getSegmentsFor(tableNameWithType, false); + return excludeReplacedSegments(tableNameWithType, new ArrayList<>(segments)); } else { - selectedSegments = new ArrayList<>(); + List<String> selectedSegments = new ArrayList<>(); List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType); for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { String segmentName = segmentZKMetadata.getSegmentName(); - if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) { + if (segments.contains(segmentName) && isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, + excludeOverlapping)) { selectedSegments.add(segmentName); } } + return excludeReplacedSegments(tableNameWithType, selectedSegments); } - return excludeReplacedSegments(tableNameWithType, selectedSegments); } /** @@ -1890,7 +1900,7 @@ public class PinotHelixResourceManager { // Remove all stored segments for the table Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null; - _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName, false), + _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName), retentionPeriodMs); LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName); @@ -1947,7 +1957,7 @@ public class PinotHelixResourceManager { // Remove all stored segments for the table Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null; - _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false), + _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName), retentionPeriodMs); LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName); @@ -3334,7 +3344,6 @@ public class PinotHelixResourceManager { if (!segmentsToCleanUp.isEmpty()) { LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp); deleteSegments(tableNameWithType, segmentsToCleanUp); - waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp, SEGMENT_CLEANUP_TIMEOUT_MS); } return true; } else { @@ -3355,23 +3364,6 @@ public class PinotHelixResourceManager { return segmentLineageEntryId; } - private void waitForSegmentsToDelete(String tableNameWithType, List<String> segments, long timeOutInMillis) - throws InterruptedException { - LOGGER.info("Waiting for {} segments to delete for table: {}. timeout = {}ms, segments = {}", segments.size(), - tableNameWithType, timeOutInMillis, segments); - long endTimeMs = System.currentTimeMillis() + timeOutInMillis; - do { - if (Collections.disjoint(getSegmentsFor(tableNameWithType, false), segments)) { - return; - } else { - Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS); - } - } while (System.currentTimeMillis() < endTimeMs); - throw new RuntimeException( - "Timeout while waiting for segments to be deleted for table: " + tableNameWithType + ", timeout: " - + timeOutInMillis + "ms"); - } - /** * Computes the end segment replace phase * diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index 5e9def8a56..6569a6a4be 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -1053,8 +1053,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly // deleted (s9, s10, s11). _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false); - TestUtils.waitForCondition(aVoid -> _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size() == 3, - 60_000L, "Failed to delete the segments"); + assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3); assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5"); // Re-upload (s9, s10, s11) to test the segment clean up from startReplaceSegments --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org