rajagopr commented on code in PR #13597: URL: https://github.com/apache/pinot/pull/13597#discussion_r1681484482
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { } } + // Assign a list of segments in batch mode + public void assignTableSegments(String tableNameWithType, List<String> segmentNames) { + Map<String, String> segmentZKMetadataPathMap = new HashMap<>(); + for (String segmentName: segmentNames) { + String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, + segmentName); + segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); + } + // Assign instances for the segment and add it into IdealState + try { + TableConfig tableConfig = getTableConfig(tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); + + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + fetchOrComputeInstancePartitions(tableNameWithType, tableConfig); + + // Initialize tier information only in case direct tier assignment is configured + if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { + List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), + TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); + for (String segmentName: segmentNames) { + // Update segment tier to support direct assignment for multiple data directories + updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); + InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( + tableNameWithType, segmentName, sortedTiers, _helixZkManager); + if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + // Override instance partitions for offline table + LOGGER.info("Overriding with tiered instance partitions: {} for 
segment: {} of table: {}", + tierInstancePartitions, segmentName, tableNameWithType); + instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions); + } + } + } + + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); + synchronized (getTableUpdaterLock(tableNameWithType)) { Review Comment: Added the logs. The metrics would be too sparse since this is not a continuous activity and only happens when segments are uploaded? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { } } + // Assign a list of segments in batch mode + public void assignTableSegments(String tableNameWithType, List<String> segmentNames) { + Map<String, String> segmentZKMetadataPathMap = new HashMap<>(); + for (String segmentName: segmentNames) { + String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, + segmentName); + segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); + } + // Assign instances for the segment and add it into IdealState + try { + TableConfig tableConfig = getTableConfig(tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); + + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + fetchOrComputeInstancePartitions(tableNameWithType, tableConfig); + + // Initialize tier information only in case direct tier assignment is configured + if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { + List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), + TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); + for (String segmentName: 
segmentNames) { + // Update segment tier to support direct assignment for multiple data directories + updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); + InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( + tableNameWithType, segmentName, sortedTiers, _helixZkManager); + if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + // Override instance partitions for offline table + LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", + tierInstancePartitions, segmentName, tableNameWithType); + instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions); + } + } + } + + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); + synchronized (getTableUpdaterLock(tableNameWithType)) { + Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { + assert idealState != null; + for (String segmentName: segmentNames) { + Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + SegmentStateModel.ONLINE)); + } + } + return idealState; + }); + LOGGER.info("Added segments: {} to IdealState for table: {}", segmentNames, 
tableNameWithType); + } + } catch (Exception e) { + LOGGER.error( + "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata", + segmentNames, tableNameWithType, e); + for (Map.Entry<String, String> segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) { + String segmentName = segmentZKMetadataPathEntry.getKey(); + String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue(); + if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) { + LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); + } else { + LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, Review Comment: Yes, the original logic doesn't retry either. I'm not sure we have reason to believe that a retry would make the remove operation succeed. It looks like the Helix library itself does some error handling once the remove method is called. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java: ########## @@ -277,6 +278,8 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters) throws Exception { String tableName = spec.getTableSpec().getTableName(); + Map<String, File> segmentMetadataFileMap = new HashMap<>(); + Map<String, String> segmentUriPathMap = new HashMap<>(); Review Comment: If we introduce a limit, then we need to invoke this API in batches. For example, if we have 100 segments to upload and the batch-size limit is set to 20, then we have to call this API 5 times. As of now, we depend on the SegmentConversionResult size and expect it to be a reasonable number. Let's discuss and set reasonable limits. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org