This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bad572ecb6 [MergeRollupTask] include partition info into segment name (#9815) bad572ecb6 is described below commit bad572ecb636d766862b048be47c9b6bfa8ab4ef Author: Haitao Zhang <hai...@startree.ai> AuthorDate: Thu Nov 17 18:11:23 2022 -0800 [MergeRollupTask] include partition info into segment name (#9815) --- .../mergerollup/MergeRollupTaskGenerator.java | 32 ++++++++++++++++------ 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index 0983e9f4cd..28043c2f60 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -105,6 +105,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000; private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1; private static final String REFRESH = "REFRESH"; + private static final String DELIMITER_IN_SEGMENT_NAME = "_"; // This is the metric that keeps track of the task delay in the number of time buckets. For example, if we see this // number to be 7 and merge task is configured with "bucketTimePeriod = 1d", this means that we have 7 days of @@ -349,7 +350,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) { pinotTaskConfigsForTable.addAll( createPinotTaskConfigs(selectedSegmentsPerBucket, offlineTableName, maxNumRecordsPerTask, mergeLevel, - mergeConfigs, taskConfigs)); + null, mergeConfigs, taskConfigs)); } } else { // For partitioned table, schedule separate tasks for each partitionId (partitionId is constructed from @@ -383,16 +384,18 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { } } - for (List<SegmentZKMetadata> partitionedSegments : partitionToSegments.values()) { + for (Map.Entry<List<Integer>, List<SegmentZKMetadata>> entry : partitionToSegments.entrySet()) { + List<Integer> partition = entry.getKey(); + List<SegmentZKMetadata> partitionedSegments = entry.getValue(); pinotTaskConfigsForTable.addAll( createPinotTaskConfigs(partitionedSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel, - mergeConfigs, taskConfigs)); + partition, mergeConfigs, taskConfigs)); } if (!outlierSegments.isEmpty()) { pinotTaskConfigsForTable.addAll( createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel, - mergeConfigs, taskConfigs)); + null, mergeConfigs, taskConfigs)); } } } @@ -516,8 +519,8 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { * Create pinot task configs with selected segments and configs */ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> selectedSegments, - String offlineTableName, int maxNumRecordsPerTask, String mergeLevel, Map<String, String> mergeConfigs, - Map<String, String> taskConfigs) { + String offlineTableName, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition, + Map<String, String> mergeConfigs, Map<String, String> taskConfigs) { int numRecordsPerTask = 0; List<List<String>> segmentNamesList = new ArrayList<>(); List<List<String>> downloadURLsList = new ArrayList<>(); @@ -539,6 +542,15 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { } List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + StringBuilder partitionSuffixBuilder = new StringBuilder(); + if (partition != null && !partition.isEmpty()) { + for (int columnPartition : partition) { + partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition); + } + } + String partitionSuffix = partitionSuffixBuilder.toString(); + for (int i = 0; i < segmentNamesList.size(); i++) { Map<String, String> configs = new HashMap<>(); configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName); @@ -562,9 +574,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)); + // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once within + // the same epoch millisecond, which may happen when there are multiple partitions. + // To prevent such name conflict, we include a partitionSeqSuffix to the segment name. configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY, - MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_" - + TableNameBuilder.extractRawTableName(offlineTableName)); + MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME + + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i + + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(offlineTableName)); pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org