Jackie-Jiang commented on a change in pull request #6567: URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r598967441
########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java ########## @@ -118,6 +125,47 @@ public static void fetchSegmentToLocal(String uri, File dest) fetchSegmentToLocal(new URI(uri), dest); } + /** + * Fetches a segment from a given URI and untar the segment file to the dest dir (i.e., tableDataDir + segmentName). + */ + public static void fetchAndUntarSegmentToLocal(String uri, File tableDataDir, String segmentName) Review comment: Why do we need this method? There is already an `untarAndMoveSegment()` within the `RealtimeTableDataManager` class. We should be able to use that ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java ########## @@ -57,41 +57,37 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle _controllerMetrics = controllerMetrics; } - public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata, + public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter) throws Exception { - String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); String segmentName = segmentMetadata.getName(); - - // Brand new segment, not refresh, directly add the segment - ZNRecord segmentMetadataZnRecord = - _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName); + ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); if (segmentMetadataZnRecord == null) { - LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName); + LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType); processNewSegment(segmentMetadata, 
finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter, - rawTableName, segmentName, moveSegmentToFinalLocation); + tableNameWithType, segmentName, moveSegmentToFinalLocation); return; } - LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName); + LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType); processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, - enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName, + enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName, segmentMetadataZnRecord, moveSegmentToFinalLocation); } private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI, - String crypter, String offlineTableName, String segmentName, ZNRecord znRecord, + String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation) throws Exception { OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord); Review comment: (Critical) For a real-time segment, this should not be `OfflineSegmentZKMetadata`, because the `status` and `offset` info would be lost. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java ########## @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment; import com.google.common.base.Preconditions; + Review comment: (nit) reformat ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java ########## @@ -403,10 +414,13 @@ private void decryptFile(String crypterClassName, File tempEncryptedFile, 
File t // it keeps it at the downloadURI header that is set. We will not support this endpoint going forward. public void uploadSegmentAsJson(String segmentJsonStr, @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName, + @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType, @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection, @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { try { - asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false)); + asyncResponse.resume( + uploadSegment(tableName, "OFFLINE".equalsIgnoreCase(tableType) ? TableType.OFFLINE : TableType.REALTIME, null, Review comment: Use `TableType.valueOf(tableType.toUpperCase())`? We don't want to upload the segment to a real-time table when given an arbitrary, unrecognized `tableType` (with the current check, anything other than "OFFLINE" falls through to REALTIME). Same for other places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org