chenboat commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r599980587



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -57,41 +57,37 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
     _controllerMetrics = controllerMetrics;
   }
 
-  public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata,
+  public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
-
-    // Brand new segment, not refresh, directly add the segment
-    ZNRecord segmentMetadataZnRecord =
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
+    ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     if (segmentMetadataZnRecord == null) {
-      LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
+      LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          tableNameWithType, segmentName, moveSegmentToFinalLocation);
       return;
     }
 
-    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
+    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
 
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName,
+        enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
         segmentMetadataZnRecord, moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      String crypter, String offlineTableName, String segmentName, ZNRecord znRecord,
+      String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
       boolean moveSegmentToFinalLocation)
       throws Exception {
 
     OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord);

Review comment:
For processExistingSegment(), I think we should leave the change to another PR
and let this PR focus on adding new segments. For now, this PR will reject
uploads of segments whose name already exists. Note that for our upsert table
use case, uploading new segments is sufficient. I will send a follow-up PR for
refreshing existing segments.
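
To make the proposed behavior concrete, here is a minimal sketch of "add new
segments, reject existing names". It is not Pinot code: the class
SegmentUploadSketch, the in-memory map standing in for the segment ZK metadata,
and the hasSegment() helper are all hypothetical, and the real change would go
through _pinotHelixResourceManager as in the diff above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the controller upload path; not the real ZKOperator.
public class SegmentUploadSketch {
  // Assumption: an in-memory map replaces the segment ZK metadata store.
  // Keys are "tableNameWithType/segmentName", values are download URIs.
  private final Map<String, String> _segmentMetadataStore = new ConcurrentHashMap<>();

  public void completeSegmentOperations(String tableNameWithType, String segmentName, String zkDownloadURI) {
    String key = tableNameWithType + "/" + segmentName;
    // putIfAbsent returns null only when no entry existed for this key,
    // so the existence check and the insert happen in one atomic step.
    String existing = _segmentMetadataStore.putIfAbsent(key, zkDownloadURI);
    if (existing != null) {
      // Refresh of an existing segment is deliberately unsupported here.
      throw new IllegalStateException(
          "Segment " + segmentName + " already exists in table " + tableNameWithType);
    }
  }

  public boolean hasSegment(String tableNameWithType, String segmentName) {
    return _segmentMetadataStore.containsKey(tableNameWithType + "/" + segmentName);
  }
}
```

With this shape, a second upload of the same segment name fails fast instead of
falling into processExistingSegment(), which can then be handled in the
follow-up PR.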






