rajagopr commented on code in PR #13646:
URL: https://github.com/apache/pinot/pull/13646#discussion_r1755532575


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2358,6 +2358,84 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
 
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName: segmentNames) {
+      String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
+          segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName: segmentNames) {
+          // Update segment tier to support direct assignment for multiple data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
+      synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
+        long segmentAssignmentStartTs = System.currentTimeMillis();
+        Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
+          assert idealState != null;
+          for (String segmentName: segmentNames) {
+            Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+            if (currentAssignment.containsKey(segmentName)) {
+              LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+                  tableNameWithType);
+            } else {
+              List<String> assignedInstances =
+                  segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
+              LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+                  tableNameWithType);
+              currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+                  SegmentStateModel.ONLINE));
+            }
+          }
+          return idealState;
+        });
+        LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames,
+            tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartTs);
+      }
+    } catch (Exception e) {
+      LOGGER.error(

Review Comment:
   In the failure path, the code removes the segments from the ZK property 
store. This path is covered in the tests related to `SegmentDeletionManager`.
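
   For readers without the full diff, here is a rough, self-contained sketch of the
   failure-path pattern described above. It is not the Pinot implementation: the class
   `BatchAssignRollbackSketch`, its in-memory maps (stand-ins for the ZK property store and
   the IdealState), the path layout, and the "no instances" failure trigger are all
   hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: in-memory stand-ins, not Pinot or Helix classes. */
public class BatchAssignRollbackSketch {
  // Hypothetical stand-in for the ZK property store: metadata path -> payload.
  private final Map<String, String> propertyStore = new HashMap<>();
  // Hypothetical stand-in for IdealState map fields: segment -> (instance -> state).
  private final Map<String, Map<String, String>> idealState = new HashMap<>();

  // Hypothetical path layout, chosen only for this sketch.
  public static String metadataPath(String table, String segment) {
    return "/sketch/segments/" + table + "/" + segment;
  }

  // Simulates segment metadata that was written before assignment starts.
  public void addMetadata(String table, String segment) {
    propertyStore.put(metadataPath(table, segment), "metadata");
  }

  public boolean hasMetadata(String table, String segment) {
    return propertyStore.containsKey(metadataPath(table, segment));
  }

  public boolean isAssigned(String segment) {
    return idealState.containsKey(segment);
  }

  /** Assigns all segments in one batch; on failure, removes their metadata and rethrows. */
  public void assignSegments(String table, List<String> segments, List<String> instances) {
    // Collect the metadata paths up front so the failure path knows what to clean up.
    Map<String, String> pathsBySegment = new HashMap<>();
    for (String segment : segments) {
      pathsBySegment.put(segment, metadataPath(table, segment));
    }
    try {
      // Assumed failure trigger for the sketch: nothing to assign the segments to.
      if (instances.isEmpty()) {
        throw new IllegalStateException("No instances available for table: " + table);
      }
      for (String segment : segments) {
        if (idealState.containsKey(segment)) {
          continue; // Already present: leave the existing assignment untouched.
        }
        Map<String, String> instanceStates = new HashMap<>();
        for (String instance : instances) {
          instanceStates.put(instance, "ONLINE");
        }
        idealState.put(segment, instanceStates);
      }
    } catch (RuntimeException e) {
      // Failure path: remove the metadata of every segment in the batch, then rethrow.
      for (String path : pathsBySegment.values()) {
        propertyStore.remove(path);
      }
      throw e;
    }
  }
}
```

   In this sketch a failed batch leaves neither metadata nor an assignment behind for the
   segments it covered, which is the behavior the deletion-related tests would be expected
   to check.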



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

