Jackie-Jiang commented on code in PR #12886: URL: https://github.com/apache/pinot/pull/12886#discussion_r1577264671
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String segmentName) { } } - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - return true; - } - - protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata) - throws Exception { - // TODO: may support download from peer servers for RealTime table. - return downloadSegmentFromDeepStore(segmentName, zkMetadata); - } - - private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata) + /** + * Downloads an immutable segment into the index directory. + * Segment can be downloaded from deep store or from peer servers. Downloaded segment might be compressed or + * encrypted, and this method takes care of decompressing and decrypting the segment. + */ + protected File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); - if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { - try { - File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir, - _streamSegmentDownloadUntarRateLimitBytesPerSec); - return moveSegment(segmentName, untaredSegDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } else { - try { - File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir); - return untarAndMoveSegment(segmentName, tarFile, tempRootDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } - } - - private File moveSegment(String segmentName, File untaredSegDir) - throws IOException { + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + Preconditions.checkState(downloadUrl != null, + "Failed to find download URL in ZK metadata for segment: %s of table: %s", segmentName, 
_tableNameWithType); try { - File indexDir = getSegmentDataDir(segmentName); - FileUtils.deleteDirectory(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - return indexDir; + if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) { + try { + return downloadSegmentFromDeepStore(zkMetadata); + } catch (Exception e) { + if (_peerDownloadScheme != null) { + return downloadSegmentFromPeers(zkMetadata); + } else { + throw e; + } + } + } else { + return downloadSegmentFromPeers(zkMetadata); + } } catch (Exception e) { - LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1); throw e; } } @VisibleForTesting - File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir) + File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) throws Exception { - File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - String uri = zkMetadata.getDownloadUrl(); - boolean downloadSuccess = false; + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl); + File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); + if (_segmentDownloadSemaphore != null) { + long startTime = System.currentTimeMillis(); + _logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName, + _segmentDownloadSemaphore.getQueueLength()); + _segmentDownloadSemaphore.acquire(); + _logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).", + segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength()); + } try { - if (_segmentDownloadSemaphore != null) { - long startTime = System.currentTimeMillis(); - LOGGER.info("Trying to acquire segment download semaphore for: {}. queue-length: {} ", segmentName, - _segmentDownloadSemaphore.getQueueLength()); - _segmentDownloadSemaphore.acquire(); - LOGGER.info("Acquired segment download semaphore for: {} (lock-time={}ms, queue-length={}).", segmentName, - System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); - } - SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName()); - LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, - _tableNameWithType, uri, tarFile, tarFile.length()); - downloadSuccess = true; - return tarFile; - } catch (AttemptsExceededException e) { - LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, - _tableNameWithType, uri, tarFile); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L); - if (_peerDownloadScheme == null) { - throw e; + File untarredSegmentDir; + if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { + _logger.info("Downloading segment: {} using streamed download-untar with maxStreamRateInByte: {}", segmentName, + _streamSegmentDownloadUntarRateLimitBytesPerSec); + AtomicInteger attempts = new AtomicInteger(0); + try { + untarredSegmentDir = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir, + _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts); + _logger.info("Downloaded and untarred segment: {} from: {}, attempts: {}", segmentName, downloadUrl, + attempts.get()); + } finally { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, + attempts.get()); + } + } else { + File segmentTarFile = new 
File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName()); + _logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl, + segmentTarFile, segmentTarFile.length()); + untarredSegmentDir = untarSegment(segmentName, segmentTarFile, tempRootDir); } - downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile); - downloadSuccess = true; - return tarFile; + File indexDir = moveSegment(segmentName, untarredSegmentDir); + _logger.info("Downloaded segment: {} from: {} to: {}", segmentName, downloadUrl, indexDir); + return indexDir; + } catch (Exception e) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1); + throw e; } finally { - if (!downloadSuccess) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L); - } if (_segmentDownloadSemaphore != null) { _segmentDownloadSemaphore.release(); } + FileUtils.deleteQuietly(tempRootDir); } } - protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, File destTarFile) + @VisibleForTesting + File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata) throws Exception { + String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(_peerDownloadScheme != null, "Peer download is not enabled for table: %s", _tableNameWithType); + _logger.info("Downloading segment: {} from peers", segmentName); + File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); Review Comment: Actually, on second thought, I feel keeping them within the method might be safer. If downloading the segment from deep store fails, we want to delete the tmp dir before trying to download from peers. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org