This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-segment-upload-race-condition
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit f8f6a9fdfe09686730e2409c20a631dda5256a79
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Dec 2 14:06:07 2022 -0800

    Fix race condition when two uploads occur for the same segment
---
 .../pinot/controller/api/upload/ZKOperator.java    | 34 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 3d33a358f3..57406dd131 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -261,6 +261,9 @@ public class ZKOperator {
   }
 
   private void checkCRC(HttpHeaders headers, String tableNameWithType, String 
segmentName, long existingCrc) {
+    if (headers == null) {
+      return;
+    }
     String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
     if (expectedCrcStr != null) {
       long expectedCrc;
@@ -271,7 +274,7 @@ public class ZKOperator {
             String.format("Caught exception for segment: %s of table: %s while 
parsing IF-MATCH CRC: \"%s\"",
                 segmentName, tableNameWithType, expectedCrcStr), 
Response.Status.PRECONDITION_FAILED);
       }
-      if (expectedCrc != existingCrc) {
+      if (!isCRCMatched(expectedCrc, existingCrc)) {
         throw new ControllerApplicationException(LOGGER,
             String.format("For segment: %s of table: %s, expected CRC: %d does 
not match existing CRC: %d", segmentName,
                 tableNameWithType, expectedCrc, existingCrc), 
Response.Status.PRECONDITION_FAILED);
@@ -279,6 +282,10 @@ public class ZKOperator {
     }
   }
 
+  private boolean isCRCMatched(long expectedCRC, long existingCRC) {
+    return expectedCRC == existingCRC;
+  }
+
   private void processNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, FileUploadType uploadType,
       @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable 
String sourceDownloadURIStr,
       String segmentDownloadURIStr, @Nullable String crypterName, long 
segmentSizeInBytes,
@@ -344,10 +351,27 @@ public class ZKOperator {
       // Release lock. Expected version will be 0 as we hold a lock and no 
updates could take place meanwhile.
       newSegmentZKMetadata.setSegmentUploadStartTime(-1);
       if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
newSegmentZKMetadata, 0)) {
-        _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);
-        LOGGER.info("Deleted zk entry and segment {} for table {}.", 
segmentName, tableNameWithType);
-        throw new RuntimeException(
-            String.format("Failed to update ZK metadata for segment: %s of 
table: %s", segmentFile, tableNameWithType));
+        // There is a race condition when it took too much time for the 1st 
segment upload to process (due to slow
+        // PinotFS access), which leads to the 2nd attempt of segment upload, 
and the 2nd segment upload succeeded.
+        // In this case, when the 1st upload comes back, it shouldn't blindly 
delete the segment when it failed to
+        // update the zk metadata. Instead, the 1st attempt should validate 
the crc one more time. If crc remains the
+        // same, segment deletion should be skipped.
+        ZNRecord existingSegmentMetadataZNRecord =
+            
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, 
segmentName);
+        // Check if CRC match when IF-MATCH header is set
+        SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+        long existingCrc = segmentZKMetadata.getCrc();
+        try {
+          checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+          LOGGER.info("CRC is the same as the one in ZK. Skip updating the zk 
metadata for segment: " + segmentName);
+        } catch (ControllerApplicationException e) {
+          LOGGER.error("Failed to validate CRC for segment: " + segmentName, 
e);
+          _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);
+          LOGGER.info("Deleted zk entry and segment {} for table {}.", 
segmentName, tableNameWithType);
+          throw new RuntimeException(
+              String.format("Failed to update ZK metadata for segment: %s of 
table: %s", segmentFile,
+                  tableNameWithType));
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to