This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 68260f9870 Add support for orphan segment cleanup (#15142) 68260f9870 is described below commit 68260f98705e15a4005bdcb8ea4d454736aec652 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Tue Mar 18 22:42:50 2025 +0530 Add support for orphan segment cleanup (#15142) * Add support for orphan segment cleanup * Add unit tests to the code * Minor refactoring * Fixing checkstyle violations * Add support for realtime tables * Fix test failures * Adding metrics, improving logs for better debuggability * Reducing code repetition * Add support for providing a batch size for the number of untracked segments to be deleted in a single run of RetentionManager * Add test cases to test batch sizes * Provide additional comments for test class * Fixing linting issues * Adding a controller config to enable/disable deletion of untracked segments from deepstore during retention manager run --- .../pinot/common/metrics/ControllerGauge.java | 6 +- .../apache/pinot/controller/ControllerConf.java | 13 + .../helix/core/SegmentDeletionManager.java | 4 +- .../helix/core/retention/RetentionManager.java | 162 +++++++++- .../core/retention/strategy/RetentionStrategy.java | 13 +- .../retention/strategy/TimeRetentionStrategy.java | 14 +- .../helix/core/retention/RetentionManagerTest.java | 348 ++++++++++++++++++++- .../SegmentsValidationAndRetentionConfig.java | 10 + 8 files changed, 538 insertions(+), 32 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 40e2d09b4f..4777573604 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -190,7 +190,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { 
REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", true), // Resource utilization is within limits or not for a table - RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", false); + RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", false), + + // The number of segments in deepstore that do not have corresponding metadata in ZooKeeper. + // These segments are untracked and should be considered for deletion based on retention policies. + UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false); private final String _gaugeName; private final String _unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index d3396cd3a7..384371e2e8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -241,6 +241,10 @@ public class ControllerConf extends PinotConfiguration { public static final String TMP_SEGMENT_RETENTION_IN_SECONDS = "controller.realtime.segment.tmpFileRetentionInSeconds"; + // Enables the deletion of untracked segments during the retention manager run. + // Untracked segments are those that exist in deep store but have no corresponding entry in the ZK property store. + public static final String ENABLE_UNTRACKED_SEGMENT_DELETION = + "controller.retentionManager.untrackedSegmentDeletionEnabled"; public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120; public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300; public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour. 
@@ -1081,6 +1085,15 @@ public class ControllerConf extends PinotConfiguration { ControllerPeriodicTasksConf.DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND); } + public boolean getUntrackedSegmentDeletionEnabled() { + return getProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, false); + } + + public void setUntrackedSegmentDeletionEnabled(boolean untrackedSegmentDeletionEnabled) { + setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, untrackedSegmentDeletionEnabled); + } + + public long getPinotTaskManagerInitialDelaySeconds() { return getPeriodicTaskInitialDelayInSeconds(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index e5ab692fc9..c001042f9a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -222,7 +222,9 @@ public class SegmentDeletionManager { URI segmentMetadataUri = SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(), segmentId); if (pinotFS.exists(segmentMetadataUri)) { LOGGER.info("Deleting segment metadata {} from {}", segmentId, segmentMetadataUri); - pinotFS.delete(segmentMetadataUri, true); + if (!pinotFS.delete(segmentMetadataUri, true)) { + LOGGER.warn("Could not delete segment metadata: {} from: {}", segmentId, segmentMetadataUri); + } } } catch (IOException e) { LOGGER.warn("Could not delete segment metadata {} from {}", segmentId, segmentFileUri, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index 9bd8b9f62f..5b365d7768 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -18,18 +18,26 @@ */ package org.apache.pinot.controller.helix.core.retention; +import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.helix.model.IdealState; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.logging.log4j.util.Strings; import org.apache.pinot.common.lineage.SegmentLineage; import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -39,6 +47,9 @@ import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionSt import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.filesystem.FileMetadata; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -55,7 +66,9 @@ import org.slf4j.LoggerFactory; */ public 
class RetentionManager extends ControllerPeriodicTask<Void> { public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(5L); + public static final int DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE = 100; private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L); + private final boolean _untrackedSegmentDeletionEnabled; private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class); @@ -64,7 +77,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); - + _untrackedSegmentDeletionEnabled = config.getUntrackedSegmentDeletionEnabled(); LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", getIntervalInSeconds()); } @@ -105,6 +118,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { } String retentionTimeUnit = validationConfig.getRetentionTimeUnit(); String retentionTimeValue = validationConfig.getRetentionTimeValue(); + int untrackedSegmentsDeletionBatchSize = + validationConfig.getUntrackedSegmentsDeletionBatchSize() != null ? 
Integer.parseInt( + validationConfig.getUntrackedSegmentsDeletionBatchSize()) : DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE; + RetentionStrategy retentionStrategy; try { retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()), @@ -117,15 +134,23 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { // Scan all segment ZK metadata and purge segments if necessary if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { - manageRetentionForOfflineTable(tableNameWithType, retentionStrategy); + manageRetentionForOfflineTable(tableNameWithType, retentionStrategy, untrackedSegmentsDeletionBatchSize); } else { - manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy); + manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy, untrackedSegmentsDeletionBatchSize); } } - private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy) { - List<String> segmentsToDelete = new ArrayList<>(); - for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy, + int untrackedSegmentsDeletionBatchSize) { + List<SegmentZKMetadata> segmentZKMetadataList = _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName); + + // fetch those segments that are beyond the retention period and don't have an entry in ZK i.e. 
+ // SegmentZkMetadata is missing for those segments + List<String> segmentsToDelete = + getSegmentsToDeleteFromDeepstore(offlineTableName, retentionStrategy, segmentZKMetadataList, + untrackedSegmentsDeletionBatchSize); + + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { if (retentionStrategy.isPurgeable(offlineTableName, segmentZKMetadata)) { segmentsToDelete.add(segmentZKMetadata.getSegmentName()); } @@ -136,11 +161,20 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { } } - private void manageRetentionForRealtimeTable(String realtimeTableName, RetentionStrategy retentionStrategy) { - List<String> segmentsToDelete = new ArrayList<>(); + private void manageRetentionForRealtimeTable(String realtimeTableName, RetentionStrategy retentionStrategy, + int untrackedSegmentsDeletionBatchSize) { + List<SegmentZKMetadata> segmentZKMetadataList = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); + + // fetch those segments that are beyond the retention period and don't have an entry in ZK i.e. + // SegmentZkMetadata is missing for those segments + List<String> segmentsToDelete = + getSegmentsToDeleteFromDeepstore(realtimeTableName, retentionStrategy, segmentZKMetadataList, + untrackedSegmentsDeletionBatchSize); + IdealState idealState = _pinotHelixResourceManager.getHelixAdmin() .getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(), realtimeTableName); - for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) { + + for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { String segmentName = segmentZKMetadata.getSegmentName(); if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) { // Delete old LLC segment that hangs around. 
Do not delete segment that are current since there may be a race @@ -189,6 +223,116 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { } } + private List<String> getSegmentsToDeleteFromDeepstore(String tableNameWithType, RetentionStrategy retentionStrategy, + List<SegmentZKMetadata> segmentZKMetadataList, int untrackedSegmentsDeletionBatchSize) { + List<String> segmentsToDelete = new ArrayList<>(); + + if (!_untrackedSegmentDeletionEnabled) { + LOGGER.info( + "Not scanning deep store for untracked segments for table: {}", tableNameWithType); + return segmentsToDelete; + } + + if (untrackedSegmentsDeletionBatchSize <= 0) { + // return an empty list in case untracked segment deletion batch size is configured <= 0 in table config + LOGGER.info( + "Not scanning deep store for untracked segments for table: {} as untrackedSegmentsDeletionBatchSize is set " + + "to: {}", + tableNameWithType, untrackedSegmentsDeletionBatchSize); + return segmentsToDelete; + } + + List<String> segmentsPresentInZK = + segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList()); + try { + LOGGER.info("Fetch segments present in deep store that are beyond retention period for table: {}", + tableNameWithType); + segmentsToDelete = + findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType, retentionStrategy, segmentsPresentInZK); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.UNTRACKED_SEGMENTS_COUNT, + segmentsToDelete.size()); + + if (segmentsToDelete.size() > untrackedSegmentsDeletionBatchSize) { + LOGGER.info("Truncating segments to delete from {} to {} for table: {}", + segmentsToDelete.size(), untrackedSegmentsDeletionBatchSize, tableNameWithType); + segmentsToDelete = segmentsToDelete.subList(0, untrackedSegmentsDeletionBatchSize); + } + } catch (IOException e) { + LOGGER.warn("Unable to fetch segments from deep store that are beyond retention period for table: {}", + tableNameWithType); + } + + 
return segmentsToDelete; + } + + + /** + * Identifies segments in deepstore that are ready for deletion based on the retention strategy. + * + * This method finds segments that are beyond the retention period and are ready to be purged. + * It only considers segments that do not have entries in ZooKeeper metadata i.e. untracked segments. + * The lastModified time of the file in deepstore is used to determine whether the segment + * should be retained or purged. + * + * @param tableNameWithType Table name with type suffix (offline or realtime) + * @param retentionStrategy Strategy to determine if a segment should be purged + * @param segmentsToExclude List of segment names that should be excluded from deletion + * @return List of segment names that should be deleted from deepstore + * @throws IOException If there's an error accessing the filesystem + */ + private List<String> findUntrackedSegmentsToDeleteFromDeepstore(String tableNameWithType, + RetentionStrategy retentionStrategy, List<String> segmentsToExclude) + throws IOException { + + List<String> segmentsToDelete = new ArrayList<>(); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI tableDataUri = URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName); + PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme()); + + long startTimeMs = System.currentTimeMillis(); + + List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false); + long listEndTimeMs = System.currentTimeMillis(); + LOGGER.info("Found: {} segments in deepstore for table: {}. 
Time taken to list segments: {} ms", + deepstoreFiles.size(), tableNameWithType, listEndTimeMs - startTimeMs); + + for (FileMetadata fileMetadata : deepstoreFiles) { + if (fileMetadata.isDirectory()) { + continue; + } + + String segmentName = extractSegmentName(fileMetadata.getFilePath()); + if (Strings.isEmpty(segmentName) || segmentsToExclude.contains(segmentName)) { + continue; + } + + // determine whether the segment should be purged or not based on the last modified time of the file + long lastModifiedTime = fileMetadata.getLastModifiedTime(); + + if (retentionStrategy.isPurgeable(tableNameWithType, segmentName, lastModifiedTime)) { + segmentsToDelete.add(segmentName); + } + } + long endTimeMs = System.currentTimeMillis(); + LOGGER.info( + "Took: {} ms to identify {} segments for deletion from deep store for table: {} as they have no corresponding" + + " entry in the property store.", + endTimeMs - startTimeMs, segmentsToDelete.size(), tableNameWithType); + return segmentsToDelete; + } + + @Nullable + private String extractSegmentName(@Nullable String filePath) { + if (Strings.isEmpty(filePath)) { + return null; + } + String segmentName = filePath.substring(filePath.lastIndexOf("/") + 1); + if (segmentName.endsWith(TarCompressionUtils.TAR_GZ_FILE_EXTENSION)) { + segmentName = segmentName.substring(0, segmentName.length() - TarCompressionUtils.TAR_GZ_FILE_EXTENSION.length()); + } + return segmentName; + } + private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) { String tableNameWithType = tableConfig.getTableName(); List<String> segmentsToDelete = new ArrayList<>(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java index e8f6336961..8e31cce37c 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java @@ -34,4 +34,15 @@ public interface RetentionStrategy { * @return Whether the segment should be purged */ boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata); -} + + /** + * Determines whether a segment is eligible for purging + * + * @param tableNameWithType The table name, including its type. + * @param segmentName The name of the segment to evaluate. + * @param segmentTimeMs The segment's timestamp in milliseconds, which could be the end time from ZK metadata or + * the modification time (mTime) for the file in deep store etc. + * @return {@code true} if the segment should be purged; {@code false} otherwise. + */ + boolean isPurgeable(String tableNameWithType, String segmentName, long segmentTimeMs); + } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java index b98fe5b534..cda94dbb5d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java @@ -39,15 +39,19 @@ public class TimeRetentionStrategy implements RetentionStrategy { @Override public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) { - long endTimeMs = segmentZKMetadata.getEndTimeMs(); + return isPurgeable(tableNameWithType, segmentZKMetadata.getSegmentName(), segmentZKMetadata.getEndTimeMs()); + } + + @Override + public boolean isPurgeable(String tableNameWithType, String segmentName, long segmentTimeMs) { // Check that the end 
time is between 1971 and 2071 - if (!TimeUtils.timeValueInValidRange(endTimeMs)) { - LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}", segmentZKMetadata.getSegmentName(), - tableNameWithType, endTimeMs); + if (!TimeUtils.timeValueInValidRange(segmentTimeMs)) { + LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}", segmentName, + tableNameWithType, segmentTimeMs); return false; } - return System.currentTimeMillis() - endTimeMs > _retentionMs; + return System.currentTimeMillis() - segmentTimeMs > _retentionMs; } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index b3e656de9e..f4a71c55f1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -18,10 +18,16 @@ */ package org.apache.pinot.controller.helix.core.retention; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -42,12 +48,16 @@ import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static 
org.apache.pinot.controller.helix.core.retention.RetentionManager.DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -57,7 +67,31 @@ public class RetentionManagerTest { private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME); private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME); - private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long dayAfterTomorrowTimeStamp) { + // Variables for real file test + private Path _tempDir; + private File _tableDir; + + @BeforeMethod + public void setUp() throws Exception { + // Setup for real file test + _tempDir = Files.createTempDirectory("pinot-retention-test"); + _tableDir = new File(_tempDir.toFile(), TEST_TABLE_NAME); + _tableDir.mkdirs(); + + final long pastMillisSinceEpoch = 1343001600000L; + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + } + + @AfterMethod + public void tearDown() throws Exception { + // Clean up the temporary directory after each test + if (_tempDir != null) { + FileUtils.deleteDirectory(_tempDir.toFile()); + } + } + + private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long dayAfterTomorrowTimeStamp, + String untrackedSegmentsDeletionBatchSize, int untrackedSegmentsInDeepstoreSize) { List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); // Create metadata for 10 segments really old, that will be removed by the retention manager. 
final int numOlderSegments = 10; @@ -73,19 +107,77 @@ public class RetentionManagerTest { mockSegmentZKMetadata(dayAfterTomorrowTimeStamp, dayAfterTomorrowTimeStamp, timeUnit); segmentsZKMetadata.add(segmentZKMetadata); } + + // Create actual segment files with specific modification times + // 1. A file that should be kept (in ZK metadata) + File segment1File = new File(_tableDir, segmentsZKMetadata.get(0).getSegmentName()); + createFileWithContent(segment1File, "segment1 data"); + setFileModificationTime(segment1File, timeUnit.toMillis(pastTimeStamp)); + + // 2. A file that should be kept (in ZK metadata) + File segment2File = new File(_tableDir, segmentsZKMetadata.get(10).getSegmentName()); + createFileWithContent(segment2File, "segment2 data"); + setFileModificationTime(segment2File, timeUnit.toMillis(pastTimeStamp)); + + // 3. A file that should not be deleted (not in ZK metadata but recent) + File segment3File = new File(_tableDir, "segment3.tar.gz"); + createFileWithContent(segment3File, "segment3 data"); + setFileModificationTime(segment3File, timeUnit.toMillis(dayAfterTomorrowTimeStamp)); + + int deletionBatchSize = untrackedSegmentsDeletionBatchSize == null ? 
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE + : Integer.parseInt(untrackedSegmentsDeletionBatchSize); + + // Create additional untracked segment files to test batch size limit + if (untrackedSegmentsInDeepstoreSize > 0) { + // Create more untracked segments + for (int i = 0; i < untrackedSegmentsInDeepstoreSize; i++) { + String segmentName = "extraSegment" + i; + File segmentFile = new File(_tableDir, segmentName); + createFileWithContent(segmentFile, "extra segment " + i + " data"); + setFileModificationTime(segmentFile, timeUnit.toMillis(pastTimeStamp)); + if (i < deletionBatchSize) { + // Add segments to the removed list till we reach untrackedSegmentsDeletionBatchSize + removedSegments.add(segmentName); + } + } + } + final TableConfig tableConfig = createOfflineTableConfig(); + // Set untrackedSegmentsDeletionBatchSize if not null + if (untrackedSegmentsDeletionBatchSize != null) { + tableConfig.getValidationConfig().setUntrackedSegmentsDeletionBatchSize(untrackedSegmentsDeletionBatchSize); + } + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); - setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager); + + // Use appropriate setup based on test case + // In case of untrackedSegmentsDeletionBatchSize < untrackedSegmentsInDeepstoreSize, we cannot guarantee which + // files/ segments will be picked for deletion as there is not ordering/ sorting done before selecting + // untrackedSegmentsDeletionBatchSize out of untrackedSegmentsInDeepstoreSize to delete. + // For the case untrackedSegmentsDeletionBatchSize < untrackedSegmentsInDeepstoreSize we just check the size of the + // segments that will get deleted. 
+ // If untrackedSegmentsDeletionBatchSize is not set, the default batch size of 100 applies, so all untracked segments are deleted only when there are at most 100 of them + if (deletionBatchSize >= untrackedSegmentsInDeepstoreSize) { + // Use original setup for the case when all the segments will be included + setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager); + } else { + // Use batch size specific setup + setupPinotHelixResourceManagerForBatchSize(tableConfig, numOlderSegments, + deletionBatchSize, segmentsZKMetadata, + pinotHelixResourceManager, leadControllerManager); + } when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata); + when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString()); ControllerConf conf = new ControllerConf(); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); conf.setRetentionControllerFrequencyInSeconds(0); conf.setDeletedSegmentsRetentionInDays(0); + conf.setUntrackedSegmentDeletionEnabled(true); RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics); retentionManager.start(); @@ -93,52 +185,212 @@ public class RetentionManagerTest { SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager(); - // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called. + // Verify that the removeAgedDeletedSegments() method in deletion manager is called verify(deletionManager, times(1)).removeAgedDeletedSegments(leadControllerManager); - // Verify that the deleteSegments method is actually called. 
- verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList()); + // Verify deleteSegments is called + verify(pinotHelixResourceManager, times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList()); } @Test - public void testRetentionWithMinutes() { + public void testRetentionWithMinutesNoBatchSizeAndSegmentsInDeepStore() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60; final long pastMinutesSinceEpoch = 22383360L; - testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, minutesSinceEpochTimeStamp); + testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, minutesSinceEpochTimeStamp, null, 4); + } + + @Test + public void testRetentionWithMinutesNoBatchSizeAndMoreSegmentsInDeepStore() { + // For this test the default batch size will get picked + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60; + final long pastMinutesSinceEpoch = 22383360L; + testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, minutesSinceEpochTimeStamp, null, 105); + } + + + @Test + public void testRetentionWithMinutesWithBatchSizeAndLessSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60; + final long pastMinutesSinceEpoch = 22383360L; + testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, minutesSinceEpochTimeStamp, "5", 3); + } + + @Test + public void testRetentionWithMinutesWithBatchSizeAndMoreSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60; + final long 
pastMinutesSinceEpoch = 22383360L; + testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, minutesSinceEpochTimeStamp, "5", 10); + } + + + @Test + public void testRetentionWithSecondsNoBatchSizeAndSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60; + final long pastSecondsSinceEpoch = 1343001600L; + testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, secondsSinceEpochTimeStamp, null, 4); + } + + @Test + public void testRetentionWithSecondsWithBatchSizeAndLessSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60; + final long pastSecondsSinceEpoch = 1343001600L; + testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, secondsSinceEpochTimeStamp, "5", 3); } @Test - public void testRetentionWithSeconds() { + public void testRetentionWithSecondsWithBatchSizeAndMoreSegmentsInDeepStore() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60; final long pastSecondsSinceEpoch = 1343001600L; - testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, secondsSinceEpochTimeStamp); + testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, secondsSinceEpochTimeStamp, "5", 10); } @Test - public void testRetentionWithMillis() { + public void testRetentionWithMillisNoBatchSizeAndSegmentsInDeepStore() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60 * 1000; final long pastMillisSinceEpoch = 1343001600000L; - testDifferentTimeUnits(pastMillisSinceEpoch, 
TimeUnit.MILLISECONDS, millisSinceEpochTimeStamp); + testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, millisSinceEpochTimeStamp, null, 4); } @Test - public void testRetentionWithHours() { + public void testRetentionWithMillisWithBatchSizeAndLessSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60 * 1000; + final long pastMillisSinceEpoch = 1343001600000L; + testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, millisSinceEpochTimeStamp, "5", 3); + } + + @Test + public void testRetentionWithMillisWithBatchSizeAndMoreSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60 * 1000; + final long pastMillisSinceEpoch = 1343001600000L; + testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, millisSinceEpochTimeStamp, "5", 10); + } + + @Test + public void testRetentionWithHoursNoBatchSizeAndSegmentsInDeepStore() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24; final long pastHoursSinceEpoch = 373056L; - testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, hoursSinceEpochTimeStamp); + testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, hoursSinceEpochTimeStamp, null, 4); } @Test - public void testRetentionWithDays() { + public void testRetentionWithHoursWithBatchSizeAndLessSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24; + final long pastHoursSinceEpoch = 373056L; + testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, 
hoursSinceEpochTimeStamp, "5", 3); + } + + @Test + public void testRetentionWithHoursWithBatchSizeAndMoreSegmentsInDeepStore() { + final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24; + final long pastHoursSinceEpoch = 373056L; + testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, hoursSinceEpochTimeStamp, "5", 10); + } + + + @Test + public void testRetentionWithDaysNoBatchSizeAndSegmentsInDeepStore() { + final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long pastDaysSinceEpoch = 15544L; + testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, daysSinceEpochTimeStamp, null, 4); + } + + @Test + public void testRetentionWithDaysWithBatchSizeAndLessSegmentsInDeepStore() { final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long pastDaysSinceEpoch = 15544L; - testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, daysSinceEpochTimeStamp); + testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, daysSinceEpochTimeStamp, "5", 3); } + @Test + public void testRetentionWithDaysWithBatchSizeAndMoreSegmentsInDeepStore() { + final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + final long pastDaysSinceEpoch = 15544L; + testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, daysSinceEpochTimeStamp, "5", 10); + } + + @Test + public void testOffByDefaultForUntrackedSegmentsDeletion() { + long pastTimeStamp = 15544L; + TimeUnit timeUnit = TimeUnit.DAYS; + long dayAfterTomorrowTimeStamp = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; + + List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); + // Create metadata for 10 segments really old, that will be removed by the retention manager. 
+ final int numOlderSegments = 10; + List<String> removedSegments = new ArrayList<>(); + for (int i = 0; i < numOlderSegments; i++) { + SegmentZKMetadata segmentZKMetadata = mockSegmentZKMetadata(pastTimeStamp, pastTimeStamp, timeUnit); + segmentsZKMetadata.add(segmentZKMetadata); + removedSegments.add(segmentZKMetadata.getSegmentName()); + } + // Create metadata for 5 segments that will not be removed. + for (int i = 0; i < 5; i++) { + SegmentZKMetadata segmentZKMetadata = + mockSegmentZKMetadata(dayAfterTomorrowTimeStamp, dayAfterTomorrowTimeStamp, timeUnit); + segmentsZKMetadata.add(segmentZKMetadata); + } + + // Create actual segment files with specific modification times + // 1. A file that should be kept (in ZK metadata) + File segment1File = new File(_tableDir, segmentsZKMetadata.get(0).getSegmentName()); + createFileWithContent(segment1File, "segment1 data"); + setFileModificationTime(segment1File, timeUnit.toMillis(pastTimeStamp)); + + // 2. A file that should be kept (in ZK metadata) + File segment2File = new File(_tableDir, segmentsZKMetadata.get(10).getSegmentName()); + createFileWithContent(segment2File, "segment2 data"); + setFileModificationTime(segment2File, timeUnit.toMillis(pastTimeStamp)); + + // 3. 
A file that should not be deleted as the deletion of untracked segments is off by default + File segment3File = new File(_tableDir, "segment3.tar.gz"); + createFileWithContent(segment3File, "segment3 data"); + setFileModificationTime(segment3File, timeUnit.toMillis(pastTimeStamp)); + + final TableConfig tableConfig = createOfflineTableConfig(); + + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + + setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager); + + when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); + when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata); + when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString()); + + ControllerConf conf = new ControllerConf(); + ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); + conf.setRetentionControllerFrequencyInSeconds(0); + conf.setDeletedSegmentsRetentionInDays(0); + + RetentionManager retentionManager = + new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics); + retentionManager.start(); + retentionManager.run(); + + SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager(); + + // Verify that the removeAgedDeletedSegments() method in deletion manager is called + verify(deletionManager, times(1)).removeAgedDeletedSegments(leadControllerManager); + + // Verify deleteSegments is called + verify(pinotHelixResourceManager, times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList()); + } + + private TableConfig createOfflineTableConfig() { return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setRetentionTimeUnit("DAYS") .setRetentionTimeValue("365").setNumReplicas(2).build(); @@ -179,6 +431,47 @@ public class RetentionManagerTest { }).when(resourceManager).deleteSegments(anyString(), anyList()); } + private void setupPinotHelixResourceManagerForBatchSize(TableConfig tableConfig, int numOlderSegments, + int untrackedSegmentsDeletionBatchSize, List<SegmentZKMetadata> segmentsZKMetadata, + PinotHelixResourceManager resourceManager, LeadControllerManager leadControllerManager) { + + String tableNameWithType = tableConfig.getTableName(); + when(resourceManager.getAllTables()).thenReturn(List.of(tableNameWithType)); + + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class); + doAnswer(invocationOnMock -> null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager); + when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager); + + // Set up verification for deleteSegments with focus on the count and segment inclusion rules + doAnswer(invocationOnMock -> { + Object[] args = invocationOnMock.getArguments(); + String tableNameArg = (String) args[0]; + assertEquals(tableNameArg, tableNameWithType); + List<String> segmentListArg = (List<String>) args[1]; + + // Verify all the old metadata segments are included + for (int i = 0; i < numOlderSegments; i++) { + assertTrue(segmentListArg.contains(segmentsZKMetadata.get(i).getSegmentName())); + } + + // Verify segment3 (recent untracked segment) is NOT included + assertFalse(segmentListArg.contains("segment3.tar.gz")); + + // Calculate expected total segments that should be deleted + // ZK metadata segments + untracked segments up to the batch size limit + int expectedTotalSegments = numOlderSegments + untrackedSegmentsDeletionBatchSize; + + // 
Verify the total count is as expected + assertEquals(expectedTotalSegments, segmentListArg.size()); + + return null; + }).when(resourceManager).deleteSegments(anyString(), anyList()); + } + + // This test makes sure that we clean up the segments marked OFFLINE in realtime for more than 7 days @Test public void testRealtimeLLCCleanup() { @@ -194,6 +487,7 @@ public class RetentionManagerTest { PinotHelixResourceManager pinotHelixResourceManager = setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager); + when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString()); ControllerConf conf = new ControllerConf(); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); @@ -226,6 +520,7 @@ public class RetentionManagerTest { PinotHelixResourceManager pinotHelixResourceManager = setupSegmentMetadataForPausedTable(tableConfig, now, removedSegments); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager); + when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString()); ControllerConf conf = new ControllerConf(); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); @@ -372,4 +667,27 @@ public class RetentionManagerTest { when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime)); return segmentZKMetadata; } + + /** + * Helper method to create a file with content + */ + private void createFileWithContent(File file, String content) { + try { + Files.write(file.toPath(), content.getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Helper method to set file modification time + */ + private void setFileModificationTime(File file, long timestamp) { + FileTime fileTime = FileTime.fromMillis(timestamp); + try { + 
Files.setLastModifiedTime(file.toPath(), fileTime); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java index 592a6c1960..2bfdc051a6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java @@ -55,6 +55,8 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { // For more usage of this field, please refer to this design doc: https://tinyurl.com/f63ru4sb private String _peerSegmentDownloadScheme; + private String _untrackedSegmentsDeletionBatchSize; + /** * @deprecated Use {@link InstanceAssignmentConfig} instead */ @@ -250,4 +252,12 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { public void setMinimizeDataMovement(boolean minimizeDataMovement) { _minimizeDataMovement = minimizeDataMovement; } + + public String getUntrackedSegmentsDeletionBatchSize() { + return _untrackedSegmentsDeletionBatchSize; + } + + public void setUntrackedSegmentsDeletionBatchSize(String untrackedSegmentsDeletionBatchSize) { + _untrackedSegmentsDeletionBatchSize = untrackedSegmentsDeletionBatchSize; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org