KKcorps commented on code in PR #15316: URL: https://github.com/apache/pinot/pull/15316#discussion_r2011333641
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -564,77 +564,74 @@ private void doAddConsumingSegment(String segmentName) @Override public File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, - "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName()); - - // Case: The commit protocol has completed, and the segment is ready to be downloaded either - // from deep storage or from a peer (if peer-to-peer download is enabled). - if (zkMetadata.getStatus() == Status.DONE) { + Status status = zkMetadata.getStatus(); + if (status.isCompleted()) { + // Segment is completed and ready to be downloaded either from deep storage or from a peer (if peer-to-peer + // download is enabled). return super.downloadSegment(zkMetadata); } // The segment status is COMMITTING, indicating that the segment commit process is incomplete. // Attempting a waited download within the configured time limit. 
- long downloadTimeoutMilliseconds = - getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType)); - final long startTime = System.currentTimeMillis(); - List<URI> onlineServerURIs; - while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) { + String segmentName = zkMetadata.getSegmentName(); + Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s to be downloaded", status, + segmentName); + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); + long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); + long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; + while (System.currentTimeMillis() < deadlineMs) { // ZK Metadata may change during segment download process; fetch it on every retry. - zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName()); - - if (zkMetadata.getDownloadUrl() != null) { - // The downloadSegment() will throw an exception in case there are some genuine issues. - // We don't want to retry in those scenarios and will throw an exception - return downloadSegmentFromDeepStore(zkMetadata); + zkMetadata = fetchZKMetadata(segmentName); + if (zkMetadata.getStatus().isCompleted()) { + return super.downloadSegment(zkMetadata); } + // Segment is still in COMMITTING status, but it might already be ONLINE on some peer servers. Try to find ONLINE + // segment and download it from peers. 
if (_peerDownloadScheme != null) { - _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName()); try { - onlineServerURIs = new ArrayList<>(); + List<URI> onlineServerURIs = new ArrayList<>(); PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme, onlineServerURIs); if (!onlineServerURIs.isEmpty()) { return downloadSegmentFromPeers(zkMetadata); } } catch (Exception e) { - _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e); + _logger.warn("Caught exception while trying to download segment: {} from peers, continue retrying", + segmentName, e); } } - long timeElapsed = System.currentTimeMillis() - startTime; - long timeRemaining = downloadTimeoutMilliseconds - timeElapsed; - - if (timeRemaining <= 0) { + long timeRemainingMs = deadlineMs - System.currentTimeMillis(); + if (timeRemainingMs <= 0) { break; } - _logger.info("Sleeping for 30 seconds as the segment url is missing. Time remaining: {} minutes", - Math.round(timeRemaining / 60000.0)); - - // Sleep for the shorter of our normal interval or remaining time - Thread.sleep(Math.min(SLEEP_INTERVAL_MS, timeRemaining)); + long sleepTimeMs = Math.min(SLEEP_INTERVAL_MS, timeRemainingMs); + _logger.info("Sleeping for: {}ms waiting for segment: {} to be completed. Time remaining: {}ms", sleepTimeMs, + segmentName, timeRemainingMs); + //noinspection BusyWait + Thread.sleep(sleepTimeMs); } // If we exit the loop without returning, throw an exception throw new TimeoutException( - "Failed to download segment after " + TimeUnit.MILLISECONDS.toMinutes(downloadTimeoutMilliseconds) - + " minutes of retrying. 
Segment: " + zkMetadata.getSegmentName()); - } - - private long getDownloadTimeOutMilliseconds(@Nullable TableConfig tableConfig) { - return Optional.ofNullable(tableConfig).map(TableConfig::getIngestionConfig) - .map(IngestionConfig::getStreamIngestionConfig).map(StreamIngestionConfig::getStreamConfigMaps) - .filter(maps -> !maps.isEmpty()).map(maps -> maps.get(0)).map(map -> map.get(SEGMENT_DOWNLOAD_TIMEOUT_MINUTES)) - .map(timeoutStr -> { - try { - return TimeUnit.MINUTES.toMillis(Long.parseLong(timeoutStr)); - } catch (NumberFormatException e) { - return DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS; - } - }).orElse(DEFAULT_SEGMENT_DOWNLOAD_TIMEOUT_MS); + "Failed to download segment: " + segmentName + " after: " + downloadTimeoutMs + "ms of retrying"); + } + + private long getDownloadTimeoutMs(TableConfig tableConfig) { + Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0); Review Comment: nit: Add a TODO noting that we only honor the value from the first stream config map in the case of multi-stream ingestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org