jtao15 commented on a change in pull request #6975: URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846397
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java ########## @@ -40,40 +53,133 @@ * TODO: * 1. Add the support for roll-up * 2. Add the support for time split to provide backfill support for merged segments - * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment) + * 3. Add merge/rollup name prefixes for generated segments + * 4. Add the support for realtime table */ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class); + private static final String INPUT_SEGMENTS_DIR = "input_segments"; + private static final String OUTPUT_SEGMENTS_DIR = "output_segments"; @Override protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs, File workingDir) throws Exception { + String taskType = pinotTaskConfig.getTaskType(); Map<String, String> configs = pinotTaskConfig.getConfigs(); + LOGGER.info("Starting task: {} with configs: {}", taskType, configs); + long startMillis = System.currentTimeMillis(); + String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY); // TODO: add the support for rollup Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null"); - MergeType mergeType = MergeType.fromString(mergeTypeString); - Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported."); - - String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY); String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); - TableConfig tableConfig = getTableConfig(tableNameWithType); + Schema schema = getSchema(tableNameWithType); + Set<String> schemaColumns = schema.getPhysicalColumnNames(); + + Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs = + MergeRollupTaskUtils.getRollupAggregationTypeMap(configs); + String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT); + + SegmentProcessorConfig.Builder segmentProcessorConfigBuilder = + new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema); + + // Partition config from tableConfig + if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) { + Map<String, ColumnPartitionConfig> columnPartitionMap = + tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap(); + PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns); Review comment: You are right, will remove this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org