This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-retry-to-download-segment in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e108f058c44d499e7c497f6de991429b2c6a4e48 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Tue Jul 28 14:45:39 2020 -0700 Add retry logic to download segment tar file in pinot server --- .../starter/helix/SegmentFetcherAndLoader.java | 67 ++++++++++++---------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java index 85256af..4f013d1 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java @@ -40,6 +40,7 @@ import org.apache.pinot.spi.crypt.PinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -184,39 +185,47 @@ public class SegmentFetcherAndLoader { private String downloadSegmentToLocal(String uri, PinotCrypter crypter, String tableName, String segmentName) throws Exception { - File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName), - "tmp-" + segmentName + "-" + UUID.randomUUID()); - FileUtils.forceMkdir(tempDir); - File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX); - File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX); - File tempSegmentDir = new File(tempDir, segmentName); - try { - SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile); - if (crypter != null) { - crypter.decrypt(tempDownloadFile, tempTarFile); - } else { - tempTarFile = tempDownloadFile; - } + // Even if the tar file has been downloaded successfully, the file itself could be corrupted during the 
transmission. + // Thus, we should re-download it. + RetryPolicies.fixedDelayRetryPolicy(5, 5_000L).attempt(() -> { + File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName), + "tmp-" + segmentName + "-" + UUID.randomUUID()); + FileUtils.forceMkdir(tempDir); + File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX); + File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX); + File tempSegmentDir = new File(tempDir, segmentName); + try { + SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile); + if (crypter != null) { + crypter.decrypt(tempDownloadFile, tempTarFile); + } else { + tempTarFile = tempDownloadFile; + } - LOGGER - .info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, tableName, - uri, tempTarFile, tempTarFile.length()); + LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, + tableName, uri, tempTarFile, tempTarFile.length()); - // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry. - // Thus, there's no need to retry again. - File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0); + // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry. + // Thus, there's no need to retry again. 
+ File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0); - File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName); - if (indexDir.exists()) { - LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName); - FileUtils.deleteDirectory(indexDir); + File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName); + if (indexDir.exists()) { + LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName); + FileUtils.deleteDirectory(indexDir); + } + FileUtils.moveDirectory(tempIndexDir, indexDir); + LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir); + return Boolean.TRUE; + } catch (Exception e) { + LOGGER.error("Caught exception when downloading segment to local", e); + return Boolean.FALSE; + } finally { + FileUtils.deleteQuietly(tempDir); } - FileUtils.moveDirectory(tempIndexDir, indexDir); - LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir); - return indexDir.getAbsolutePath(); - } finally { - FileUtils.deleteQuietly(tempDir); - } + }); + + return new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName).getAbsolutePath(); } public String getSegmentLocalDirectory(String tableName, String segmentId) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org