chenboat commented on a change in pull request #6567: URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r599980587
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -57,41 +57,37 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle _controllerMetrics = controllerMetrics; } - public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata, + public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter) throws Exception { - String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); String segmentName = segmentMetadata.getName(); - - // Brand new segment, not refresh, directly add the segment - ZNRecord segmentMetadataZnRecord = - _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName); + ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); if (segmentMetadataZnRecord == null) { - LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName); + LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType); processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter, - rawTableName, segmentName, moveSegmentToFinalLocation); + tableNameWithType, segmentName, moveSegmentToFinalLocation); return; } - LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName); + LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType); processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, - enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName, + enableParallelPushProtection, 
headers, zkDownloadURI, crypter, tableNameWithType, segmentName, segmentMetadataZnRecord, moveSegmentToFinalLocation); } private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, - String crypter, String offlineTableName, String segmentName, ZNRecord znRecord, + String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation) throws Exception { OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord); Review comment: For processExistingSegment(), I think we should leave it to another PR and let this PR focus on adding new segments. For now, this PR will reject an uploaded segment that has the same name as an existing segment. Note that for our Upsert table use case, uploading new segments would be sufficient. I will have a follow-up PR for refreshing existing segments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org