snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641760045



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+    }
 
-    MergeRollupSegmentConverter rollupSegmentConverter =
-        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
-            .setTableConfig(tableConfig).build();
+    // Aggregations using configured Collector
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    CollectorConfig collectorConfig =
+        getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+    Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+        "Only 'CONCAT' mode is currently supported.");
+    segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
 
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {
+      SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+      segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
+    }
+
+    SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
+    File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+    Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+        inputSegmentsDir.getAbsolutePath(), taskType);
+    for (File indexDir : originalIndexDirs) {
+      FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+    }
+    File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+    Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
+        outputSegmentsDir.getAbsolutePath(), taskType);
+
+    SegmentProcessorFramework segmentProcessorFramework =
+        new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+    try {
+      segmentProcessorFramework.processSegments();
+    } finally {
+      segmentProcessorFramework.cleanup();
+    }
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
     List<SegmentConversionResult> results = new ArrayList<>();
-    for (File file : resultFiles) {
+    for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
           .setTableNameWithType(tableNameWithType).build());
     }
     return results;
   }
+
+  @Override
+  protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig) {
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
+        .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
+            pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY)));

Review comment:
       Do we have a guarantee that granularity values are always upper case (e.g. `DAILY`, `HOURLY`)?
   
   I think all of the following inputs should work: `hourly`, `Hourly`, `HOURLY`, etc.
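   
   A minimal sketch of the normalization I have in mind, done once where the config is read (variable names are just illustrative):
   
   ```java
   // Illustrative sketch: accept "hourly", "Hourly", "HOURLY", ... by normalizing
   // the configured value once at the boundary instead of assuming upper case.
   String granularity = pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY);
   Preconditions.checkNotNull(granularity, "Granularity cannot be null");
   String normalizedGranularity = granularity.toUpperCase();
   ```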

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+    }
 
-    MergeRollupSegmentConverter rollupSegmentConverter =
-        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
-            .setTableConfig(tableConfig).build();
+    // Aggregations using configured Collector
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    CollectorConfig collectorConfig =
+        getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+    Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+        "Only 'CONCAT' mode is currently supported.");
+    segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
 
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {

Review comment:
       Let's parse the string into an integer here instead of passing the string value to `getSegmentConfig()`. Also, we can use `1000000` as the default value.
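   
   A rough sketch of what that could look like, assuming `getSegmentConfig()` is changed to take an `int` (the default constant name is hypothetical):
   
   ```java
   // Hypothetical class-level default; the suggestion is to fall back to
   // 1,000,000 records per segment when the task config doesn't specify a value.
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 1_000_000;
   
   String numRecordsPerSegmentStr = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
   int numRecordsPerSegment = numRecordsPerSegmentStr != null
       ? Integer.parseInt(numRecordsPerSegmentStr)
       : DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
   segmentProcessorConfigBuilder.setSegmentConfig(getSegmentConfig(numRecordsPerSegment));
   ```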

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+    }
 
-    MergeRollupSegmentConverter rollupSegmentConverter =
-        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
-            .setTableConfig(tableConfig).build();
+    // Aggregations using configured Collector
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    CollectorConfig collectorConfig =
+        getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+    Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+        "Only 'CONCAT' mode is currently supported.");
+    segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
 
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {
+      SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+      segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);

Review comment:
       What happens if we don't set `SegmentConfig`?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,

Review comment:
       Instead of using `SegmentProcessorFramework` directly, can we wrap it in a separate class, similar to `MergeRollupSegmentConverter`?
   
   That way, we can reuse `MergeRollupSegmentConverter` in the `SegmentMergeCommand`. A rough sketch of the wrapper is below.
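   
   Illustrative only; the class/method names follow the old converter, but the exact shape is open (Pinot imports omitted):
   
   ```java
   import java.io.File;
   import java.util.Arrays;
   import java.util.List;
   
   // Hypothetical wrapper: hides SegmentProcessorFramework behind a converter-style
   // API so the minion task and SegmentMergeCommand can share the same logic.
   public class MergeRollupSegmentConverter {
     private final File _inputSegmentsDir;
     private final File _outputSegmentsDir;
     private final SegmentProcessorConfig _segmentProcessorConfig;
   
     public MergeRollupSegmentConverter(File inputSegmentsDir, File outputSegmentsDir,
         SegmentProcessorConfig segmentProcessorConfig) {
       _inputSegmentsDir = inputSegmentsDir;
       _outputSegmentsDir = outputSegmentsDir;
       _segmentProcessorConfig = segmentProcessorConfig;
     }
   
     // Runs the framework and returns the converted segment directories.
     public List<File> convert()
         throws Exception {
       SegmentProcessorFramework framework =
           new SegmentProcessorFramework(_inputSegmentsDir, _segmentProcessorConfig, _outputSegmentsDir);
       try {
         framework.processSegments();
       } finally {
         framework.cleanup();
       }
       return Arrays.asList(_outputSegmentsDir.listFiles());
     }
   }
   ```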
   

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
 
   @Override
   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
-    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections

Review comment:
       Is there an existing test for the segment upload API? Can you check if there's a test for `SegmentZKMetadataCustomMapModifier`? If so, we can extend it to cover this change (`REPLACE` -> `UPDATE`).
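   
   A rough sketch of the assertion I'd expect, assuming the modifier exposes a `modifyMap`-style method that applies the modification to an existing custom map (the keys here are just illustrative):
   
   ```java
   // Illustrative test sketch: UPDATE should merge into the existing custom map,
   // while REPLACE would drop entries written by other tasks.
   Map<String, String> existingCustomMap = new HashMap<>();
   existingCustomMap.put("OtherTask.time", "123");
   
   SegmentZKMetadataCustomMapModifier modifier = new SegmentZKMetadataCustomMapModifier(
       SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
       Collections.singletonMap("PurgeTask.time", "456"));
   Map<String, String> result = modifier.modifyMap(existingCustomMap);
   
   // The pre-existing entry must survive under UPDATE.
   assertEquals("123", result.get("OtherTask.time"));
   assertEquals("456", result.get("PurgeTask.time"));
   ```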

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -296,6 +309,21 @@ public static URI getUploadSegmentURI(URI controllerURI)
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
   }
 
+  public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType)
+      throws URISyntaxException {
+    return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),

Review comment:
       I remember that using `getURI()` somehow didn't work, so I ended up using `new URI(...)`. Can you double-check this against https://github.com/apache/incubator-pinot/pull/6094?
   
   ```
   http://localhost:18998/segments/mytable/startReplaceSegments%3Ftype=OFFLINE  <- getURI()
   http://localhost:18998/segments/mytable/startReplaceSegments?type=OFFLINE   <- new URI(...)
   ```
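   
   For reference, a minimal sketch of why the multi-argument `java.net.URI` constructor behaves this way (host/port are just the example values from above):
   
   ```java
   import java.net.URI;
   
   // The multi-arg constructor percent-encodes characters that are illegal in the
   // path component, so a '?' embedded in the path becomes %3F.
   URI encoded = new URI("http", null, "localhost", 18998,
       "/segments/mytable/startReplaceSegments?type=OFFLINE", null, null);
   System.out.println(encoded); // .../startReplaceSegments%3Ftype=OFFLINE
   
   // Passing the query through the dedicated query argument keeps the '?' intact.
   URI ok = new URI("http", null, "localhost", 18998,
       "/segments/mytable/startReplaceSegments", "type=OFFLINE", null);
   System.out.println(ok); // .../startReplaceSegments?type=OFFLINE
   ```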

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);

Review comment:
       Why do we read the partition config from the table config and set it as the partitioner config for the segment processing framework?
   
   We should handle this in the task scheduler: when custom partitioning is enabled, the scheduler should only pick segments from the same partition.
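   
   Roughly what I'm picturing on the scheduler side (the metadata accessors and the `createMergeTask` helper are hypothetical):
   
   ```java
   // Illustrative sketch: bucket candidate segments by partition id so each
   // generated task only merges segments from a single partition.
   Map<Integer, List<OfflineSegmentZKMetadata>> segmentsByPartition = new HashMap<>();
   for (OfflineSegmentZKMetadata segmentMetadata : candidateSegments) {
     // Assumes one partition id per segment on the partition column.
     int partitionId = segmentMetadata.getPartitionMetadata().getColumnPartitionMap()
         .get(partitionColumn).getPartitions().iterator().next();
     segmentsByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentMetadata);
   }
   for (List<OfflineSegmentZKMetadata> partitionGroup : segmentsByPartition.values()) {
     pinotTaskConfigs.add(createMergeTask(partitionGroup)); // hypothetical helper
   }
   ```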




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


