Jackie-Jiang commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r599978452



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -551,6 +554,30 @@ public SimpleHttpResponse uploadSegment(URI uri, String 
segmentName, File segmen
     return uploadSegment(uri, segmentName, segmentFile, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Upload segment with segment file using default settings. Include table 
name and type as a request parameters.
+   *
+   * @param uri URI
+   * @param segmentName Segment name
+   * @param segmentFile Segment file
+   * @param tableName Table name with or without type suffix
+   * @param tableType Table type
+   * @return Response
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File 
segmentFile, String tableName,
+      TableType tableType)
+      throws IOException, HttpErrorStatusException {
+    // Add table name and type request parameters
+    NameValuePair tableNameValuePair = new 
BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
+    NameValuePair tableTypeValuePair = new 
BasicNameValuePair(QueryParameters.TABLE_TYPE, tableType.name());
+    List<NameValuePair> parameters = new ArrayList<>();

Review comment:
       (nit) Can be simplified to `Arrays.asList(tableNameValuePair, tableTypeValuePair)`
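
A minimal sketch of the simplification suggested above, assuming the list is only read after it is built. `Param` is a hypothetical stand-in for `NameValuePair`/`BasicNameValuePair` so the example compiles without HttpClient, and the literal keys `"tableName"` and `"tableType"` are assumed values for `QueryParameters.TABLE_NAME` and `QueryParameters.TABLE_TYPE`.

```java
import java.util.Arrays;
import java.util.List;

public class UploadParametersSketch {
  // Hypothetical stand-in for org.apache.http.NameValuePair (not the real class).
  static final class Param {
    final String name;
    final String value;

    Param(String name, String value) {
      this.name = name;
      this.value = value;
    }
  }

  // Builds both request parameters in one expression instead of
  // creating an ArrayList and calling add() twice.
  static List<Param> buildParameters(String tableName, String tableType) {
    Param tableNameParam = new Param("tableName", tableName); // assumed key
    Param tableTypeParam = new Param("tableType", tableType); // assumed key
    // Arrays.asList returns a fixed-size list backed by the array:
    // add() and remove() throw UnsupportedOperationException.
    return Arrays.asList(tableNameParam, tableTypeParam);
  }

  public static void main(String[] args) {
    for (Param p : buildParameters("myTable", "REALTIME")) {
      System.out.println(p.name + "=" + p.value);
    }
  }
}
```

One caveat with this form: because the returned list is fixed-size, it only fits if nothing downstream appends further parameters to it.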

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -247,24 +248,35 @@ private SuccessResponse uploadSegment(@Nullable String 
tableName, FormDataMultiP
         LOGGER.info("Uploading a segment {} to table: {}, push type {}, 
(Derived from segment metadata)", segmentName, tableName, uploadType);
       }
 
-      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      String tableNameWithType;
+      if (tableType == TableType.OFFLINE) {
+        tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      } else {
+        if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
+          throw new UnsupportedOperationException(
+              "Upload segment to non-upsert realtime table is not supported " 
+ rawTableName);
+        }
+        tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+      }
+
       String clientAddress = 
InetAddress.getByName(request.getRemoteAddr()).getHostName();
       LOGGER.info("Processing upload request for segment: {} of table: {} from 
client: {}, ingestion descriptor: {}",
-          segmentName, offlineTableName, clientAddress, ingestionDescriptor);
+          segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
 
-      // Skip segment validation if upload only segment metadata
-      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+      // Skip segment validation if upload only segment metadata or it is a 
realtime table segment.
+      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA && 
TableNameBuilder.isOfflineTableResource(tableNameWithType)) {

Review comment:
       (nit)
   ```suggestion
         if (tableType == TableType.OFFLINE && uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
   ```

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -25,10 +25,16 @@
 import java.util.List;
 import java.util.Map;
 
+import java.util.UUID;

Review comment:
       Revert the changes to this file

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
##########
@@ -35,24 +33,24 @@
   private ZKMetadataUtils() {
   }
 
-  public static void updateSegmentMetadata(OfflineSegmentZKMetadata 
offlineSegmentZKMetadata,
-      SegmentMetadata segmentMetadata) {
-    offlineSegmentZKMetadata.setSegmentName(segmentMetadata.getName());
-    offlineSegmentZKMetadata.setTableName(segmentMetadata.getTableName());
-    offlineSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
-    offlineSegmentZKMetadata.setSegmentType(SegmentType.OFFLINE);
+  public static void updateSegmentMetadata(SegmentZKMetadata segmentZKMetadata,
+                                           SegmentMetadata segmentMetadata, 
SegmentType segmentType) {

Review comment:
       Reformat

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1642,70 +1644,106 @@ public PinotResourceManagerResponse 
toggleTableState(String tableNameWithType, S
     return instanceSet;
   }
 
-  public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, 
String downloadUrl) {
-    addNewSegment(tableName, segmentMetadata, downloadUrl, null);
+  public void addNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, String downloadUrl) {
+    addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, null);
   }
 
-  public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, 
String downloadUrl,
+  public void addNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, String downloadUrl,
       @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
-    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-
+    InstancePartitionsType instancePartitionsType;
     // NOTE: must first set the segment ZK metadata before assigning segment 
to instances because segment assignment
     // might need them to determine the partition of the segment, and server 
will need them to download the segment
-    OfflineSegmentZKMetadata offlineSegmentZKMetadata = new 
OfflineSegmentZKMetadata();
-    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
-    offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
-    offlineSegmentZKMetadata.setCrypterName(crypter);
-    offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
+    ZNRecord znRecord;
+    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      Preconditions.checkState(isUpsertTable(tableNameWithType),
+          "Upload segment " + segmentName + " for non upsert enabled realtime 
table " + tableNameWithType
+              + " is not supported");
+      instancePartitionsType = InstancePartitionsType.CONSUMING;
+      // Build the realtime segment zk metadata with necessary fields.
+      LLCRealtimeSegmentZKMetadata segmentZKMetadata = new 
LLCRealtimeSegmentZKMetadata();
+      ZKMetadataUtils
+          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, 
CommonConstants.Segment.SegmentType.REALTIME);
+      segmentZKMetadata.setDownloadUrl(downloadUrl);
+      segmentZKMetadata.setCrypterName(crypter);
+      
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
+      znRecord = segmentZKMetadata.toZNRecord();
+    } else {
+      instancePartitionsType = InstancePartitionsType.OFFLINE;
+      // Build the offline segment zk metadata with necessary fields.
+      OfflineSegmentZKMetadata segmentZKMetadata = new 
OfflineSegmentZKMetadata();
+      ZKMetadataUtils
+          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, 
CommonConstants.Segment.SegmentType.OFFLINE);
+      segmentZKMetadata.setDownloadUrl(downloadUrl);
+      segmentZKMetadata.setCrypterName(crypter);
+      segmentZKMetadata.setPushTime(System.currentTimeMillis());
+      znRecord = segmentZKMetadata.toZNRecord();
+    }
     String segmentZKMetadataPath =
-        
ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName, 
segmentName);
+        
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, 
segmentName);
     Preconditions.checkState(
-        _propertyStore.set(segmentZKMetadataPath, 
offlineSegmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT),
-        "Failed to set segment ZK metadata for table: " + offlineTableName + 
", segment: " + segmentName);
-    LOGGER.info("Added segment: {} of table: {} to property store", 
segmentName, offlineTableName);
+        _propertyStore.set(segmentZKMetadataPath, znRecord, 
AccessOption.PERSISTENT),
+        "Failed to set segment ZK metadata for table: " + tableNameWithType + 
", segment: " + segmentName);
+    LOGGER.info("Added segment: {} of table: {} to property store", 
segmentName, tableNameWithType);
+    assignTableSegment(tableNameWithType, segmentName, segmentZKMetadataPath, 
instancePartitionsType);
+  }
+
 
+  private void assignTableSegment(String tableNameWithType, String 
segmentName, String segmentZKMetadataPath,
+      InstancePartitionsType instancePartitionsType) {
     // Assign instances for the segment and add it into IdealState
     try {
-      TableConfig offlineTableConfig = getTableConfig(offlineTableName);
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions
-          .checkState(offlineTableConfig != null, "Failed to find table config 
for table: " + offlineTableName);
+          .checkState(tableConfig != null, "Failed to find table config for 
table: " + tableNameWithType);
       SegmentAssignment segmentAssignment =
-          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
offlineTableConfig);
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig);
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
Collections
-          .singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
-              .fetchOrComputeInstancePartitions(_helixZkManager, 
offlineTableConfig, InstancePartitionsType.OFFLINE));
-      synchronized (getTableUpdaterLock(offlineTableName)) {
-        HelixHelper.updateIdealState(_helixZkManager, offlineTableName, 
idealState -> {
+          .singletonMap(instancePartitionsType, InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, 
instancePartitionsType));
+      synchronized (getTableUpdaterLock(tableNameWithType)) {
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
           if (currentAssignment.containsKey(segmentName)) {
             LOGGER.warn("Segment: {} already exists in the IdealState for 
table: {}, do not update", segmentName,
-                offlineTableName);
+                tableNameWithType);
           } else {
             List<String> assignedInstances = 
segmentAssignment.assignSegment(segmentName, currentAssignment, 
instancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: 
{}", segmentName, assignedInstances,
-                offlineTableName);
-            currentAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
+                tableNameWithType);
+            currentAssignment.put(segmentName, SegmentAssignmentUtils
+                .getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
           }
           return idealState;
         });
-        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, offlineTableName);
+        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, tableNameWithType);
       }
     } catch (Exception e) {
       LOGGER
           .error("Caught exception while adding segment: {} to IdealState for 
table: {}, deleting segment ZK metadata",
-              segmentName, offlineTableName, e);
+              segmentName, tableNameWithType, e);
       if (_propertyStore.remove(segmentZKMetadataPath, 
AccessOption.PERSISTENT)) {
-        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, offlineTableName);
+        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
       } else {
         LOGGER
-            .error("Failed to deleted segment ZK metadata for segment: {} of 
table: {}", segmentName, offlineTableName);
+            .error("Failed to deleted segment ZK metadata for segment: {} of 
table: {}", segmentName, tableNameWithType);
       }
       throw e;
     }
   }
 
+  public boolean isUpsertTable(String tableName) {
+    if (hasOfflineTable(tableName))
+      return false;
+    if (!hasRealtimeTable(tableName))
+      return false;
+
+    UpsertConfig upsertConfig =

Review comment:
       Check if the table config is `null` before calling `getUpsertConfig()`.
   I think we can skip the table existence checks. We should not return `false`
   when an offline table exists, because we will still enable upsert in that
   case even though it might not work. The table config `null` check also
   covers the existence of the realtime table.
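
A rough sketch of the shape this comment asks for, not the PR's actual code. `TableConfig`, `UpsertConfig`, the `enabled` flag, and `getRealtimeTableConfig` are hypothetical stand-ins; how the real `UpsertConfig` signals that upsert is on is an assumption here. The point is only that a single `null` check on the realtime table config replaces both existence checks.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertCheckSketch {
  // Hypothetical stand-in for Pinot's UpsertConfig; the real class is richer than one flag.
  static final class UpsertConfig {
    final boolean enabled;

    UpsertConfig(boolean enabled) {
      this.enabled = enabled;
    }
  }

  // Hypothetical stand-in for Pinot's TableConfig; getUpsertConfig() may return null.
  static final class TableConfig {
    private final UpsertConfig upsertConfig;

    TableConfig(UpsertConfig upsertConfig) {
      this.upsertConfig = upsertConfig;
    }

    UpsertConfig getUpsertConfig() {
      return upsertConfig;
    }
  }

  // Stand-in for the config store: raw table name -> realtime table config.
  private final Map<String, TableConfig> realtimeTableConfigs = new HashMap<>();

  void addRealtimeTable(String rawTableName, TableConfig config) {
    realtimeTableConfigs.put(rawTableName, config);
  }

  // Returns null when no realtime table exists for the given name.
  TableConfig getRealtimeTableConfig(String rawTableName) {
    return realtimeTableConfigs.get(rawTableName);
  }

  boolean isUpsertTable(String rawTableName) {
    TableConfig tableConfig = getRealtimeTableConfig(rawTableName);
    if (tableConfig == null) {
      // Missing config means there is no realtime table; no separate existence check needed.
      return false;
    }
    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
    return upsertConfig != null && upsertConfig.enabled;
  }

  public static void main(String[] args) {
    UpsertCheckSketch manager = new UpsertCheckSketch();
    manager.addRealtimeTable("orders", new TableConfig(new UpsertConfig(true)));
    System.out.println(manager.isUpsertTable("orders"));
    System.out.println(manager.isUpsertTable("missing"));
  }
}
```

Note that the sketch never looks at whether an offline table with the same raw name exists, which matches the request not to return `false` in that case.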



