jackjlli commented on code in PR #9905:
URL: https://github.com/apache/pinot/pull/9905#discussion_r1041817991


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -344,14 +349,37 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment
       // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile.
       newSegmentZKMetadata.setSegmentUploadStartTime(-1);
       if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) {
-        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
-        LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
-        throw new RuntimeException(
-            String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType));
+        // There is a race condition when it took too much time for the 1st segment upload to process (due to slow
+        // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded.
+        // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to
+        // update the zk metadata. Instead, the 1st attempt should validate the upload start time one more time. If the
+        // start time doesn't match with the one persisted in zk metadata, segment deletion should be skipped.
+        String errorMsg =
+            String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType);
+        LOGGER.error(errorMsg);
+        deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime);
+        throw new RuntimeException(errorMsg);
       }
     }
   }
 
+  /**
+   * Deletes the segment to be uploaded if the uploadStartTime matches with the one persisted in ZK metadata.
+   */
+  private void deleteSegmentIfNeeded(String tableNameWithType, String segmentName, long currentSegmentUploadStartTime) {
+    ZNRecord existingSegmentMetadataZNRecord =
+        _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+    if (existingSegmentMetadataZNRecord == null) {
+      return;
+    }
+    // Check if the upload start time is set by this thread itself, if yes delete the segment.
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
+    long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
+    if (currentSegmentUploadStartTime == existingSegmentUploadStartTime) {

Review Comment:
   If `enableParallelPushProtection` is not enabled, the controller won't update the ZK metadata, so we should be good.
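To make the race described in the diff's code comment concrete, here is a minimal, self-contained Java sketch. It is a hypothetical model, not Pinot code: an in-memory map stands in for the ZK property store, and `UploadRaceSketch`, `SegmentRecord`, `put`, `updateIfVersion` and `exists` are invented for illustration. Only the compare-before-delete guard in `deleteSegmentIfNeeded` mirrors the diff. For simplicity it assumes the 2nd attempt overwrites the record with its own upload start time, bumping the version the way an unconditional ZK write would.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the upload race (NOT Pinot's ZKOperator).
 * An in-memory map replaces the ZK property store; each record carries a ZK-style version.
 */
public class UploadRaceSketch {

  /** Hypothetical stand-in for a segment's ZK metadata record. */
  static final class SegmentRecord {
    long uploadStartTime;
    int version;

    SegmentRecord(long uploadStartTime, int version) {
      this.uploadStartTime = uploadStartTime;
      this.version = version;
    }
  }

  private final Map<String, SegmentRecord> store = new HashMap<>();

  /** Creates or overwrites the record, bumping the version like an unconditional write. */
  void put(String segmentName, long uploadStartTime) {
    SegmentRecord existing = store.get(segmentName);
    int version = existing == null ? 0 : existing.version + 1;
    store.put(segmentName, new SegmentRecord(uploadStartTime, version));
  }

  /** Conditional write: succeeds only if the stored version equals expectedVersion. */
  boolean updateIfVersion(String segmentName, long uploadStartTime, int expectedVersion) {
    SegmentRecord record = store.get(segmentName);
    if (record == null || record.version != expectedVersion) {
      return false;
    }
    record.uploadStartTime = uploadStartTime;
    record.version++;
    return true;
  }

  /** Deletes only if the persisted upload start time is still the one this attempt wrote. */
  boolean deleteSegmentIfNeeded(String segmentName, long currentUploadStartTime) {
    SegmentRecord record = store.get(segmentName);
    if (record == null) {
      return false; // nothing to clean up
    }
    if (record.uploadStartTime == currentUploadStartTime) {
      store.remove(segmentName);
      return true;
    }
    return false; // another attempt now owns the record; leave it alone
  }

  boolean exists(String segmentName) {
    return store.containsKey(segmentName);
  }

  public static void main(String[] args) {
    UploadRaceSketch zk = new UploadRaceSketch();
    String segment = "mySegment_0";

    // 1st attempt writes its start time (version 0), then stalls on slow PinotFS access.
    long firstStart = 1_000L;
    zk.put(segment, firstStart);

    // 2nd attempt overwrites the record (version 1), finishes, and releases its lock (version 2).
    zk.put(segment, 2_000L);
    zk.updateIfVersion(segment, -1L, 1);

    // 1st attempt returns: its expected-version-0 write fails because the version moved on...
    boolean firstUpdated = zk.updateIfVersion(segment, -1L, 0);
    // ...and the guard sees -1 != 1000, so the 2nd attempt's segment is not deleted.
    boolean deleted = zk.deleteSegmentIfNeeded(segment, firstStart);

    System.out.println("firstUpdated=" + firstUpdated + ", deleted=" + deleted
        + ", exists=" + zk.exists(segment));
  }
}
```

Running `main` prints `firstUpdated=false, deleted=false, exists=true`. One assumption worth stating: the guard can only tell attempts apart if each one persists a distinct upload start time, which in the diff is only written on the parallel-push-protection path the review comment refers to.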



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

