rajagopr commented on code in PR #13646:
URL: https://github.com/apache/pinot/pull/13646#discussion_r1755531409


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -110,6 +115,61 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
     }
   }
 
+  // Complete segment operations for a list of segments in batch mode
+  public void completeSegmentsOperations(String tableNameWithType, FileUploadType uploadType,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers,
+      List<SegmentUploadMetadata> segmentUploadMetadataList)
+      throws Exception {
+    boolean refreshOnly =
+        Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+    List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>();
+    List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>();
+    for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) {
+      SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata();
+      String segmentName = segmentMetadata.getName();
+
+      ZNRecord existingSegmentMetadataZNRecord =
+          _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+      if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName,
+          existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+        LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}",
+            tableNameWithType, segmentName);
+        Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName),
+            "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName);
+        existingSegmentMetadataZNRecord = null;
+      }
+
+      if (existingSegmentMetadataZNRecord == null) {
+        // Add a new segment
+        if (refreshOnly) {
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType),
+              Response.Status.GONE);
+        }
+        LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType);
+        newSegmentsList.add(segmentUploadMetadata);
+      } else {
+        // Refresh an existing segment
+        if (!allowRefresh) {
+          // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the
+          // check done up-front but ends up getting created before the check here, we could incorrectly refresh an
+          // existing segment.
+          throw new ControllerApplicationException(LOGGER,
+              String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName,
+                  tableNameWithType), Response.Status.CONFLICT);
+        }
+        LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType);
+        segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord);
+        existingSegmentsList.add(segmentUploadMetadata);
+      }
+    }
+    // process new segments
+    processNewSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, newSegmentsList);
+
+    // process existing segments
+    processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList);

Review Comment:
   Yes. If the batch call fails after processing only a subset of the segments, it would be safe to call completeSegmentsOperations again with the same set of inputs; the segments would automatically be re-classified as new or existing. A segment that already exists would get refreshed, and if nothing has changed, the segment's refresh timestamp would be updated.
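A minimal, hypothetical Java sketch of the retry behavior described in the comment. It assumes an in-memory map stands in for segment ZK metadata and that a failure can occur partway through writing the new segments. BatchRetrySketch and its fields are illustrative only, not Pinot APIs; the method name mirrors the one in the diff, but the signature is simplified and the refreshOnly and parallel-push-protection checks are omitted. Following the !allowRefresh branch in the diff, the sketch also shows that a retry relies on refresh being permitted: with allowRefresh false, segments already written by the failed attempt would be rejected on retry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of batch classification and retry; none of these names are Pinot APIs.
public class BatchRetrySketch {

  // Stand-in for segment ZK metadata: segment name -> number of refreshes (proxy for refresh timestamp).
  private final Map<String, Integer> _zkMetadata = new HashMap<>();
  // Records of which segments were added or refreshed, for inspection.
  final List<String> added = new ArrayList<>();
  final List<String> refreshed = new ArrayList<>();

  // Classify every segment first, then process new segments, then existing ones.
  // failAfter simulates a crash after that many new segments were written (-1 = never fail).
  public void completeSegmentsOperations(List<String> segments, boolean allowRefresh, int failAfter) {
    List<String> newSegments = new ArrayList<>();
    List<String> existingSegments = new ArrayList<>();
    for (String segment : segments) {
      if (!_zkMetadata.containsKey(segment)) {
        newSegments.add(segment);
      } else if (allowRefresh) {
        existingSegments.add(segment);
      } else {
        // Analogous to the CONFLICT path in the diff.
        throw new IllegalStateException("Segment: " + segment + " already exists. Refresh not permitted.");
      }
    }
    int written = 0;
    for (String segment : newSegments) {
      if (written == failAfter) {
        throw new RuntimeException("Simulated failure after " + written + " new segments");
      }
      _zkMetadata.put(segment, 0);
      added.add(segment);
      written++;
    }
    for (String segment : existingSegments) {
      // Refresh: bump the counter standing in for the refresh timestamp.
      _zkMetadata.merge(segment, 1, Integer::sum);
      refreshed.add(segment);
    }
  }

  public static void main(String[] args) {
    BatchRetrySketch sketch = new BatchRetrySketch();
    List<String> batch = List.of("seg_0", "seg_1", "seg_2");
    try {
      sketch.completeSegmentsOperations(batch, true, 2);
    } catch (RuntimeException e) {
      System.out.println("First attempt failed: " + e.getMessage());
    }
    // Retry with identical inputs: seg_0 and seg_1 are now existing, seg_2 is still new.
    sketch.completeSegmentsOperations(batch, true, -1);
    // prints "added=[seg_0, seg_1, seg_2] refreshed=[seg_0, seg_1]"
    System.out.println("added=" + sketch.added + " refreshed=" + sketch.refreshed);
  }
}
```

Because classification is recomputed from the stored metadata on every call rather than carried over from the failed attempt, the retry naturally picks up where the first attempt stopped.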



-- 
This is an automated message from the Apache Git Service.