This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new eefa8fbfe8 Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964) eefa8fbfe8 is described below commit eefa8fbfe86bc5b1412f9b9085364fb8a773476f Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Mon Jul 3 16:03:58 2023 -0700 Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964) --- .../apache/pinot/core/common/MinionConstants.java | 3 + .../mergerollup/MergeRollupTaskGenerator.java | 130 +++++++++++---------- .../DefaultMergeRollupTaskSegmentGroupManager.java | 35 ++++++ .../MergeRollupTaskSegmentGroupManager.java | 36 ++++++ ...MergeRollupTaskSegmentGroupManagerProvider.java | 46 ++++++++ 5 files changed, 188 insertions(+), 62 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 530f757048..8e4c5cfc1c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -118,6 +118,9 @@ public class MinionConstants { public static final String SEGMENT_ZK_METADATA_TIME_KEY = TASK_TYPE + TASK_TIME_SUFFIX; public static final String MERGED_SEGMENT_NAME_PREFIX = "merged_"; + + // Custom segment group manager class name + public static final String SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY = "segment.group.manager.class.name"; } /** diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index a19634d525..7d87abc96d 100644 --- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -49,6 +49,7 @@ import org.apache.pinot.core.common.MinionConstants.MergeTask; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils; import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; +import org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger.MergeRollupTaskSegmentGroupManagerProvider; import org.apache.pinot.spi.annotations.minion.TaskGenerator; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -409,7 +410,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { if (segmentPartitionConfig == null) { for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) { pinotTaskConfigsForTable.addAll( - createPinotTaskConfigs(selectedSegmentsPerBucket, tableNameWithType, maxNumRecordsPerTask, mergeLevel, + createPinotTaskConfigs(selectedSegmentsPerBucket, tableConfig, maxNumRecordsPerTask, mergeLevel, null, mergeConfigs, taskConfigs)); } } else { @@ -448,13 +449,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { List<Integer> partition = entry.getKey(); List<SegmentZKMetadata> partitionedSegments = entry.getValue(); pinotTaskConfigsForTable.addAll( - createPinotTaskConfigs(partitionedSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel, + createPinotTaskConfigs(partitionedSegments, tableConfig, maxNumRecordsPerTask, mergeLevel, partition, mergeConfigs, taskConfigs)); } if (!outlierSegments.isEmpty()) { pinotTaskConfigsForTable.addAll( - createPinotTaskConfigs(outlierSegments, tableNameWithType, maxNumRecordsPerTask, 
mergeLevel, + createPinotTaskConfigs(outlierSegments, tableConfig, maxNumRecordsPerTask, mergeLevel, null, mergeConfigs, taskConfigs)); } } @@ -651,72 +652,77 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator { * Create pinot task configs with selected segments and configs */ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> selectedSegments, - String tableNameWithType, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition, + TableConfig tableConfig, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition, Map<String, String> mergeConfigs, Map<String, String> taskConfigs) { - int numRecordsPerTask = 0; - List<List<String>> segmentNamesList = new ArrayList<>(); - List<List<String>> downloadURLsList = new ArrayList<>(); - List<String> segmentNames = new ArrayList<>(); - List<String> downloadURLs = new ArrayList<>(); - - for (int i = 0; i < selectedSegments.size(); i++) { - SegmentZKMetadata targetSegment = selectedSegments.get(i); - segmentNames.add(targetSegment.getSegmentName()); - downloadURLs.add(targetSegment.getDownloadUrl()); - numRecordsPerTask += targetSegment.getTotalDocs(); - if (numRecordsPerTask >= maxNumRecordsPerTask || i == selectedSegments.size() - 1) { - segmentNamesList.add(segmentNames); - downloadURLsList.add(downloadURLs); - numRecordsPerTask = 0; - segmentNames = new ArrayList<>(); - downloadURLs = new ArrayList<>(); - } - } - + String tableNameWithType = tableConfig.getTableName(); + List<List<SegmentZKMetadata>> segmentGroups = MergeRollupTaskSegmentGroupManagerProvider.create(taskConfigs) + .getSegmentGroups(tableConfig, _clusterInfoAccessor, selectedSegments); List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); - StringBuilder partitionSuffixBuilder = new StringBuilder(); - if (partition != null && !partition.isEmpty()) { - for (int columnPartition : partition) { - partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition); + for 
(List<SegmentZKMetadata> segments : segmentGroups) { + int numRecordsPerTask = 0; + List<List<String>> segmentNamesList = new ArrayList<>(); + List<List<String>> downloadURLsList = new ArrayList<>(); + List<String> segmentNames = new ArrayList<>(); + List<String> downloadURLs = new ArrayList<>(); + + for (int i = 0; i < segments.size(); i++) { + SegmentZKMetadata targetSegment = segments.get(i); + segmentNames.add(targetSegment.getSegmentName()); + downloadURLs.add(targetSegment.getDownloadUrl()); + numRecordsPerTask += targetSegment.getTotalDocs(); + if (numRecordsPerTask >= maxNumRecordsPerTask || i == segments.size() - 1) { + segmentNamesList.add(segmentNames); + downloadURLsList.add(downloadURLs); + numRecordsPerTask = 0; + segmentNames = new ArrayList<>(); + downloadURLs = new ArrayList<>(); + } } - } - String partitionSuffix = partitionSuffixBuilder.toString(); - - for (int i = 0; i < segmentNamesList.size(); i++) { - String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR); - Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs, - _clusterInfoAccessor); - configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType); - configs.put(MinionConstants.SEGMENT_NAME_KEY, - StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR)); - configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL); - configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); - configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true"); - - for (Map.Entry<String, String> taskConfig : taskConfigs.entrySet()) { - if (taskConfig.getKey().endsWith(MinionConstants.MergeRollupTask.AGGREGATION_TYPE_KEY_SUFFIX)) { - configs.put(taskConfig.getKey(), taskConfig.getValue()); + + StringBuilder partitionSuffixBuilder = new StringBuilder(); + if (partition != null && !partition.isEmpty()) { + for (int columnPartition : partition) { + 
partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition); } } + String partitionSuffix = partitionSuffixBuilder.toString(); + + for (int i = 0; i < segmentNamesList.size(); i++) { + String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR); + Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs, + _clusterInfoAccessor); + configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType); + configs.put(MinionConstants.SEGMENT_NAME_KEY, + StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR)); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL); + configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); + configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true"); + + for (Map.Entry<String, String> taskConfig : taskConfigs.entrySet()) { + if (taskConfig.getKey().endsWith(MinionConstants.MergeRollupTask.AGGREGATION_TYPE_KEY_SUFFIX)) { + configs.put(taskConfig.getKey(), taskConfig.getValue()); + } + } - configs.put(BatchConfigProperties.OVERWRITE_OUTPUT, - taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false")); - configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY)); - configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel); - configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY)); - configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY)); - configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, - mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)); - - // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once within - // the same epoch millisecond, which may happen when there are multiple partitions. 
- // To prevent such name conflict, we include a partitionSeqSuffix to the segment name. - configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY, - MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME - + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i - + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType)); - pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs)); + configs.put(BatchConfigProperties.OVERWRITE_OUTPUT, + taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false")); + configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY)); + configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel); + configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY)); + configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY)); + configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, + mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)); + + // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once + // within the same epoch millisecond, which may happen when there are multiple partitions. + // To prevent such name conflict, we include a partitionSeqSuffix to the segment name. 
+ configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY, + MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME + + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i + + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType)); + pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs)); + } } return pinotTaskConfigs; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java new file mode 100644 index 0000000000..9aca72ac35 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger; + +import java.util.Collections; +import java.util.List; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.spi.config.table.TableConfig; + + +public class DefaultMergeRollupTaskSegmentGroupManager implements MergeRollupTaskSegmentGroupManager { + + @Override + public List<List<SegmentZKMetadata>> getSegmentGroups(TableConfig tableConfig, + ClusterInfoAccessor clusterInfoAccessor, List<SegmentZKMetadata> segments) { + return Collections.singletonList(segments); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java new file mode 100644 index 0000000000..1bf74140ce --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger; + +import java.util.List; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.spi.config.table.TableConfig; + + +/** + * The interface <code>MergeRollupTaskSegmentGroupManager</code> defines the APIs to group segments for task generation. + */ +public interface MergeRollupTaskSegmentGroupManager { + /** + * Returns a list of segment groups which are scheduled in separate tasks + */ + List<List<SegmentZKMetadata>> getSegmentGroups(TableConfig tableConfig, ClusterInfoAccessor clusterInfoAccessor, + List<SegmentZKMetadata> segments); +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java new file mode 100644 index 0000000000..ca163f20f2 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger; + +import java.util.Map; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.plugin.PluginManager; + + +/** + * Provider class for {@link MergeRollupTaskSegmentGroupManager} + */ +public abstract class MergeRollupTaskSegmentGroupManagerProvider { + /** + * Constructs the {@link MergeRollupTaskSegmentGroupManager} using MergeRollup task configs + */ + public static MergeRollupTaskSegmentGroupManager create(Map<String, String> taskConfigs) { + String segmentGroupManagerClassName = + taskConfigs.get(MinionConstants.MergeRollupTask.SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY); + if (segmentGroupManagerClassName != null) { + try { + return PluginManager.get().createInstance(segmentGroupManagerClassName); + } catch (Exception e) { + throw new RuntimeException("Fail to create segment group manager", e); + } + } else { + return new DefaultMergeRollupTaskSegmentGroupManager(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org