xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843354424


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));

Review Comment:
   My thought is to take the task config from the table as the base, then apply
   the ad-hoc task config on top of it, so that ad-hoc values override the
   table-level ones.
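
   A minimal sketch of that ordering, for illustration only. The class and
   method names (`TaskConfigMerge`, `merge`) are hypothetical and not part of
   the PR or of Pinot; only `java.util` is used. The resulting key precedence
   is the same as with the `putIfAbsent` loop in the diff (ad-hoc values win),
   but the merge builds a new map instead of mutating the caller's
   `taskConfigs`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: shows "table config as base,
// ad-hoc config applied on top".
final class TaskConfigMerge {
  private TaskConfigMerge() {
  }

  static Map<String, String> merge(Map<String, String> tableLevelConfigs, Map<String, String> adhocConfigs) {
    Map<String, String> merged = new HashMap<>();
    // Start from the table-level task config, if the table defines one.
    if (tableLevelConfigs != null) {
      merged.putAll(tableLevelConfigs);
    }
    // Apply the ad-hoc config last so its values override the base.
    if (adhocConfigs != null) {
      merged.putAll(adhocConfigs);
    }
    return merged;
  }
}
```

   Under this sketch, `generateAdhocTasks` could call
   `merge(taskConfigsFromTable, taskConfigs)` and use the returned map for the
   rest of the method; whether that fits the surrounding code is for the PR
   author to judge.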



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);
+      return _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs,
+          CommonConstants.Helix.UNTAGGED_MINION_INSTANCE, taskGenerator.getTaskTimeoutMs(),
+          taskGenerator.getNumConcurrentTasksPerInstance());
+    }
+    throw new UnsupportedOperationException(

Review Comment:
   Will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

