klsince commented on code in PR #8465: URL: https://github.com/apache/pinot/pull/8465#discussion_r843229599
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public String createTask(Map<String, String> taskConfigs) + throws Exception { + String taskName = taskConfigs.get("taskName"); + if (taskName == null) { + throw new IllegalArgumentException("Missing field 'taskName'"); + } + String taskType = taskConfigs.get("taskType"); + if (taskType == null) { + throw new IllegalArgumentException("Missing field 'taskType'"); + } + String tableName = taskConfigs.get("tableName"); + if (tableName == null) { + throw new IllegalArgumentException("Missing field 'tableName'"); + } + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. 
Current state is " + taskState); + } + if (TableNameBuilder.isRealtimeTableResource(tableName)) { + throw new UnsupportedOperationException( + "Realtime table: " + tableName + " is not supported for task type - " + taskType); + } + String tableNameWithType = + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName)); + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + // Generate each type of tasks + if (taskGenerator != null) { + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); + LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType()); + List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs); + if (pinotTaskConfigs.isEmpty()) { + throw new NotFoundException("No task config has been generated"); + } + LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs); + _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1); Review Comment: use a specific meter for submitting ad-hoc tasks? 
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java: ########## @@ -177,6 +180,78 @@ public String getTaskType() { return pinotTaskConfigs; } + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + String taskUUID = UUID.randomUUID().toString(); + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + return ImmutableList.of(); + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + if (tableTaskConfig != null) { + Map<String, String> taskConfigsFromTable = + tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE); + if (taskConfigsFromTable != null) { + taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue())); + } + } + Map<String, String> batchConfigMap = new HashMap<>(taskConfigs); + int tableNumTasks = 0; + try { + URI inputDirURI = + SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI)); + List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet()); + if (inputFileURIs.isEmpty()) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI); + return ImmutableList.of(); + } + if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) { + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, + extractFormatFromFileSuffix(inputFileURIs.get(0).getPath())); + } + updateRecordReaderConfigs(batchConfigMap); + + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + LOGGER.info("Final input 
files for task config generation: {}", inputFileURIs); + for (URI inputFileURI : inputFileURIs) { + Map<String, String> singleFileGenerationTaskConfig = + getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI, + generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks)); + pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, + singleFileGenerationTaskConfig)); + tableNumTasks++; + } + if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) { + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, + extractFormatFromFileSuffix(inputFileURIs.get(0).getPath())); + updateRecordReaderConfigs(batchConfigMap); + } + return pinotTaskConfigs; + } catch (Exception e) { + LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]", + tableConfig, taskConfigs, e); + throw e; + } + } + + private String generateFixedSegmentName(String offlineTableName, String taskUUID, int tableNumTasks) { + return String.format("%s_%s_%d", offlineTableName, taskUUID, tableNumTasks); + } + + private String extractFormatFromFileSuffix(String path) { + String fileExtension = path.substring(path.lastIndexOf(".") + 1); + switch (fileExtension) { + case "": + throw new UnsupportedOperationException("No file extension found"); Review Comment: nit: use `if(StringUtils.isEmpty())` for simplicity? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public String createTask(Map<String, String> taskConfigs) + throws Exception { + String taskName = taskConfigs.get("taskName"); + if (taskName == null) { + throw new IllegalArgumentException("Missing field 'taskName'"); + } + String taskType = taskConfigs.get("taskType"); + if (taskType == null) { + throw new IllegalArgumentException("Missing field 'taskType'"); + } + String tableName = taskConfigs.get("tableName"); + if (tableName == null) { + throw new IllegalArgumentException("Missing field 'tableName'"); + } + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState); + } + if (TableNameBuilder.isRealtimeTableResource(tableName)) { + throw new UnsupportedOperationException( + "Realtime table: " + tableName + " is not supported for task type - " + taskType); + } + String tableNameWithType = + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName)); + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + // Generate each type of tasks + if (taskGenerator != null) { + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); + LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType()); Review Comment: nit: "... 
to create tasks of type:" {} ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public String createTask(Map<String, String> taskConfigs) + throws Exception { + String taskName = taskConfigs.get("taskName"); + if (taskName == null) { + throw new IllegalArgumentException("Missing field 'taskName'"); + } + String taskType = taskConfigs.get("taskType"); + if (taskType == null) { + throw new IllegalArgumentException("Missing field 'taskType'"); + } + String tableName = taskConfigs.get("tableName"); + if (tableName == null) { + throw new IllegalArgumentException("Missing field 'tableName'"); + } + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. 
Current state is " + taskState); + } + if (TableNameBuilder.isRealtimeTableResource(tableName)) { Review Comment: can do this check at the beginning to bail out sooner ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java: ########## @@ -177,6 +180,78 @@ public String getTaskType() { return pinotTaskConfigs; } + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + String taskUUID = UUID.randomUUID().toString(); + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + return ImmutableList.of(); + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + if (tableTaskConfig != null) { + Map<String, String> taskConfigsFromTable = + tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE); + if (taskConfigsFromTable != null) { + taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue())); Review Comment: should user-provided task configs overwrite those from TableConfig instead? but anyway, perhaps leave a comment about merging the configs. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public String createTask(Map<String, String> taskConfigs) + throws Exception { + String taskName = taskConfigs.get("taskName"); + if (taskName == null) { + throw new IllegalArgumentException("Missing field 'taskName'"); + } + String taskType = taskConfigs.get("taskType"); + if (taskType == null) { + throw new IllegalArgumentException("Missing field 'taskType'"); + } + String tableName = taskConfigs.get("tableName"); + if (tableName == null) { + throw new IllegalArgumentException("Missing field 'tableName'"); + } + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. 
Current state is " + taskState); + } + if (TableNameBuilder.isRealtimeTableResource(tableName)) { + throw new UnsupportedOperationException( + "Realtime table: " + tableName + " is not supported for task type - " + taskType); + } + String tableNameWithType = + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName)); + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + // Generate each type of tasks + if (taskGenerator != null) { + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); + LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType()); + List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs); + if (pinotTaskConfigs.isEmpty()) { + throw new NotFoundException("No task config has been generated"); + } + LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs); + _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1); + return _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, + CommonConstants.Helix.UNTAGGED_MINION_INSTANCE, taskGenerator.getTaskTimeoutMs(), + taskGenerator.getNumConcurrentTasksPerInstance()); + } + throw new UnsupportedOperationException( Review Comment: nit: to save some indents ``` if (taskGenerator == null) { throw new ... } ... the logic to submit adhoc tasks ... 
``` ########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java: ########## @@ -177,6 +180,78 @@ public String getTaskType() { return pinotTaskConfigs; } + @Override + public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs) + throws Exception { + String taskUUID = UUID.randomUUID().toString(); + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + return ImmutableList.of(); + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + if (tableTaskConfig != null) { + Map<String, String> taskConfigsFromTable = + tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE); + if (taskConfigsFromTable != null) { + taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue())); + } + } + Map<String, String> batchConfigMap = new HashMap<>(taskConfigs); + int tableNumTasks = 0; + try { + URI inputDirURI = + SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI)); + List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet()); + if (inputFileURIs.isEmpty()) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI); + return ImmutableList.of(); + } + if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) { + batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, + extractFormatFromFileSuffix(inputFileURIs.get(0).getPath())); + } + updateRecordReaderConfigs(batchConfigMap); + + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + LOGGER.info("Final 
input files for task config generation: {}", inputFileURIs); + for (URI inputFileURI : inputFileURIs) { + Map<String, String> singleFileGenerationTaskConfig = + getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI, + generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks)); Review Comment: perhaps fall back to the `fixed` generator only when no segment name generator is configured in the task configs, instead of always using the 'fixed' generator here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java: ########## @@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, } } + public String createTask(Map<String, String> taskConfigs) + throws Exception { + String taskName = taskConfigs.get("taskName"); + if (taskName == null) { + throw new IllegalArgumentException("Missing field 'taskName'"); + } + String taskType = taskConfigs.get("taskType"); + if (taskType == null) { + throw new IllegalArgumentException("Missing field 'taskType'"); + } + String tableName = taskConfigs.get("tableName"); + if (tableName == null) { + throw new IllegalArgumentException("Missing field 'tableName'"); + } + String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName); + TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName); + if (taskState != null) { + throw new RuntimeException( + "Task [" + taskName + "] of type [" + taskType + "] is already created. 
Current state is " + taskState); + } + if (TableNameBuilder.isRealtimeTableResource(tableName)) { + throw new UnsupportedOperationException( + "Realtime table: " + tableName + " is not supported for task type - " + taskType); + } + String tableNameWithType = + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName)); + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); + // Generate each type of tasks + if (taskGenerator != null) { + _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); + LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType()); + List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs); + if (pinotTaskConfigs.isEmpty()) { + throw new NotFoundException("No task config has been generated"); Review Comment: how about `return null` like the scheduleTask method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org