fx19880617 commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r593726121



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -118,6 +125,47 @@ public static void fetchSegmentToLocal(String uri, File 
dest)
     fetchSegmentToLocal(new URI(uri), dest);
   }
 
+  /**
+   * Fetches a segment from a given URI and untar the segment file to the dest 
dir (i.e., tableDataDir + segmentName).
+   */
+  public static void fetchAndUtarSegmentToLocal(String uri, File tableDataDir, 
String segmentName)

Review comment:
       nit: fetchAndUntarSegmentToLocal

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
##########
@@ -85,15 +92,19 @@
   private HelixManager _helixManager;
   private String _realtimeTableName;
   private int _replication;
+  private String _partitionColumn;
 
   @Override
   public void init(HelixManager helixManager, TableConfig tableConfig) {
     _helixManager = helixManager;
     _realtimeTableName = tableConfig.getTableName();
     _replication = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+            tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
+    _partitionColumn = replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
 
-    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} 
for table: {}", _replication,
-        _realtimeTableName);
+    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} 
partitionColumn: {} for table: {}",

Review comment:
       nit: `with replication: {}, partitionColumn: {}`

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -358,7 +358,7 @@
   public static class Segment {
     public static class Realtime {
       public enum Status {
-        IN_PROGRESS, DONE
+        IN_PROGRESS, DONE, UPLOAD

Review comment:
       Do you think we need separate statuses for `UPLOAD_UPSERT` and 
`UPLOAD_APPEND`?

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
##########
@@ -285,7 +285,7 @@ public static void buildSegmentFromAvro(File avroFile, 
TableConfig tableConfig,
     segmentGeneratorConfig.setOutDir(segmentDir.getPath());
     segmentGeneratorConfig.setTableName(tableConfig.getTableName());
     // Test segment with space and special character in the file name
-    segmentGeneratorConfig.setSegmentNamePostfix(segmentIndex + " %");
+    
segmentGeneratorConfig.setSegmentNamePostfix(Integer.toString(segmentIndex));

Review comment:
       why change this?

##########
File path: pinot-integration-tests/src/test/resources/upsert_table_test.schema
##########
@@ -0,0 +1,33 @@
+{
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "singleValueField": true,
+      "name": "clientId"
+    },
+    {
+      "dataType": "STRING",
+      "singleValueField": true,
+      "name": "city"
+    },
+    {
+      "dataType": "STRING",
+      "singleValueField": true,
+      "name": "description"
+    },
+    {
+      "dataType": "INT",
+      "singleValueField": true,
+      "name": "salary"
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec": {
+      "timeType": "DAYS",
+      "dataType": "INT",
+      "name": "DaysSinceEpoch"
+    }
+  },
+  "primaryKeyColumns": ["clientId"],
+  "schemaName": "upsertSchema"
+}

Review comment:
       nit: add a newline at the end of the file

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -245,21 +245,28 @@ private SuccessResponse uploadSegment(@Nullable String 
tableName, FormDataMultiP
         LOGGER.info("Uploading a segment {} to table: {}, push type {}, 
(Derived from segment metadata)", segmentName, tableName, uploadType);
       }
 
-      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      String tableNameWithType;
+      if (_pinotHelixResourceManager.isRealtimeOnlyTable(rawTableName)) {

Review comment:
       How do we upload segments to the real-time part of a hybrid table?

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -118,6 +125,47 @@ public static void fetchSegmentToLocal(String uri, File 
dest)
     fetchSegmentToLocal(new URI(uri), dest);
   }
 
+  /**
+   * Fetches a segment from a given URI and untar the segment file to the dest 
dir (i.e., tableDataDir + segmentName).
+   */
+  public static void fetchAndUtarSegmentToLocal(String uri, File tableDataDir, 
String segmentName)
+      throws Exception {
+    File tempDir = new File(tableDataDir, "tmp-" + segmentName + "-" + 
UUID.randomUUID());
+    FileUtils.forceMkdir(tempDir);
+    File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
+    File tempSegmentDir = new File(tempDir, segmentName);
+    try {
+      try {
+        SegmentFetcherFactory.fetchSegmentToLocal(uri, tempTarFile);
+        LOGGER.info("Downloaded tarred segment: {} from: {} to: {}, file 
length: {}", segmentName, uri, tempTarFile,
+            tempTarFile.length());
+      } catch (AttemptsExceededException e) {
+        LOGGER.error("Attempts exceeded when downloading segment: {} : {} 
from: {} to: {}", segmentName, uri,

Review comment:
       Log the error `LOGGER.error(..., e)`?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -245,21 +245,28 @@ private SuccessResponse uploadSegment(@Nullable String 
tableName, FormDataMultiP
         LOGGER.info("Uploading a segment {} to table: {}, push type {}, 
(Derived from segment metadata)", segmentName, tableName, uploadType);
       }
 
-      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      String tableNameWithType;
+      if (_pinotHelixResourceManager.isUpsertTable(rawTableName)) {

Review comment:
       Shall we throw an exception for a non-upsert realtime table?

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -118,6 +125,47 @@ public static void fetchSegmentToLocal(String uri, File 
dest)
     fetchSegmentToLocal(new URI(uri), dest);
   }
 
+  /**
+   * Fetches a segment from a given URI and untar the segment file to the dest 
dir (i.e., tableDataDir + segmentName).
+   */
+  public static void fetchAndUtarSegmentToLocal(String uri, File tableDataDir, 
String segmentName)
+      throws Exception {
+    File tempDir = new File(tableDataDir, "tmp-" + segmentName + "-" + 
UUID.randomUUID());
+    FileUtils.forceMkdir(tempDir);
+    File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
+    File tempSegmentDir = new File(tempDir, segmentName);
+    try {
+      try {
+        SegmentFetcherFactory.fetchSegmentToLocal(uri, tempTarFile);
+        LOGGER.info("Downloaded tarred segment: {} from: {} to: {}, file 
length: {}", segmentName, uri, tempTarFile,
+            tempTarFile.length());
+      } catch (AttemptsExceededException e) {
+        LOGGER.error("Attempts exceeded when downloading segment: {} : {} 
from: {} to: {}", segmentName, uri,
+            tempTarFile);
+        Utils.rethrowException(e);
+      }
+
+      try {
+        // If an exception is thrown when untarring, it means the tar file is 
broken OR not found after the retry.
+        // Thus, there's no need to retry again.
+        File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, 
tempSegmentDir).get(0);
+        File segmentDir = new File(tableDataDir, segmentName);
+        if (segmentDir.exists()) {
+          LOGGER.info("Deleting existing index directory for segment: {}", 
segmentName);
+          FileUtils.deleteDirectory(segmentDir);
+        }
+        FileUtils.moveDirectory(tempIndexDir, segmentDir);
+        LOGGER.info("Successfully downloaded segment: {} to: {}", segmentName, 
segmentDir);
+      } catch (Exception e) {
+        LOGGER
+            .error("Exception when untarring segment: {} for from {} to {}", 
segmentName, tempTarFile, tempSegmentDir);

Review comment:
       nit: `LOGGER
               .error("Exception when untarring segment: {} from {} to {}", 
segmentName, tempTarFile, tempSegmentDir, e);`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -528,4 +542,41 @@ private boolean isValid(Schema schema, IndexingConfig 
indexingConfig) {
     }
     return isValid;
   }
+
+  private int getSegmentPartitionId(String segmentName, String tableName) {
+    // A fast path if the segmentName is a LLC segment name and we can get the 
partition id from the name directly.
+    if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+      return new LLCSegmentName(segmentName).getPartitionId();
+    }
+    // Otherwise, retrieve the partition id from the segment zk metadata. 
Currently only realtime segments from upsert
+    // enabled tables have partition ids in their segment metadata.
+    RealtimeSegmentZKMetadata segmentZKMetadata =
+        
ZKMetadataProvider.getRealtimeSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
 tableName, segmentName);
+    Preconditions.checkState(isUpsertEnabled(),
+        "Only upsert enabled table has partition ids in its segment metadata: 
seg %s of table %s", segmentName,
+        tableName);
+    Preconditions
+        .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
+            segmentName, tableName);
+    return getSegmentPartitionIdFromZkMetaData(segmentZKMetadata, tableName);
+  }
+
+  private int getSegmentPartitionIdFromZkMetaData(RealtimeSegmentZKMetadata 
segmentZKMetadata, String tableName) {
+    String segmentName = segmentZKMetadata.getSegmentName();
+    Preconditions.checkState(segmentZKMetadata.getPartitionMetadata() != null,
+        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata", segmentName,
+        tableName);
+
+    // Use any primary key column to fetch the partition metadata
+    ColumnPartitionMetadata partitionMetadata =
+        
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_primaryKeyColumns.get(0));
+    Preconditions.checkState(partitionMetadata != null,
+        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata for column: %s",
+        segmentName, tableName, _primaryKeyColumns.get(0));
+    Set<Integer> partitions = partitionMetadata.getPartitions();
+    Preconditions.checkState(partitions.size() == 1,
+        "Segment ZK metadata for segment: %s of table: %s contains multiple 
partitions for column: %s", segmentName,

Review comment:
       also dump all the partitions in the error log

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1649,63 +1651,100 @@ public void addNewSegment(String tableName, 
SegmentMetadata segmentMetadata, Str
   public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, 
String downloadUrl,
       @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
-    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-
+    String tableNameWithType;
+    InstancePartitionsType instancePartitionsType;
     // NOTE: must first set the segment ZK metadata before assigning segment 
to instances because segment assignment
     // might need them to determine the partition of the segment, and server 
will need them to download the segment
-    OfflineSegmentZKMetadata offlineSegmentZKMetadata = new 
OfflineSegmentZKMetadata();
-    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
-    offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
-    offlineSegmentZKMetadata.setCrypterName(crypter);
-    offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
+    ZNRecord znRecord;
+
+    if (isUpsertTable(tableName)) {

Review comment:
       shall we also add a check and throw an exception for a REALTIME 
non-upsert table?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1649,63 +1651,100 @@ public void addNewSegment(String tableName, 
SegmentMetadata segmentMetadata, Str
   public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, 
String downloadUrl,
       @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
-    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-
+    String tableNameWithType;
+    InstancePartitionsType instancePartitionsType;
     // NOTE: must first set the segment ZK metadata before assigning segment 
to instances because segment assignment
     // might need them to determine the partition of the segment, and server 
will need them to download the segment
-    OfflineSegmentZKMetadata offlineSegmentZKMetadata = new 
OfflineSegmentZKMetadata();
-    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
-    offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
-    offlineSegmentZKMetadata.setCrypterName(crypter);
-    offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
+    ZNRecord znRecord;
+
+    if (isUpsertTable(tableName)) {
+      tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      instancePartitionsType = InstancePartitionsType.CONSUMING;
+      // Build the realtime segment zk metadata with necessary fields.
+      LLCRealtimeSegmentZKMetadata segmentZKMetadata = new 
LLCRealtimeSegmentZKMetadata();
+      ZKMetadataUtils
+          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, 
CommonConstants.Segment.SegmentType.REALTIME);
+      segmentZKMetadata.setDownloadUrl(downloadUrl);
+      segmentZKMetadata.setCrypterName(crypter);
+      
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOAD);
+      znRecord = segmentZKMetadata.toZNRecord();
+    } else {
+      tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      instancePartitionsType = InstancePartitionsType.OFFLINE;
+      // Build the offline segment zk metadata with necessary fields.
+      OfflineSegmentZKMetadata segmentZKMetadata = new 
OfflineSegmentZKMetadata();
+      ZKMetadataUtils
+          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, 
CommonConstants.Segment.SegmentType.OFFLINE);
+      segmentZKMetadata.setDownloadUrl(downloadUrl);
+      segmentZKMetadata.setCrypterName(crypter);
+      segmentZKMetadata.setPushTime(System.currentTimeMillis());
+      znRecord = segmentZKMetadata.toZNRecord();
+    }
     String segmentZKMetadataPath =
-        
ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName, 
segmentName);
+        
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, 
segmentName);
     Preconditions.checkState(
-        _propertyStore.set(segmentZKMetadataPath, 
offlineSegmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT),
-        "Failed to set segment ZK metadata for table: " + offlineTableName + 
", segment: " + segmentName);
-    LOGGER.info("Added segment: {} of table: {} to property store", 
segmentName, offlineTableName);
+        _propertyStore.set(segmentZKMetadataPath, znRecord, 
AccessOption.PERSISTENT),
+        "Failed to set segment ZK metadata for table: " + tableNameWithType + 
", segment: " + segmentName);
+    LOGGER.info("Added segment: {} of table: {} to property store", 
segmentName, tableNameWithType);
+    assignTableSegment(tableNameWithType, segmentName, segmentZKMetadataPath, 
instancePartitionsType);
+  }
+
 
+  private void assignTableSegment(String tableNameWithType, String 
segmentName, String segmentZKMetadataPath,
+      InstancePartitionsType instancePartitionsType) {
     // Assign instances for the segment and add it into IdealState
     try {
-      TableConfig offlineTableConfig = getTableConfig(offlineTableName);
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions
-          .checkState(offlineTableConfig != null, "Failed to find table config 
for table: " + offlineTableName);
+          .checkState(tableConfig != null, "Failed to find table config for 
table: " + tableNameWithType);
       SegmentAssignment segmentAssignment =
-          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
offlineTableConfig);
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig);
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
Collections
-          .singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
-              .fetchOrComputeInstancePartitions(_helixZkManager, 
offlineTableConfig, InstancePartitionsType.OFFLINE));
-      synchronized (getTableUpdaterLock(offlineTableName)) {
-        HelixHelper.updateIdealState(_helixZkManager, offlineTableName, 
idealState -> {
+          .singletonMap(instancePartitionsType, InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, 
instancePartitionsType));
+      synchronized (getTableUpdaterLock(tableNameWithType)) {
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
           if (currentAssignment.containsKey(segmentName)) {
             LOGGER.warn("Segment: {} already exists in the IdealState for 
table: {}, do not update", segmentName,
-                offlineTableName);
+                tableNameWithType);
           } else {
             List<String> assignedInstances = 
segmentAssignment.assignSegment(segmentName, currentAssignment, 
instancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: 
{}", segmentName, assignedInstances,
-                offlineTableName);
-            currentAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
+                tableNameWithType);
+            currentAssignment.put(segmentName, SegmentAssignmentUtils
+                .getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
           }
           return idealState;
         });
-        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, offlineTableName);
+        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, tableNameWithType);
       }
     } catch (Exception e) {
       LOGGER
           .error("Caught exception while adding segment: {} to IdealState for 
table: {}, deleting segment ZK metadata",
-              segmentName, offlineTableName, e);
+              segmentName, tableNameWithType, e);
       if (_propertyStore.remove(segmentZKMetadataPath, 
AccessOption.PERSISTENT)) {
-        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, offlineTableName);
+        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
       } else {
         LOGGER
-            .error("Failed to deleted segment ZK metadata for segment: {} of 
table: {}", segmentName, offlineTableName);
+            .error("Failed to deleted segment ZK metadata for segment: {} of 
table: {}", segmentName, tableNameWithType);
       }
       throw e;
     }
   }
 
+  public boolean isUpsertTable(String tableName) {
+    if (hasOfflineTable(tableName))
+      return false;
+    if (!hasRealtimeTable(tableName))

Review comment:
       is this if-condition redundant?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to