rajagopr commented on code in PR #13597:
URL: https://github.com/apache/pinot/pull/13597#discussion_r1681484482


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
 
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName: segmentNames) {
+      String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
+          segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName: segmentNames) {
+          // Update segment tier to support direct assignment for multiple data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
+      synchronized (getTableUpdaterLock(tableNameWithType)) {

Review Comment:
   Added the logs. Wouldn't the metrics be too sparse, since this is not a continuous activity and only happens when segments are uploaded?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
 
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName: segmentNames) {
+      String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
+          segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName: segmentNames) {
+          // Update segment tier to support direct assignment for multiple data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
+      synchronized (getTableUpdaterLock(tableNameWithType)) {
+        Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
+          assert idealState != null;
+          for (String segmentName: segmentNames) {
+            Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+            if (currentAssignment.containsKey(segmentName)) {
+              LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+                  tableNameWithType);
+            } else {
+              List<String> assignedInstances =
+                  segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
+              LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+                  tableNameWithType);
+              currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+                  SegmentStateModel.ONLINE));
+            }
+          }
+          return idealState;
+        });
+        LOGGER.info("Added segments: {} to IdealState for table: {}", segmentNames, tableNameWithType);
+      }
+    } catch (Exception e) {
+      LOGGER.error(
+          "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata",
+          segmentNames, tableNameWithType, e);
+      for (Map.Entry<String, String> segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) {
+        String segmentName = segmentZKMetadataPathEntry.getKey();
+        String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue();
+        if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {
+          LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
+        } else {
+          LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName,
Review Comment:
   Yes, the original logic doesn't retry either. I'm not sure we have reason to believe a retry would make the remove operation succeed. It looks like the Helix library itself does some error handling once the remove method is called.
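   For context, if a retry were added here, a bounded retry around the remove call could look like the sketch below. This is a generic illustration, not Helix's API: the `BooleanSupplier` stands in for the `_propertyStore.remove(...)` call, and the attempt limit is arbitrary.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class RemoveRetryExample {
  /** Invokes the remove operation up to maxAttempts times; returns true on the first success. */
  static boolean removeWithRetry(BooleanSupplier remove, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (remove.getAsBoolean()) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Simulate a flaky remove that only succeeds on the 3rd call
    AtomicInteger calls = new AtomicInteger();
    BooleanSupplier flakyRemove = () -> calls.incrementAndGet() >= 3;
    System.out.println(removeWithRetry(flakyRemove, 5));
  }
}
```

   Whether this helps depends on the failure mode: it would only matter for transient ZK errors, which, as noted above, the Helix client may already handle internally.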



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -277,6 +278,8 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
       Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
       throws Exception {
     String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    Map<String, String> segmentUriPathMap = new HashMap<>();

Review Comment:
   If we introduce a limit, then we need to invoke this API in batches. For example, with 100 segments to upload and a batch-size limit of 20, we would have to call this API 5 times. As of now, we depend on the SegmentConversionResult size and expect it to be a reasonable number. Let's discuss and set reasonable limits.
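   To make the batching concrete, the caller would partition the segment list into fixed-size chunks and invoke the upload API once per chunk. A minimal sketch, assuming a hypothetical `partition` helper and an illustrative limit of 20 (Guava's `Lists.partition` provides the same behavior):

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentBatchExample {
  /** Splits names into consecutive batches of at most batchSize elements each. */
  static List<List<String>> partition(List<String> names, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    for (int i = 0; i < names.size(); i += batchSize) {
      batches.add(names.subList(i, Math.min(i + batchSize, names.size())));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> segments = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      segments.add("segment_" + i);
    }
    // 100 segments with a batch-size limit of 20 -> 5 batches, i.e. 5 API calls
    System.out.println(partition(segments, 20).size());
  }
}
```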



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

