mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847525990


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -27,8 +27,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
+import javax.ws.rs.NotFoundException;

Review Comment:
   We have a class called TableNotFoundException. Can we use that (for 
consistency)? Also, it may be better not to introduce a dependency on javax.ws?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, 
@Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", 
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new NotFoundException("'tableName' " + tableName + " is not 
found");

Review Comment:
   Should this be a 4xx?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, 
@Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", 
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(

Review Comment:
   I think this will result in a 5xx error, right? It should return a 4xx to 
the caller of `/tasks/execute`.
   
   The same concern applies to some of the other exceptions below. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java:
##########
@@ -45,6 +46,12 @@ public interface PinotTaskGenerator {
    */
   List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
 
+  /**
+   * Generates a list of adhoc tasks to schedule based on the given table 
configs and task configs.
+   */
+  List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)

Review Comment:
   can we just call the method `generateTasks`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, 
@Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", 
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = 
_helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = 
_helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already 
created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new NotFoundException("'tableName' " + tableName + " is not 
found");
+    }
+
+    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator == null) {
+      throw new UnsupportedOperationException(

Review Comment:
   This should also be a 4xx.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String 
taskType) {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {

Review Comment:
   Not sure if you did this because of a bug or just to cover the _ad hoc_ task 
functionality. But if we can return null from here, then we should also check 
the callers to make sure they handle null. Mark the return value as @Nullable, 
and verify that the callers handle null.
   
   Alternatively, maybe we can return `NOT_STARTED`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -611,6 +636,10 @@ private static String getTaskType(String name) {
     return name.split(TASK_NAME_SEPARATOR)[1];
   }
 
+  public String getParentTaskName(String taskType, String taskName) {
+    return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;

Review Comment:
   Is there any assumption anywhere in the split code that the `taskName` is a 
timestamp?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for 
non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   Should we not throw an exception to return this error to the caller of the 
`/tasks/execute` API?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -68,4 +72,11 @@ public int getNumConcurrentTasksPerInstance() {
     }
     return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   }
+
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    throw new UnsupportedOperationException(

Review Comment:
   Should this be a 4xx?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for 
non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          
SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, 
inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input 
files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   Should we not propagate this error to the caller of the API?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> 
batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, 
batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           
IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + 
IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   Isn't it better to keep this in a separate PR? (That way, when we cut 
releases, it is easy to distinguish a bug fix from a feature.)



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for 
non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          
SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, 
inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input 
files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", 
inputFileURIs);

Review Comment:
   We should truncate the list of URIs if the list is too long.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java:
##########
@@ -305,8 +303,8 @@ protected SegmentGenerationTaskSpec 
generateTaskSpec(Map<String, String> taskCon
     }
     SegmentNameGeneratorSpec segmentNameGeneratorSpec = new 
SegmentNameGeneratorSpec();
     
segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
-    segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
-        .getConfigMapWithPrefix(taskConfigs, 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
+    
segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs,
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + 
IngestionConfigUtils.DOT_SEPARATOR));

Review Comment:
   Same here: this fix could also go in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to