This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d9c0790a46 Periodically delete Tmp Segment file brought by the SplitCommit End phase (#10815) d9c0790a46 is described below commit d9c0790a464eaf2575a2b7225827af36f4b0cd25 Author: Xuanyi Li <xuany...@uber.com> AuthorDate: Mon Sep 18 19:08:07 2023 -0700 Periodically delete Tmp Segment file brought by the SplitCommit End phase (#10815) --- .../pinot/common/metrics/AbstractMetrics.java | 20 +++-- .../pinot/common/metrics/ControllerMeter.java | 1 + .../pinot/controller/BaseControllerStarter.java | 4 +- .../apache/pinot/controller/ControllerConf.java | 15 ++++ .../resources/LLCSegmentCompletionHandlers.java | 4 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 100 ++++++++++++++++++--- .../RealtimeSegmentValidationManager.java | 15 ++++ .../PinotLLCRealtimeSegmentManagerTest.java | 65 +++++++++++--- .../manager/realtime/PinotFSSegmentUploader.java | 3 +- .../manager/realtime}/SegmentCompletionUtils.java | 22 ++++- .../realtime}/SegmentCompletionUtilsTest.java | 19 ++-- 11 files changed, 224 insertions(+), 44 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index ff41b12abf..98f65a4be9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -285,9 +285,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e */ public PinotMeter addMeteredTableValue(final String tableName, final M meter, final long unitCount, PinotMeter reusedMeter) { - String meterName = meter.getMeterName(); - final String fullMeterName = _metricPrefix + getTableName(tableName) + "." 
+ meterName; - return addValueToMeter(fullMeterName, meter.getUnit(), unitCount, reusedMeter); + return addValueToMeter(getTableFullMeterName(tableName, meter), meter.getUnit(), unitCount, reusedMeter); } /** @@ -331,14 +329,17 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e } public PinotMeter getMeteredTableValue(final String tableName, final M meter) { - final String fullMeterName; - String meterName = meter.getMeterName(); - fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName; - final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullMeterName); + final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, + getTableFullMeterName(tableName, meter)); return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS); } + private String getTableFullMeterName(final String tableName, final M meter) { + String meterName = meter.getMeterName(); + return _metricPrefix + getTableName(tableName) + "." + meterName; + } + /** * @deprecated Please use addMeteredTableValue(final String tableName, final M meter, final long unitCount), which is * designed for tracking count and rates. @@ -723,6 +724,11 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e removeGaugeFromMetricRegistry(gaugeName); } + public void removeTableMeter(final String tableName, final M meter) { + PinotMetricUtils.removeMetric(_metricsRegistry, + PinotMetricUtils.makePinotMetricName(_clazz, getTableFullMeterName(tableName, meter))); + } + /** * Remove callback gauge. 
* @param metricName metric name diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java index 8de33d0e41..a3e6da0086 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java @@ -59,6 +59,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter { LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS("LLCSegmentDeepStoreUploadRetrySuccess", false), LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false), SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink", false), + DELETED_TMP_SEGMENT_COUNT("DeletedTmpSegmentCount", false), NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index e983fd80ff..e7f02b57df 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -156,6 +156,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { protected HelixManager _helixParticipantManager; protected PinotMetricsRegistry _metricsRegistry; protected ControllerMetrics _controllerMetrics; + protected ValidationMetrics _validationMetrics; protected SqlQueryExecutor _sqlQueryExecutor; // Can only be constructed after resource manager getting started protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker; @@ -586,6 +587,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { _controllerMetrics = new ControllerMetrics(_config.getMetricsPrefix(), _metricsRegistry); _controllerMetrics.initializeGlobalMeters(); 
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1); + _validationMetrics = new ValidationMetrics(_metricsRegistry); } private void initPinotFSFactory() { @@ -705,7 +707,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { periodicTasks.add(_offlineSegmentIntervalChecker); _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager, - _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics); + _pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics); periodicTasks.add(_realtimeSegmentValidationManager); _brokerResourceValidationManager = new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 73abaa14a4..22a838faf1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -216,9 +216,15 @@ public class ControllerConf extends PinotConfiguration { "controller.realtime.segment.deepStoreUploadRetryEnabled"; public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS = "controller.realtime.segment.deepStoreUploadRetry.timeoutMs"; + public static final String ENABLE_TMP_SEGMENT_ASYNC_DELETION = + "controller.realtime.segment.tmpFileAsyncDeletionEnabled"; + // temporary segments within expiration won't be deleted so that ongoing split commit won't be impacted. 
+ public static final String TMP_SEGMENT_RETENTION_IN_SECONDS = + "controller.realtime.segment.tmpFileRetentionInSeconds"; public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120; public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300; + public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour. private static final Random RANDOM = new Random(); @@ -924,10 +930,19 @@ public class ControllerConf extends PinotConfiguration { return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false); } + public boolean isTmpSegmentAsyncDeletionEnabled() { + return getProperty(ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION, false); + } + public int getDeepStoreRetryUploadTimeoutMs() { return getProperty(ControllerPeriodicTasksConf.DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS, -1); } + public int getTmpSegmentRetentionInSeconds() { + return getProperty(ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS, + ControllerPeriodicTasksConf.DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND); + } + public long getPinotTaskManagerInitialDelaySeconds() { return getPeriodicTaskInitialDelayInSeconds(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index bf852f582e..836f58052d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -47,10 +47,10 @@ import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; -import 
org.apache.pinot.controller.util.SegmentCompletionUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.filesystem.PinotFS; @@ -378,7 +378,7 @@ public class LLCSegmentCompletionHandlers { String rawTableName = new LLCSegmentName(segmentName).getTableName(); URI segmentFileURI = URIUtils .getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), rawTableName, - URIUtils.encode(SegmentCompletionUtils.generateSegmentFileName(segmentName))); + URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName))); PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile, segmentFileURI); SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params() .withStreamPartitionMsgOffset(requestParams.getStreamPartitionMsgOffset()) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index e55f0e6463..971dd4333d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -72,8 +72,8 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy; import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy; -import org.apache.pinot.controller.util.SegmentCompletionUtils; import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.core.util.PeerServerSegmentFinder; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.creator.SegmentVersion; @@ -165,6 +165,7 @@ public class PinotLLCRealtimeSegmentManager { private final Lock[] _idealStateUpdateLocks; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled; + private final boolean _isTmpSegmentAsyncDeletionEnabled; private final int _deepstoreUploadRetryTimeoutMs; private final FileUploadDownloadClient _fileUploadDownloadClient; private final AtomicInteger _numCompletingSegments = new AtomicInteger(0); @@ -190,6 +191,7 @@ public class PinotLLCRealtimeSegmentManager { } _flushThresholdUpdateManager = new FlushThresholdUpdateManager(); _isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled(); + _isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled(); _deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs(); _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? 
initFileUploadDownloadClient() : null; } @@ -198,6 +200,10 @@ public class PinotLLCRealtimeSegmentManager { return _isDeepStoreLLCSegmentUploadRetryEnabled; } + public boolean isTmpSegmentAsyncDeletionEnabled() { + return _isTmpSegmentAsyncDeletionEnabled; + } + @VisibleForTesting FileUploadDownloadClient initFileUploadDownloadClient() { return new FileUploadDownloadClient(); @@ -460,19 +466,17 @@ public class PinotLLCRealtimeSegmentManager { PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS); - // Cleans up tmp segment files under table dir. - // We only clean up tmp segment files in table level dir, so there's no need to list recursively. - // See LLCSegmentCompletionHandlers.uploadSegment(). - // TODO: move tmp file logic into SegmentCompletionUtils. - try { - for (String uri : pinotFS.listFiles(tableDirURI, false)) { - if (uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) { - LOGGER.warn("Deleting temporary segment file: {}", uri); - Preconditions.checkState(pinotFS.delete(new URI(uri), true), "Failed to delete file: %s", uri); + if (!isTmpSegmentAsyncDeletionEnabled()) { + try { + for (String uri : pinotFS.listFiles(tableDirURI, false)) { + if (uri.contains(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName))) { + LOGGER.warn("Deleting temporary segment file: {}", uri); + Preconditions.checkState(pinotFS.delete(new URI(uri), true), "Failed to delete file: %s", uri); + } } + } catch (Exception e) { + LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, e); } - } catch (Exception e) { - LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, e); } committingSegmentDescriptor.setSegmentLocation(uriToMoveTo); } @@ -1444,6 +1448,78 @@ public class PinotLLCRealtimeSegmentManager { } } + /** + * Delete tmp segments for realtime table with low 
level consumer, split commit and async deletion is enabled. + * @param tableNameWithType + * @param segmentsZKMetadata + * @return number of deleted orphan temporary segments + * + */ + public long deleteTmpSegments(String tableNameWithType, List<SegmentZKMetadata> segmentsZKMetadata) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + return 0L; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType); + return 0L; + } + + if (!isLowLevelConsumer(tableNameWithType, tableConfig) + || !getIsSplitCommitEnabled() + || !isTmpSegmentAsyncDeletionEnabled()) { + return 0L; + } + + Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta -> meta.getStatus() == Status.DONE + && !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map( + SegmentZKMetadata::getDownloadUrl).collect( + Collectors.toSet()); + + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); + PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); + long deletedTmpSegments = 0; + try { + for (String filePath : pinotFS.listFiles(tableDirURI, false)) { + // prepend scheme + URI uri = URIUtils.getUri(filePath); + if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) { + LOGGER.info("Deleting temporary segment file: {}", uri); + if (pinotFS.delete(uri, true)) { + LOGGER.info("Succeed to delete file: {}", uri); + deletedTmpSegments++; + } else { + LOGGER.warn("Failed to delete file: {}", uri); + } + } + } + } catch (Exception e) { + LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e); + } + return deletedTmpSegments; + } + + private boolean 
isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS pinotFS) throws Exception { + long lastModified = pinotFS.lastModified(uri); + if (lastModified <= 0) { + LOGGER.warn("file {} modification time {} is not positive, ineligible for delete", uri.toString(), lastModified); + return false; + } + String uriString = uri.toString(); + return SegmentCompletionUtils.isTmpFile(uriString) && !deepURIs.contains(uriString) + && getCurrentTimeMs() - lastModified > _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L; + } + + private boolean isLowLevelConsumer(String tableNameWithType, TableConfig tableConfig) { + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableNameWithType, + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + return streamConfig.hasLowLevelConsumerType(); + } + /** * Force commit the current segments in consuming state and restart consumption */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index d0a7bb1a37..42ed189bbf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; import org.apache.pinot.controller.ControllerConf; @@ -49,6 +50,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager; private final 
ValidationMetrics _validationMetrics; + private final ControllerMetrics _controllerMetrics; private final int _segmentLevelValidationIntervalInSeconds; private long _lastSegmentLevelValidationRunTimeMs = 0L; @@ -64,6 +66,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea leadControllerManager, controllerMetrics); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; _validationMetrics = validationMetrics; + _controllerMetrics = controllerMetrics; _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds(); Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); @@ -111,8 +114,19 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) { String realtimeTableName = tableConfig.getTableName(); + List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); + // Delete tmp segments + try { + long numDeleteTmpSegments = _llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName, segmentsZKMetadata); + LOGGER.info("Deleted {} tmp segments for table: {}", numDeleteTmpSegments, realtimeTableName); + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.DELETED_TMP_SEGMENT_COUNT, + numDeleteTmpSegments); + } catch (Exception e) { + LOGGER.error("Failed to delete tmp segments for table: {}", realtimeTableName, e); + } + // Update the total document count gauge _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, computeTotalDocumentCount(segmentsZKMetadata)); @@ -127,6 +141,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea for (String tableNameWithType : tableNamesWithType) { if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType); + 
_controllerMetrics.removeTableMeter(tableNameWithType, ControllerMeter.DELETED_TMP_SEGMENT_COUNT); } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index d367741080..9a53ad839e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -57,7 +58,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; -import org.apache.pinot.controller.util.SegmentCompletionUtils; +import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -85,6 +86,10 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; +import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION; +import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS; +import static 
org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT; +import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -787,7 +792,7 @@ public class PinotLLCRealtimeSegmentManagerTest { PinotFSFactory.init(new PinotConfiguration()); File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME); String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); - String segmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName); + String segmentFileName = SegmentCompletionUtils.generateTmpSegmentFileName(segmentName); File segmentFile = new File(tableDir, segmentFileName); FileUtils.write(segmentFile, "temporary file contents"); @@ -808,9 +813,9 @@ public class PinotLLCRealtimeSegmentManagerTest { File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME); String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); String otherSegmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, CURRENT_TIME_MS).getSegmentName(); - String segmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName); - String extraSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName); - String otherSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(otherSegmentName); + String segmentFileName = SegmentCompletionUtils.generateTmpSegmentFileName(segmentName); + String extraSegmentFileName = SegmentCompletionUtils.generateTmpSegmentFileName(segmentName); + String otherSegmentFileName = SegmentCompletionUtils.generateTmpSegmentFileName(otherSegmentName); File segmentFile = new File(tableDir, segmentFileName); File extraSegmentFile = new File(tableDir, extraSegmentFileName); File otherSegmentFile = new File(tableDir, otherSegmentFileName); @@ -957,7 +962,7 @@ public class PinotLLCRealtimeSegmentManagerTest 
{ // Change 1st segment status to be DONE, but with default peer download url. // Verify later the download url is fixed after upload success. segmentsZKMetadata.get(0).setStatus(Status.DONE); - segmentsZKMetadata.get(0).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD); + segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD); // set up the external view for 1st segment String instance0 = "instance0"; externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), instance0, "ONLINE"); @@ -986,7 +991,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Change 2nd segment status to be DONE, but with default peer download url. // Verify later the download url isn't fixed after upload failure. segmentsZKMetadata.get(1).setStatus(Status.DONE); - segmentsZKMetadata.get(1).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD); + segmentsZKMetadata.get(1).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD); // set up the external view for 2nd segment String instance1 = "instance1"; externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), instance1, "ONLINE"); @@ -1010,7 +1015,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Verify later the download url isn't fixed because no ONLINE replica found in any server. 
segmentsZKMetadata.get(2).setStatus(Status.DONE); segmentsZKMetadata.get(2).setDownloadUrl( - CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD); + METADATA_URI_FOR_PEER_DOWNLOAD); // set up the external view for 3rd segment String instance2 = "instance2"; externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), instance2, "OFFLINE"); @@ -1039,10 +1044,10 @@ public class PinotLLCRealtimeSegmentManagerTest { assertEquals( segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(), - CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD); + METADATA_URI_FOR_PEER_DOWNLOAD); assertEquals( segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(2), null).getDownloadUrl(), - CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD); + METADATA_URI_FOR_PEER_DOWNLOAD); assertEquals( segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(3), null).getDownloadUrl(), defaultDownloadUrl); @@ -1050,6 +1055,46 @@ public class PinotLLCRealtimeSegmentManagerTest { segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl()); } + @Test + public void testDeleteTmpSegmentFiles() throws Exception { + // turn on knobs for async deletion of tmp files + ControllerConf config = new ControllerConf(); + config.setDataDir(TEMP_DIR.toString()); + config.setProperty(ENABLE_SPLIT_COMMIT, true); + config.setProperty(TMP_SEGMENT_RETENTION_IN_SECONDS, Integer.MIN_VALUE); + config.setProperty(ENABLE_TMP_SEGMENT_ASYNC_DELETION, true); + + // simulate there's an orphan tmp file in localFS + PinotFSFactory.init(new PinotConfiguration()); + File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME); + String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); + String segmentFileName = SegmentCompletionUtils.generateTmpSegmentFileName(segmentName); + File segmentFile = new File(tableDir, segmentFileName); + FileUtils.write(segmentFile, 
"temporary file contents", Charset.defaultCharset()); + + SegmentZKMetadata segZKMeta = mock(SegmentZKMetadata.class); + PinotHelixResourceManager helixResourceManager = mock(PinotHelixResourceManager.class); + when(helixResourceManager.getTableConfig(REALTIME_TABLE_NAME)) + .thenReturn(new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build()); + PinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager( + helixResourceManager, config); + + long deletedTmpSegCount; + // case 1: the segmentMetadata download uri is identical to the uri of the tmp segment. Should not delete + when(segZKMeta.getStatus()).thenReturn(Status.DONE); + when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" + segmentFileName); + deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta)); + assertTrue(segmentFile.exists()); + assertEquals(0L, deletedTmpSegCount); + + // case 2: download url is empty, indicating the tmp segment is absolutely orphan. 
Delete the file + when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD); + deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME, Collections.singletonList(segZKMeta)); + assertFalse(segmentFile.exists()); + assertEquals(1L, deletedTmpSegCount); + } + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java index 4f173521c9..949fc7a0b7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java @@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime; import java.io.File; import java.net.URI; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -74,7 +73,7 @@ public class PinotFSSegmentUploader implements SegmentUploader { final String rawTableName = TableNameBuilder.extractRawTableName(segmentName.getTableName()); Callable<URI> uploadTask = () -> { URI destUri = new URI(StringUtil.join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), - segmentName.getSegmentName() + UUID.randomUUID().toString())); + SegmentCompletionUtils.generateTmpSegmentFileName(segmentName.getSegmentName()))); long startTime = System.currentTimeMillis(); try { PinotFS pinotFS = PinotFSFactory.create(new URI(_segmentStoreUriStr).getScheme()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java similarity 
index 69% rename from pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java rename to pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java index 9feb6cdbc6..61f270c3bf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.controller.util; +package org.apache.pinot.core.data.manager.realtime; import java.util.UUID; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +38,24 @@ public class SegmentCompletionUtils { * @param segmentName segment name * @return */ - public static String getSegmentNamePrefix(String segmentName) { + public static String getTmpSegmentNamePrefix(String segmentName) { return segmentName + TMP; } - public static String generateSegmentFileName(String segmentNameStr) { - return getSegmentNamePrefix(segmentNameStr) + UUID.randomUUID().toString(); + public static String generateTmpSegmentFileName(String segmentNameStr) { + return getTmpSegmentNamePrefix(segmentNameStr) + UUID.randomUUID(); + } + + public static boolean isTmpFile(String uri) { + String[] splits = StringUtils.splitByWholeSeparator(uri, TMP); + if (splits.length < 2) { + return false; + } + try { + UUID.fromString(splits[splits.length - 1]); + return true; + } catch (IllegalArgumentException e) { + return false; + } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java similarity index 60% rename from pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java rename to 
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java index 06f9cd43c0..270f763dc3 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java @@ -16,27 +16,34 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.controller.api; +package org.apache.pinot.core.data.manager.realtime; -import org.apache.pinot.controller.util.SegmentCompletionUtils; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; - public class SegmentCompletionUtilsTest { @Test public void testGenerateSegmentFilePrefix() { String segmentName = "segment"; - assertEquals(SegmentCompletionUtils.getSegmentNamePrefix(segmentName), "segment.tmp."); + assertEquals(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName), "segment.tmp."); } @Test public void testGenerateSegmentLocation() { String segmentName = "segment"; - String segmentNamePrefix = SegmentCompletionUtils.getSegmentNamePrefix(segmentName); - assertTrue(SegmentCompletionUtils.generateSegmentFileName(segmentName).startsWith(segmentNamePrefix)); + String segmentNamePrefix = SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName); + assertTrue(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName).startsWith(segmentNamePrefix)); + } + + @Test + public void testIsTmpFile() { + assertTrue(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.550e8400-e29b-41d4-a716-446655440000")); + assertFalse(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.")); + assertFalse(SegmentCompletionUtils.isTmpFile(".tmp.550e8400-e29b-41d4-a716-446655440000")); + assertFalse(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.55")); } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org