Jackie-Jiang commented on code in PR #15316: URL: https://github.com/apache/pinot/pull/15316#discussion_r2005914016
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName) @Override public File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, - "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName()); + String segmentName = zkMetadata.getSegmentName(); + Status status = zkMetadata.getStatus(); + Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is still IN_PROGRESS and cannot be downloaded", + segmentName); - // Case: The commit protocol has completed, and the segment is ready to be downloaded either - // from deep storage or from a peer (if peer-to-peer download is enabled). - if (zkMetadata.getStatus() == Status.DONE) { + // The commit protocol has completed, and the segment is ready to be downloaded either from deep storage or from a + // peer (if peer-to-peer download is enabled). + if (status.isCompleted()) { return super.downloadSegment(zkMetadata); } // The segment status is COMMITTING, indicating that the segment commit process is incomplete. // Attempting a waited download within the configured time limit. 
- long downloadTimeoutMilliseconds = - getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType)); - final long startTime = System.currentTimeMillis(); - List<URI> onlineServerURIs; - while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) { + Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s", status, segmentName); + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); + long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); + long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; + while (System.currentTimeMillis() < deadlineMs) { // ZK Metadata may change during segment download process; fetch it on every retry. - zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName()); - - if (zkMetadata.getDownloadUrl() != null) { - // The downloadSegment() will throw an exception in case there are some genuine issues. 
- // We don't want to retry in those scenarios and will throw an exception - return downloadSegmentFromDeepStore(zkMetadata); - } - - if (_peerDownloadScheme != null) { - _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName()); - try { - onlineServerURIs = new ArrayList<>(); - PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(), - _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme, - onlineServerURIs); - if (!onlineServerURIs.isEmpty()) { - return downloadSegmentFromPeers(zkMetadata); - } - } catch (Exception e) { - _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e); - } + zkMetadata = fetchZKMetadata(segmentName); + if (zkMetadata.getStatus().isCompleted()) { + return super.downloadSegment(zkMetadata); } - long timeElapsed = System.currentTimeMillis() - startTime; - long timeRemaining = downloadTimeoutMilliseconds - timeElapsed; - - if (timeRemaining <= 0) { + long timeRemainingMs = deadlineMs - System.currentTimeMillis(); + if (timeRemainingMs <= 0) { break; } - _logger.info("Sleeping for 30 seconds as the segment url is missing. Time remaining: {} minutes", - Math.round(timeRemaining / 60000.0)); - - // Sleep for the shorter of our normal interval or remaining time - Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining)); + long sleepTimeMs = Math.min(SLEEP_INTERVAL_MS, timeRemainingMs); + _logger.info("Sleeping for: {}ms waiting for segment: {} to be completed. Time remaining: {}ms", sleepTimeMs, + segmentName, timeRemainingMs); + //noinspection BusyWait + Thread.sleep(sleepTimeMs); } // If we exit the loop without returning, throw an exception throw new TimeoutException( - "Failed to download segment after " + TimeUnit.MILLISECONDS.toMinutes(downloadTimeoutMilliseconds) - + " minutes of retrying. 
Segment: " + zkMetadata.getSegmentName()); - } - - private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) { - return Optional.ofNullable(tableConfig).map(TableConfig::getIngestionConfig) - .map(IngestionConfig::getStreamIngestionConfig).map(StreamIngestionConfig::getStreamConfigMaps) - .filter(maps -> !maps.isEmpty()).map(maps -> maps.get(0)).map(map -> map.get(SEGMENT_DOWNLOAD_TIMEOUT_MINUTES)) - .map(timeoutStr -> { - try { - return TimeUnit.MINUTES.toMillis(Long.parseLong(timeoutStr)); - } catch (NumberFormatException e) { - return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS; - } - }).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS); + "Failed to download segment: " + segmentName + " after: " + downloadTimeoutMs + "ms of retrying"); + } + + private long getDownloadTimeoutMs(TableConfig tableConfig) { + Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); + String timeoutSeconds = streamConfigMap.get(StreamConfigProperties.PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS); Review Comment: It is for backward compatibility, and I've marked the current one as deprecated. I don't want to specify it in minutes because that is not flexible. We don't have any timeouts in minutes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org