rajagopr commented on code in PR #13646: URL: https://github.com/apache/pinot/pull/13646#discussion_r1755532575
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2358,6 +2358,84 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { } } + // Assign a list of segments in batch mode + public void assignTableSegments(String tableNameWithType, List<String> segmentNames) { + Map<String, String> segmentZKMetadataPathMap = new HashMap<>(); + for (String segmentName: segmentNames) { + String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, + segmentName); + segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); + } + // Assign instances for the segment and add it into IdealState + try { + TableConfig tableConfig = getTableConfig(tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); + + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + fetchOrComputeInstancePartitions(tableNameWithType, tableConfig); + + // Initialize tier information only in case direct tier assignment is configured + if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { + List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), + TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); + for (String segmentName: segmentNames) { + // Update segment tier to support direct assignment for multiple data directories + updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); + InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( + tableNameWithType, segmentName, sortedTiers, _helixZkManager); + if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + // Override instance partitions for offline table + LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", + tierInstancePartitions, segmentName, tableNameWithType); + instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions); + } + } + } + + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); + synchronized (getIdealStateUpdaterLock(tableNameWithType)) { + long segmentAssignmentStartTs = System.currentTimeMillis(); + Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { + assert idealState != null; + for (String segmentName: segmentNames) { + Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + SegmentStateModel.ONLINE)); + } + } + return idealState; + }); + LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames, + tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartTs); + } + } catch (Exception e) { + LOGGER.error( Review Comment: In the failure path, the code removes the segments from the ZK property store. This path is covered in the tests related to `SegmentDeletionManager`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org