mcvsubbu commented on code in PR #8465: URL: https://github.com/apache/pinot/pull/8465#discussion_r847525990
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -27,8 +27,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; +import javax.ws.rs.NotFoundException; Review Comment: We have a class called TableNotFoundException. Can we use that (for consistency)? Also, it may be better not to introduce a dependency on javax.ws? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName, + Map<String, String> taskConfigs) + throws Exception { + if (taskName == null) { + taskName = tableName + "_" + UUID.randomUUID(); + LOGGER.info("Task name is missing, auto-generate one: {}", taskName); + } + String minionInstanceTag = + taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. 
Current state is " + taskState); + } + List<String> tableNameWithTypes = new ArrayList<>(); + if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) { + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) { + tableNameWithTypes.add(offlineTableName); + } + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) { + tableNameWithTypes.add(realtimeTableName); + } + } else { + if (_pinotHelixResourceManager.hasTable(tableName)) { + tableNameWithTypes.add(tableName); + } + } + if (tableNameWithTypes.isEmpty()) { + throw new NotFoundException("'tableName' " + tableName + " is not found"); Review Comment: 4xx? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName, + Map<String, String> taskConfigs) + throws Exception { + if (taskName == null) { + taskName = tableName + "_" + UUID.randomUUID(); + LOGGER.info("Task name is missing, auto-generate one: {}", taskName); + } + String minionInstanceTag = + taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( Review Comment: I think this will result in a 5xx exception, right? It should throw a 4xx to the caller of `/tasks/execute`. Similar concerns apply to some of the other exceptions below.
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java: ########## @@ -45,6 +46,12 @@ public interface PinotTaskGenerator { */ List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs); + /** + * Generates a list of adhoc tasks to schedule based on the given table configs and task configs. + */ + List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) Review Comment: can we just call the method `generateTasks`? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName, + Map<String, String> taskConfigs) + throws Exception { + if (taskName == null) { + taskName = tableName + "_" + UUID.randomUUID(); + LOGGER.info("Task name is missing, auto-generate one: {}", taskName); + } + String minionInstanceTag = + taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. 
Current state is " + taskState); + } + List<String> tableNameWithTypes = new ArrayList<>(); + if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) { + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) { + tableNameWithTypes.add(offlineTableName); + } + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); + if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) { + tableNameWithTypes.add(realtimeTableName); + } + } else { + if (_pinotHelixResourceManager.hasTable(tableName)) { + tableNameWithTypes.add(tableName); + } + } + if (tableNameWithTypes.isEmpty()) { + throw new NotFoundException("'tableName' " + tableName + " is not found"); + } + + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + // Generate each type of tasks + if (taskGenerator == null) { + throw new UnsupportedOperationException( Review Comment: 4xx ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java: ########## @@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) { */ public synchronized TaskState getTaskState(String taskName) { String taskType = getTaskType(taskName); - return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName)); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + if (workflowContext == null) { Review Comment: Not sure if you did this because of a bug or just to cover the _ad hoc_ task functionality. But if this can return null, then we should also check the callers to make sure they handle null. Mark the return value nullable, and verify that the callers handle null. Or, maybe we can return `NOT_STARTED`?
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java: ########## @@ -611,6 +636,10 @@ private static String getTaskType(String name) { return name.split(TASK_NAME_SEPARATOR)[1]; } + public String getParentTaskName(String taskType, String taskName) { + return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName; Review Comment: Is there any assumption anywhere in the split code that the `taskName` is a timestamp? ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java: ########## @@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { return pinotTaskConfigs; } + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + String taskUUID = UUID.randomUUID().toString(); + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + return ImmutableList.of(); Review Comment: Should we not throw an exception to return this error to the caller of the `/tasks/execute` API? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java: ########## @@ -68,4 +72,11 @@ public int getNumConcurrentTasksPerInstance() { } return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; } + + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + throw new UnsupportedOperationException( Review Comment: 4xx? 
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java: ########## @@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { return pinotTaskConfigs; } + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + String taskUUID = UUID.randomUUID().toString(); + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + return ImmutableList.of(); + } + + // Override task configs from table with adhoc task configs. + Map<String, String> batchConfigMap = new HashMap<>(); + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + if (tableTaskConfig != null) { + batchConfigMap.putAll( + tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE)); + } + batchConfigMap.putAll(taskConfigs); + + int tableNumTasks = 0; + try { + URI inputDirURI = + SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI)); + List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet()); + if (inputFileURIs.isEmpty()) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI); + return ImmutableList.of(); Review Comment: should we not propagate this error to the caller of the API? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java: ########## @@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) { PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS), IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix( - batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX))); + batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR))); Review Comment: Isn't it better to keep this in a separate PR? (That way, when we cut releases, it is easy to identify a bug fix vs. a feature.) ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java: ########## @@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { return pinotTaskConfigs; } + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + String taskUUID = UUID.randomUUID().toString(); + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + return ImmutableList.of(); + } + + // Override task configs from table with adhoc task configs.
+ Map<String, String> batchConfigMap = new HashMap<>(); + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + if (tableTaskConfig != null) { + batchConfigMap.putAll( + tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE)); + } + batchConfigMap.putAll(taskConfigs); + + int tableNumTasks = 0; + try { + URI inputDirURI = + SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI)); + List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet()); + if (inputFileURIs.isEmpty()) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI); + return ImmutableList.of(); + } + if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) { + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, + extractFormatFromFileSuffix(inputFileURIs.get(0).getPath())); + } + updateRecordReaderConfigs(batchConfigMap); + + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + LOGGER.info("Final input files for task config generation: {}", inputFileURIs); Review Comment: We should truncate the list of URIs if the list is too long. 
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java: ########## @@ -305,8 +303,8 @@ protected SegmentGenerationTaskSpec generateTaskSpec(Map<String, String> taskCon } SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec(); segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE)); - segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils - .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX)); + segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs, + BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)); Review Comment: same here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org