rajagopr commented on code in PR #13646: URL: https://github.com/apache/pinot/pull/13646#discussion_r1755531409
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java: ########## @@ -110,6 +115,61 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata } } + // Complete segment operations for a list of segments in batch mode + public void completeSegmentsOperations(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, + List<SegmentUploadMetadata> segmentUploadMetadataList) + throws Exception { + boolean refreshOnly = + Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY)); + List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>(); + List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); + + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName, + existingSegmentMetadataZNRecord, enableParallelPushProtection)) { + LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", + tableNameWithType, segmentName); + Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), + "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName); + existingSegmentMetadataZNRecord = null; + } + + if (existingSegmentMetadataZNRecord == null) { + // Add a new segment + if (refreshOnly) { + throw new ControllerApplicationException(LOGGER, + String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), + 
Response.Status.GONE); + } + LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); + newSegmentsList.add(segmentUploadMetadata); + } else { + // Refresh an existing segment + if (!allowRefresh) { + // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the + // check done up-front but ends up getting created before the check here, we could incorrectly refresh an + // existing segment. + throw new ControllerApplicationException(LOGGER, + String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName, + tableNameWithType), Response.Status.CONFLICT); + } + LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); + segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord); + existingSegmentsList.add(segmentUploadMetadata); + } + } + // process new segments + processNewSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, newSegmentsList); + + // process existing segments + processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList); Review Comment: Yes, if the batch call fails after processing a subset of the segments, it would be safe to call the method completeSegmentsOperations again with the same set of inputs, and the segments would be automatically classified as new or existing segments. If a segment already exists, it would get refreshed – if nothing has changed, the segment's refresh timestamp would get updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org