Copilot commented on code in PR #16571:
URL: https://github.com/apache/pinot/pull/16571#discussion_r2268133562
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -739,6 +748,16 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+ int maxNumberOfSubTasks = taskGenerator.getMaxNumSubTasks();
+ // choose first maxNumberOfSubTasks tasks to schedule from presentTaskConfig
+ if (presentTaskConfig.size() > maxNumberOfSubTasks) {
+ LOGGER.warn("Number of tasks generated for task type: {} for table: {} is {}, which is greater than the "
+ + "maximum number of tasks to schedule: {}. Only the first {} tasks will be scheduled. This is controlled"
+ + " by the cluster config maxAllowedSubTasks which is set based on controller's performance",
+ taskType, tableName, presentTaskConfig.size(), maxNumberOfSubTasks, maxNumberOfSubTasks);
+ presentTaskConfig = presentTaskConfig.subList(0, maxNumberOfSubTasks);
Review Comment:
`subList` returns a view backed by the original list, and assigning that view
to the local variable `presentTaskConfig` does not update the entry in
`minionInstanceTagToTaskConfigs`, so the map still holds the untruncated list.
The subsequent `put` operation on line 761 should use the truncated list.
Copying the view into a new `ArrayList` also keeps the truncated list
independent of the original.
```suggestion
presentTaskConfig = new ArrayList<>(presentTaskConfig.subList(0, maxNumberOfSubTasks));
```
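To illustrate the point about rebinding a local variable, here is a minimal, self-contained sketch. It is not Pinot code: the class name, the `"minionTag"` key, and the string task names are hypothetical stand-ins for `minionInstanceTagToTaskConfigs` and `PinotTaskConfig`. It compares rebinding the local variable only, rebinding followed by an explicit `put`, and truncating the mapped list in place.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SubListTruncationSketch {
  // Hypothetical stand-in for the map of minion tag -> generated task configs.
  private static Map<String, List<String>> newMapWithFourTasks() {
    Map<String, List<String>> tagToTasks = new HashMap<>();
    tagToTasks.computeIfAbsent("minionTag", k -> new ArrayList<>())
        .addAll(List.of("t1", "t2", "t3", "t4"));
    return tagToTasks;
  }

  // Rebinds only the local variable; the map keeps referencing the full list.
  static int sizeAfterLocalRebind(int max) {
    Map<String, List<String>> tagToTasks = newMapWithFourTasks();
    List<String> tasks = tagToTasks.get("minionTag");
    if (tasks.size() > max) {
      tasks = new ArrayList<>(tasks.subList(0, max));
    }
    return tagToTasks.get("minionTag").size();
  }

  // Rebinds the local variable and writes the truncated copy back into the map.
  static int sizeAfterRebindAndPut(int max) {
    Map<String, List<String>> tagToTasks = newMapWithFourTasks();
    List<String> tasks = tagToTasks.get("minionTag");
    if (tasks.size() > max) {
      tasks = new ArrayList<>(tasks.subList(0, max));
      tagToTasks.put("minionTag", tasks);
    }
    return tagToTasks.get("minionTag").size();
  }

  // Truncates the mapped list in place by clearing the tail view.
  static int sizeAfterInPlaceTruncate(int max) {
    Map<String, List<String>> tagToTasks = newMapWithFourTasks();
    List<String> tasks = tagToTasks.get("minionTag");
    if (tasks.size() > max) {
      tasks.subList(max, tasks.size()).clear();
    }
    return tagToTasks.get("minionTag").size();
  }

  public static void main(String[] args) {
    System.out.println(sizeAfterLocalRebind(2));     // prints 4
    System.out.println(sizeAfterRebindAndPut(2));    // prints 2
    System.out.println(sizeAfterInPlaceTruncate(2)); // prints 2
  }
}
```

Whether the later `put` in `PinotTaskManager` already covers the second case is not visible in this excerpt, so the sketch only shows the general Java behavior.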
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -81,6 +81,48 @@ public int getNumConcurrentTasksPerInstance() {
return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}
+ @Override
+ public int getMaxNumSubTasks() {
+ String configKey = MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY;
+ String configValue = _clusterInfoAccessor.getClusterConfig(configKey);
+ if (configValue != null) {
+ try {
+ return Integer.parseInt(configValue);
+ } catch (Exception e) {
+ LOGGER.error("Invalid config {}: '{}'", configKey, configValue, e);
+ }
+ }
+ return MinionConstants.DEFAULT_MAX_NUM_OF_SUBTASKS;
+ }
+
+ /**
+ * Gets the final maximum number of subtasks for the given table based on the
+ * 1. configured value for the table subtask
+ * 2. default value for the task type
+ * 3. any cluster default(s)
+ * @param defaultNumSubTasks - the default number of subtasks for the task type
+ */
+ public int getNumSubTasks(Map<String, String> taskConfigs, int defaultNumSubTasks, String tableName) {
+ int tableMaxNumTasks = defaultNumSubTasks;
+ String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+ if (tableMaxNumTasksConfig != null) {
+ try {
+ tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+ } catch (Exception e) {
+ LOGGER.warn("MaxNumTasks {} have been wrongly set for table : {}, and task {}",
+ tableMaxNumTasksConfig, tableName, getTaskType());
+ }
+ }
+ if (tableMaxNumTasks > getMaxNumSubTasks() || tableMaxNumTasks <= 0) {
+ LOGGER.warn(
+ "MaxNumTasks for table {} for tasktype {} is {} which is greater than the max allowed subtasks {}"
+ + ". Setting it to the max allowed subtasks",
Review Comment:
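The condition checks for `tableMaxNumTasks <= 0` but handles zero and negative
values the same way as values above the limit: the warning reports the value
as "greater than the max allowed subtasks", which is misleading for a
non-positive value. If `tableMaxNumTasks` is zero or negative, it should either
fall back to the default value or throw an exception, rather than being set to
`getMaxNumSubTasks()`.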
```suggestion
if (tableMaxNumTasks < 0) {
LOGGER.warn(
"MaxNumTasks for table {} for tasktype {} is {} which is negative. Setting it to the default value {}",
tableName, getTaskType(), tableMaxNumTasks, defaultNumSubTasks);
tableMaxNumTasks = defaultNumSubTasks;
} else if (tableMaxNumTasks == 0) {
LOGGER.warn(
"MaxNumTasks for table {} for tasktype {} is 0. Setting it to the default value {}",
tableName, getTaskType(), defaultNumSubTasks);
tableMaxNumTasks = defaultNumSubTasks;
} else if (tableMaxNumTasks > getMaxNumSubTasks()) {
LOGGER.warn(
"MaxNumTasks for table {} for tasktype {} is {} which is greater than the max allowed subtasks {}. Setting it to the max allowed subtasks",
```
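The branching in the suggestion can be condensed into one small function. The sketch below is a standalone illustration under stated assumptions: `SubTaskLimitSketch` and `resolveNumSubTasks` are hypothetical names, the cluster limit is passed in as a plain `int` instead of being read through `getMaxNumSubTasks()`, and logging is omitted. Unlike the `else if` chain in the suggestion, it also caps the fallback default at the cluster limit, which is a design assumption and not something the PR states.

```java
public class SubTaskLimitSketch {
  /**
   * Resolves the number of subtasks for a table (hypothetical helper).
   *
   * @param configured raw table-level config value, may be null or unparseable
   * @param defaultNumSubTasks default for the task type
   * @param maxAllowed cluster-wide upper bound
   */
  static int resolveNumSubTasks(String configured, int defaultNumSubTasks, int maxAllowed) {
    int value = defaultNumSubTasks;
    if (configured != null) {
      try {
        value = Integer.parseInt(configured.trim());
      } catch (NumberFormatException e) {
        // Unparseable input: keep the task-type default.
        value = defaultNumSubTasks;
      }
    }
    if (value <= 0) {
      // Zero or negative values are treated as misconfiguration, not as "too large".
      value = defaultNumSubTasks;
    }
    // Assumption: the cluster limit applies to every value, including the default.
    return Math.min(value, maxAllowed);
  }

  public static void main(String[] args) {
    System.out.println(resolveNumSubTasks("-5", 10, 100));  // prints 10
    System.out.println(resolveNumSubTasks("0", 10, 100));   // prints 10
    System.out.println(resolveNumSubTasks("500", 10, 100)); // prints 100
    System.out.println(resolveNumSubTasks("abc", 10, 100)); // prints 10
  }
}
```

Catching `NumberFormatException` instead of a bare `Exception` keeps the fallback limited to parse failures.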