This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 26709271f2 Move untar method to BaseTaskExecutor and untar with peerDownload segment if deepstore corrupted (#13964) 26709271f2 is described below commit 26709271f2be1e82eb02478dc0a1bdaa1dd67970 Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Thu Sep 12 10:52:05 2024 +0530 Move untar method to BaseTaskExecutor and untar with peerDownload segment if deepstore corrupted (#13964) --- .../BaseMultipleSegmentsConversionExecutor.java | 19 +++++-------------- .../tasks/BaseSingleSegmentConversionExecutor.java | 19 ++++++------------- .../pinot/plugin/minion/tasks/BaseTaskExecutor.java | 21 ++++++++++++++++++--- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index a00883f34e..f16d93b151 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -200,30 +200,21 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe for (int i = 0; i < downloadURLs.length; i++) { String segmentName = segmentNames[i]; - // Download the segment file + // Download and decompress the segment file _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Downloading segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), + String.format("Downloading and decompressing segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), downloadURLs.length)); - File tarredSegmentFile = new 
File(tempDataDir, "tarredSegmentFile_" + i); + File indexDir; try { - downloadSegmentToLocal(tableNameWithType, segmentName, downloadURLs[i], taskType, tarredSegmentFile); + indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType, + tempDataDir, "_" + i); } catch (Exception e) { LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e); _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L); _eventObserver.notifyTaskError(_pinotTaskConfig, e); throw e; } - - // Un-tar the segment file - _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Decompressing segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), - downloadURLs.length)); - File segmentDir = new File(tempDataDir, "segmentDir_" + i); - File indexDir = TarCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0); inputSegmentDirs.add(indexDir); - if (!FileUtils.deleteQuietly(tarredSegmentFile)) { - LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath()); - } reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType); SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index c75fa99bf8..e35a9c5cd5 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -97,12 +97,13 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID()); Preconditions.checkState(tempDataDir.mkdirs(), "Failed to create temporary directory: %s", tempDataDir); try { - // Download the tarred segment file - _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment from: " + downloadURL); - File tarredSegmentFile = new File(tempDataDir, "tarredSegment"); - LOGGER.info("Downloading segment from {} to {}", downloadURL, tarredSegmentFile.getAbsolutePath()); + // Download and decompress the segment file + _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + + downloadURL); + File indexDir; try { - downloadSegmentToLocal(tableNameWithType, segmentName, downloadURL, taskType, tarredSegmentFile); + indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURL, taskType, + tempDataDir, ""); } catch (Exception e) { LOGGER.error("Failed to download segment from download url: {}", downloadURL, e); _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L); @@ -110,14 +111,6 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut throw e; } - // Un-tar the segment file - _eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment from: " + downloadURL); - File segmentDir = new File(tempDataDir, "segmentDir"); - File indexDir = TarCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0); - if (!FileUtils.deleteQuietly(tarredSegmentFile)) { - LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath()); - } - // Publish metrics related to segment download reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java index a33b2c4005..c61a5b58d9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.common.metrics.MinionMeter; import org.apache.pinot.common.metrics.MinionMetrics; +import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.core.util.PeerServerSegmentFinder; @@ -110,20 +111,27 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor { _minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount); } - protected void downloadSegmentToLocal(String tableNameWithType, String segmentName, String deepstoreURL, - String taskType, File tarredSegmentFile) + protected File downloadSegmentToLocalAndUntar(String tableNameWithType, String segmentName, String deepstoreURL, + String taskType, File tempDataDir, String suffix) throws Exception { - LOGGER.info("Downloading segment {} from {} to {}", segmentName, deepstoreURL, tarredSegmentFile.getAbsolutePath()); + File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile" + suffix); + File segmentDir = new File(tempDataDir, "segmentDir" + suffix); + File indexDir; TableConfig tableConfig = getTableConfig(tableNameWithType); String crypterName = tableConfig.getValidationConfig().getCrypterClassName(); + LOGGER.info("Downloading segment {} from {} to {}", segmentName, deepstoreURL, 
tarredSegmentFile.getAbsolutePath()); + try { // download from deepstore first SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(deepstoreURL, tarredSegmentFile, crypterName); + // untar the segment file + indexDir = TarCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0); } catch (Exception e) { LOGGER.error("Segment download failed from deepstore for {}, crypter:{}", deepstoreURL, crypterName, e); String peerDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(); if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, taskType, MINION_CONTEXT.isAllowDownloadFromServer()) && peerDownloadScheme != null) { + // if allowDownloadFromServer is enabled, download the segment from a peer server as deepstore download failed LOGGER.info("Trying to download from servers for segment {} post deepstore download failed", segmentName); SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, peerDownloadScheme, () -> { List<URI> uris = @@ -132,9 +140,16 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor { Collections.shuffle(uris); return uris; }, tarredSegmentFile, crypterName); + // untar the segment file + indexDir = TarCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0); } else { throw e; } + } finally { + if (!FileUtils.deleteQuietly(tarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath()); + } } + return indexDir; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org