Jackie-Jiang commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r598967441



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -118,6 +125,47 @@ public static void fetchSegmentToLocal(String uri, File dest)
     fetchSegmentToLocal(new URI(uri), dest);
   }
 
+  /**
+   * Fetches a segment from a given URI and untar the segment file to the dest dir (i.e., tableDataDir + segmentName).
+   */
+  public static void fetchAndUntarSegmentToLocal(String uri, File tableDataDir, String segmentName)

Review comment:
       Why do we need this method? There is already an `untarAndMoveSegment()` in the `RealtimeTableDataManager` class, and we should be able to use that.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -57,41 +57,37 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
     _controllerMetrics = controllerMetrics;
   }
 
-  public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata,
+  public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
-
-    // Brand new segment, not refresh, directly add the segment
-    ZNRecord segmentMetadataZnRecord =
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
+    ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     if (segmentMetadataZnRecord == null) {
-      LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
+      LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          tableNameWithType, segmentName, moveSegmentToFinalLocation);
       return;
     }
 
-    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
+    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
 
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName,
+        enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
         segmentMetadataZnRecord, moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      String crypter, String offlineTableName, String segmentName, ZNRecord znRecord,
+      String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
       boolean moveSegmentToFinalLocation)
       throws Exception {
 
     OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord);

Review comment:
       (Critical) For a real-time segment, this should not be `OfflineSegmentZKMetadata`; with it, the `status` and `offset` info are lost.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
##########
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
 import com.google.common.base.Preconditions;
+

Review comment:
       (nit) reformat

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -403,10 +414,13 @@ private void decryptFile(String crypterClassName, File tempEncryptedFile, File t
   // it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
   public void uploadSegmentAsJson(String segmentJsonStr,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
+      @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false));
+      asyncResponse.resume(
+          uploadSegment(tableName, "OFFLINE".equalsIgnoreCase(tableType) ? TableType.OFFLINE : TableType.REALTIME, null,

Review comment:
       Use `TableType.valueOf(tableType.toUpperCase())`? We don't want to upload the segment to a real-time table when we get an arbitrary `tableType`. Same for the other places.
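
       A minimal sketch of the difference, for illustration only. The nested `TableType` enum is a local stand-in so the snippet compiles on its own, and `parseLenient` / `parseStrict` are hypothetical helper names, not methods in the PR. It also passes `Locale.ROOT` to `toUpperCase`, which is an addition to the suggestion above that keeps the result independent of the default locale.

```java
import java.util.Locale;

final class TableTypeParsing {
  // Local stand-in for the real TableType enum (assumption, for a self-contained example).
  enum TableType { OFFLINE, REALTIME }

  private TableTypeParsing() {}

  // Behavior in the diff: any value other than "OFFLINE" silently maps to REALTIME.
  static TableType parseLenient(String tableType) {
    return "OFFLINE".equalsIgnoreCase(tableType) ? TableType.OFFLINE : TableType.REALTIME;
  }

  // Suggested behavior: unknown values are rejected with an IllegalArgumentException.
  static TableType parseStrict(String tableType) {
    return TableType.valueOf(tableType.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(parseLenient("foo"));      // lenient parsing accepts the typo
    System.out.println(parseStrict("realtime"));  // strict parsing accepts valid values
    try {
      parseStrict("foo");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: foo");        // strict parsing rejects the typo
    }
  }
}
```

       With the strict version, a mistyped `tableType` fails the request instead of routing the segment to the real-time table. The caller would still need to turn the exception into a suitable client error.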



