klsince commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843229599


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - 
" + taskType);
+    }
+    String tableNameWithType =
+        
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", 
taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = 
taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: 
{}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);

Review Comment:
   use a specific meter for submitting ad-hoc tasks? 



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for 
non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> 
taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));
+      }
+    }
+    Map<String, String> batchConfigMap = new HashMap<>(taskConfigs);
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          
SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, 
inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input 
files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", 
inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, 
batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, 
tableNumTasks));
+        pinotTaskConfigs.add(new 
PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            singleFileGenerationTaskConfig));
+        tableNumTasks++;
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+        updateRecordReaderConfigs(batchConfigMap);
+      }
+      return pinotTaskConfigs;
+    } catch (Exception e) {
+      LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ 
table configs: {}, task configs: {} ]",
+          tableConfig, taskConfigs, e);
+      throw e;
+    }
+  }
+
+  private String generateFixedSegmentName(String offlineTableName, String 
taskUUID, int tableNumTasks) {
+    return String.format("%s_%s_%d", offlineTableName, taskUUID, 
tableNumTasks);
+  }
+
+  private String extractFormatFromFileSuffix(String path) {
+    String fileExtension = path.substring(path.lastIndexOf(".") + 1);
+    switch (fileExtension) {
+      case "":
+        throw new UnsupportedOperationException("No file extension found");

Review Comment:
   nit: use `if (StringUtils.isEmpty(fileExtension))` for simplicity?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - 
" + taskType);
+    }
+    String tableNameWithType =
+        
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", 
taskGenerator.getTaskType());

Review Comment:
   nit: "... to create tasks of type: {}"



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {

Review Comment:
   can do this check at the beginning to bail out sooner



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for 
non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> 
taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));

Review Comment:
   Should user-provided task configs overwrite those from TableConfig instead? 
Either way, perhaps leave a comment explaining how the configs are merged.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - 
" + taskType);
+    }
+    String tableNameWithType =
+        
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", 
taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = 
taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: 
{}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);
+      return _helixTaskResourceManager.submitTask(parentTaskName, 
pinotTaskConfigs,
+          CommonConstants.Helix.UNTAGGED_MINION_INSTANCE, 
taskGenerator.getTaskTimeoutMs(),
+          taskGenerator.getNumConcurrentTasksPerInstance());
+    }
+    throw new UnsupportedOperationException(

Review Comment:
   nit: to save some indents
   ```
   if (taskGenerator == null) {
     throw new ...
   } 
   ... the logic to submit adhoc tasks ...
   ```



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for 
non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> 
taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));
+      }
+    }
+    Map<String, String> batchConfigMap = new HashMap<>(taskConfigs);
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          
SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, 
inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input 
files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", 
inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, 
batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, 
tableNumTasks));

Review Comment:
   perhaps fall back to the `fixed` generator when no segment name generator is 
configured in the task configs, instead of always using the `fixed` generator here? 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - 
" + taskType);
+    }
+    String tableNameWithType =
+        
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", 
taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = 
taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");

Review Comment:
   how about `return null` like the scheduleTask method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to