This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-segment-upload-race-condition in repository https://gitbox.apache.org/repos/asf/pinot.git
commit f8f6a9fdfe09686730e2409c20a631dda5256a79
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Dec 2 14:06:07 2022 -0800

    Fix race condition when 2 segment upload occurred for the same segment
---
 .../pinot/controller/api/upload/ZKOperator.java | 34 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 3d33a358f3..57406dd131 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -261,6 +261,9 @@ public class ZKOperator {
   }
 
   private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) {
+    if (headers == null) {
+      return;
+    }
     String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
     if (expectedCrcStr != null) {
       long expectedCrc;
@@ -271,7 +274,7 @@ public class ZKOperator {
             String.format("Caught exception for segment: %s of table: %s while parsing IF-MATCH CRC: \"%s\"",
                 segmentName, tableNameWithType, expectedCrcStr), Response.Status.PRECONDITION_FAILED);
       }
-      if (expectedCrc != existingCrc) {
+      if (!isCRCMatched(expectedCrc, existingCrc)) {
         throw new ControllerApplicationException(LOGGER,
             String.format("For segment: %s of table: %s, expected CRC: %d does not match existing CRC: %d",
                 segmentName, tableNameWithType, expectedCrc, existingCrc), Response.Status.PRECONDITION_FAILED);
@@ -279,6 +282,10 @@ public class ZKOperator {
     }
   }
 
+  private boolean isCRCMatched(long expectedCRC, long existingCRC) {
+    return expectedCRC == existingCRC;
+  }
+
   private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
       @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
       String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
@@ -344,10 +351,27 @@ public class ZKOperator {
       // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile.
       newSegmentZKMetadata.setSegmentUploadStartTime(-1);
       if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) {
-        _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
-        LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
-        throw new RuntimeException(
-            String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType));
+        // There is a race condition when it took too much time for the 1st segment upload to process (due to slow
+        // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded.
+        // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to
+        // update the zk metadata. Instead, the 1st attempt should validate the crc one more time. If crc remains the
+        // same, segment deletion should be skipped.
+        ZNRecord existingSegmentMetadataZNRecord =
+            _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+        // Check if CRC match when IF-MATCH header is set
+        SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
+        long existingCrc = segmentZKMetadata.getCrc();
+        try {
+          checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+          LOGGER.info("CRC is the same as the one in ZK. Skip updating the zk metadata for segment: " + segmentName);
+        } catch (ControllerApplicationException e) {
+          LOGGER.error("Failed to validate CRC for segment: " + segmentName, e);
+          _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+          LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType);
+          throw new RuntimeException(
+              String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile,
+                  tableNameWithType));
+        }
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org