tibrewalpratik17 commented on code in PR #14506: URL: https://github.com/apache/pinot/pull/14506#discussion_r1851836657
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1555,26 +1555,38 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe segmentName)); } - // Randomly ask one server to upload - URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); - String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); - serverUploadRequestUrl = - String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs); - LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName, - serverUploadRequestUrl); - String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl); - String segmentDownloadUrl = - moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS); - LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl); - - // Update segment ZK metadata by adding the download URL - segmentZKMetadata.setDownloadUrl(segmentDownloadUrl); - // TODO: add version check when persist segment ZK metadata - persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1); - LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName, - segmentDownloadUrl); - _controllerMetrics.addMeteredTableValue(realtimeTableName, - ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L); + int iteration = 0; + // Round robin the servers until we find the one with the correct crc and successful upload + // If server is the last valid URI left then skip crc check as deepstore copy reliability takes a higher + // priority + for (URI uri: peerSegmentURIs) { + String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + serverUploadRequestUrl = + String.format("%s?uploadTimeoutMs=%d&expectedCrc=%d", serverUploadRequestUrl, + _deepstoreUploadRetryTimeoutMs, + (iteration == (peerSegmentURIs.size() - 1) ? -1 : segmentZKMetadata.getCrc())); + LOGGER.info("Ask server {} to upload LLC segment {} to deep store by this path: {}", uri, segmentName, + serverUploadRequestUrl); + String tempSegmentDownloadUrl; + try { + tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl); + } catch (Exception e) { + LOGGER.warn("Failed to upload LLC segment {} to deepstore from server {}", segmentName, uri); + iteration++; + continue; + } + + String segmentDownloadUrl = moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS); + // Update segment ZK metadata by adding the download URL + LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl); + segmentZKMetadata.setDownloadUrl(segmentDownloadUrl); + // TODO: add version check when persist segment ZK metadata + persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1); + LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName, Review Comment: Yeah my bad fixed now! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org