9aman commented on code in PR #15316: URL: https://github.com/apache/pinot/pull/15316#discussion_r2007034972
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName) @Override public File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, - "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName()); + String segmentName = zkMetadata.getSegmentName(); + Status status = zkMetadata.getStatus(); + Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is still IN_PROGRESS and cannot be downloaded", + segmentName); - // Case: The commit protocol has completed, and the segment is ready to be downloaded either - // from deep storage or from a peer (if peer-to-peer download is enabled). - if (zkMetadata.getStatus() == Status.DONE) { + // The commit protocol has completed, and the segment is ready to be downloaded either from deep storage or from a + // peer (if peer-to-peer download is enabled). + if (status.isCompleted()) { return super.downloadSegment(zkMetadata); } // The segment status is COMMITTING, indicating that the segment commit process is incomplete. // Attempting a waited download within the configured time limit. 
- long downloadTimeoutMilliseconds = - getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType)); - final long startTime = System.currentTimeMillis(); - List<URI> onlineServerURIs; - while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) { + Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s", status, segmentName); + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); + long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); + long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; + while (System.currentTimeMillis() < deadlineMs) { // ZK Metadata may change during segment download process; fetch it on every retry. - zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName()); - - if (zkMetadata.getDownloadUrl() != null) { - // The downloadSegment() will throw an exception in case there are some genuine issues. 
- // We don't want to retry in those scenarios and will throw an exception - return downloadSegmentFromDeepStore(zkMetadata); - } - - if (_peerDownloadScheme != null) { - _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName()); - try { - onlineServerURIs = new ArrayList<>(); - PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(), - _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme, - onlineServerURIs); - if (!onlineServerURIs.isEmpty()) { - return downloadSegmentFromPeers(zkMetadata); - } - } catch (Exception e) { - _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e); - } + zkMetadata = fetchZKMetadata(segmentName); + if (zkMetadata.getStatus().isCompleted()) { Review Comment: What would happen if the `commitEndMetadata` call that updates the ZK metadata with the download URL, etc. fails? The segment will be stuck in the `COMMITTING` state, even though it might be available on the lead server for peer download. We will never enter this branch, because `zkMetadata.getStatus().isCompleted()` will always be `false`. This situation of missing download URLs is fixed during the `segment level validations` done in RealtimeSegmentValidationManager. The frequency of these runs is kept low, because each run scans the ZK metadata of all segments. If we remove this check, `downloadSegment` will fail at ``` Preconditions.checkState(downloadUrl != null, "Failed to find download URL in ZK metadata for segment: %s of table: %s", segmentName, _tableNameWithType); ``` Normal ingestion ensures that we have either a proper download URL or an empty string (in case the upload fails and peer download is enabled). This is the reason for writing a separate `downloadSegment` instead of using the parent class's implementation. It felt like I would end up changing a lot of things for the OFFLINE tables as well, since they use the base class download function. 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName) @Override public File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, - "Segment: %s is still IN_PROGRESS and cannot be downloaded", zkMetadata.getSegmentName()); + String segmentName = zkMetadata.getSegmentName(); + Status status = zkMetadata.getStatus(); + Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is still IN_PROGRESS and cannot be downloaded", + segmentName); - // Case: The commit protocol has completed, and the segment is ready to be downloaded either - // from deep storage or from a peer (if peer-to-peer download is enabled). - if (zkMetadata.getStatus() == Status.DONE) { + // The commit protocol has completed, and the segment is ready to be downloaded either from deep storage or from a + // peer (if peer-to-peer download is enabled). + if (status.isCompleted()) { return super.downloadSegment(zkMetadata); } // The segment status is COMMITTING, indicating that the segment commit process is incomplete. // Attempting a waited download within the configured time limit. 
- long downloadTimeoutMilliseconds = - getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType)); - final long startTime = System.currentTimeMillis(); - List<URI> onlineServerURIs; - while (System.currentTimeMillis() - startTime < downloadTimeoutMilliseconds) { + Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s", status, segmentName); + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); + long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); + long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; + while (System.currentTimeMillis() < deadlineMs) { // ZK Metadata may change during segment download process; fetch it on every retry. - zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName()); - - if (zkMetadata.getDownloadUrl() != null) { - // The downloadSegment() will throw an exception in case there are some genuine issues. 
- // We don't want to retry in those scenarios and will throw an exception - return downloadSegmentFromDeepStore(zkMetadata); - } - - if (_peerDownloadScheme != null) { - _logger.info("Peer download is enabled for the segment: {}", zkMetadata.getSegmentName()); - try { - onlineServerURIs = new ArrayList<>(); - PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(), - _helixManager.getClusterName(), _tableNameWithType, zkMetadata.getSegmentName(), _peerDownloadScheme, - onlineServerURIs); - if (!onlineServerURIs.isEmpty()) { - return downloadSegmentFromPeers(zkMetadata); - } - } catch (Exception e) { - _logger.warn("Could not download segment: {} from peer", zkMetadata.getSegmentName(), e); - } + zkMetadata = fetchZKMetadata(segmentName); + if (zkMetadata.getStatus().isCompleted()) { Review Comment: cc: @KKcorps -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org