Jackie-Jiang commented on a change in pull request #8110:
URL: https://github.com/apache/pinot/pull/8110#discussion_r798078236



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -255,6 +275,21 @@ private void processNewSegment(SegmentMetadata 
segmentMetadata, URI finalSegment
       
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
       if (!_pinotHelixResourceManager
           .updateZkMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadataZnRecord.getVersion())) {
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);
+        throw new RuntimeException(
+            "Failed to update ZK metadata for segment: " + segmentName + " of 
table: " + tableNameWithType);
+      }
+    }
+
+    if (enableParallelPushProtection) {
+      // Release lock.
+      ZNRecord segmentMetadataZnRecord =

Review comment:
       We should not read a new ZNRecord because this might not be the original 
one. We should reuse the `newSegmentZKMetadata`

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String 
offlineTableName, String segme
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, 
String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation)
+      String segmentName, boolean moveSegmentToFinalLocation, boolean 
enableParallelPushProtection)
       throws Exception {
+    SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
+        .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, 
zkDownloadURI, crypter,
+            enableParallelPushProtection);

Review comment:
       Suggest not passing in `enableParallelPushProtection` but check it in 
this method and call `setSegmentUploadStartTime()` within this method to limit 
the scope of parallel push protection

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -222,8 +222,16 @@ private void checkCRC(HttpHeaders headers, String 
offlineTableName, String segme
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, 
String crypter, String tableNameWithType,
-      String segmentName, boolean moveSegmentToFinalLocation)
+      String segmentName, boolean moveSegmentToFinalLocation, boolean 
enableParallelPushProtection)
       throws Exception {
+    SegmentZKMetadata newSegmentZKMetadata = _pinotHelixResourceManager
+        .constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, 
zkDownloadURI, crypter,
+            enableParallelPushProtection);
+    if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
newSegmentZKMetadata)) {

Review comment:
       Expected version 0 (current version) won't work if there is no existing 
ZK record. I don't know if ZK can create record only if it does not exist. IIRC 
expected version -1 means override anyway. If not, then there is still a race 
condition if 2 uploads happen at the same time and both of them run into this 
method.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata 
segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, 
currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory 
if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent 
directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);

Review comment:
       Can we add some tests to ensure this can properly clean up the ZK entry 
and the segment file?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -232,15 +240,27 @@ private void processNewSegment(SegmentMetadata 
segmentMetadata, URI finalSegment
             .info("Moved segment {} from temp location {} to {}", segmentName, 
currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
+        // Cleanup the Zk entry and the segment from the permanent directory 
if it exists.
         LOGGER
             .error("Could not move segment {} from table {} to permanent 
directory", segmentName, tableNameWithType, e);
+        _pinotHelixResourceManager.deleteSegment(tableNameWithType, 
segmentName);
         throw new RuntimeException(e);
       }
     } else {
       LOGGER.info("Skipping segment move, keeping segment {} from table {} at 
{}", segmentName, tableNameWithType,
           zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(tableNameWithType, 
segmentMetadata, zkDownloadURI, crypter);
+
+    try {
+      _pinotHelixResourceManager.assignTableSegment(tableNameWithType, 
segmentMetadata.getName());
+    } catch (Exception e) {
+      // assignTableSegment removes the zk entry. Call deleteSegment to remove 
the segment from permanent location.
+      LOGGER
+          .error("Caught exception while calling assignTableSegment for adding 
segment: {} to table: {}", segmentName,
+              tableNameWithType, e);
+      _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
+      throw new RuntimeException(e);
+    }
 
     // Update zk metadata customer map
     String segmentZKMetadataCustomMapModifierStr = headers != null ? headers

Review comment:
       This modification should be applied when creating the initial metadata




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to