Jackie-Jiang commented on a change in pull request #8110: URL: https://github.com/apache/pinot/pull/8110#discussion_r798078236
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -255,6 +275,21 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); if (!_pinotHelixResourceManager .updateZkMetadata(tableNameWithType, segmentZKMetadata, segmentMetadataZnRecord.getVersion())) { + _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName); + throw new RuntimeException( + "Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType); + } + } + + if (enableParallelPushProtection) { + // Release lock. + ZNRecord segmentMetadataZnRecord = Review comment: We should not read a new ZNRecord because this might not be the original one. We should reuse the `newSegmentZKMetadata` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String offlineTableName, String segme private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType, - String segmentName, boolean moveSegmentToFinalLocation) + String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection) throws Exception { + SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager + .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter, + enableParallelPushProtection); Review comment: Suggest not passing in `enableParallelPushProtection` but check it in this method and call `setSegmentUploadStartTime()` within this method to limit the scope of parallel push protection ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String offlineTableName, String segme private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType, - String segmentName, boolean moveSegmentToFinalLocation) + String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection) throws Exception { + SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager + .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter, + enableParallelPushProtection); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata)) { Review comment: Expected version 0 (current version) won't work if there is no existing ZK record. I don't know if ZK can create record only if it does not exist. IIRC expected version -1 means override anyway. If not, then there is still a race condition if 2 uploads happen at the same time and both of them run into this method. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath()); } catch (Exception e) { + // Cleanup the Zk entry and the segment from the permanent directory if it exists. LOGGER .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e); + _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName); Review comment: Can we add some tests to ensure this can properly clean up the ZK entry and the segment file? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegment .info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath()); } catch (Exception e) { + // Cleanup the Zk entry and the segment from the permanent directory if it exists. LOGGER .error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e); + _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName); throw new RuntimeException(e); } } else { LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType, zkDownloadURI); } - _pinotHelixResourceManager.addNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter); + + try { + _pinotHelixResourceManager.assignTableSegment(tableNameWithType, segmentMetadata.getName()); + } catch (Exception e) { + // assignTableSegment removes the zk entry. Call deleteSegment to remove the segment from permanent location. + LOGGER + .error("Caught exception while calling assignTableSegment for adding segment: {} to table: {}", segmentName, + tableNameWithType, e); + _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName); + throw new RuntimeException(e); + } // Update zk metadata customer map String segmentZKMetadataCustomMapModifierStr = headers != null ? headers Review comment: This modification should be applied when creating the initial metadata -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org