This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1c202c7ea0 get task runtime configs tracked in Helix (#9540) 1c202c7ea0 is described below commit 1c202c7ea0a184de4c216943b1e36ba313431d22 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Thu Oct 6 14:06:15 2022 -0700 get task runtime configs tracked in Helix (#9540) --- .../api/resources/PinotTaskRestletResource.java | 20 +++++++++++++------ .../helix/core/minion/ClusterInfoAccessor.java | 2 +- .../core/minion/PinotHelixTaskResourceManager.java | 23 +++++++++++++++++++--- .../MergeRollupMinionClusterIntegrationTest.java | 6 +++--- .../pinot/minion/event/MinionEventObservers.java | 9 ++++++++- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index e0535be493..c2605c7614 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -292,10 +292,9 @@ public class PinotTaskRestletResource { // Relying on original schema that was used to query the controller URI uri = _uriInfo.getRequestUri(); String scheme = uri.getScheme(); - List<String> controllerUrls = controllers.stream().map(controller -> { - return String.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(), - Integer.parseInt(controller.getPort()), tableNameWithType, taskType); - }).collect(Collectors.toList()); + List<String> controllerUrls = controllers.stream().map(controller -> String + .format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(), + Integer.parseInt(controller.getPort()), tableNameWithType, taskType)).collect(Collectors.toList()); CompletionServiceHelper completionServiceHelper = new CompletionServiceHelper(_executor, _connectionManager, HashBiMap.create(0)); @@ -303,6 +302,7 @@ public class PinotTaskRestletResource { httpHeaders.getRequestHeaders().keySet().forEach(header -> { requestHeaders.put(header, httpHeaders.getHeaderString(header)); }); + LOGGER.debug("Getting task generation info with controllerUrls: {}", controllerUrls); CompletionServiceHelper.CompletionServiceResponse serviceResponse = completionServiceHelper.doMultiGetRequest(controllerUrls, null, true, requestHeaders, 10000); @@ -386,7 +386,15 @@ public class PinotTaskRestletResource { @ApiOperation("Get the task config (a list of child task configs) for the given task") public List<PinotTaskConfig> getTaskConfigs( @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { - return _pinotHelixTaskResourceManager.getTaskConfigs(taskName); + return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName); + } + + @GET + @Path("/tasks/task/{taskName}/runtime/config") + @ApiOperation("Get the task runtime config for the given task") + public Map<String, String> getTaskConfig( + @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { + return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName); } @Deprecated @@ -395,7 +403,7 @@ public class PinotTaskRestletResource { @ApiOperation("Get the task config (a list of child task configs) for the given task (deprecated)") public List<PinotTaskConfig> getTaskConfigsDeprecated( @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) { - return _pinotHelixTaskResourceManager.getTaskConfigs(taskName); + return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName); } @GET diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java index 14f5e0ee2c..bcb1491988 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java @@ -148,7 +148,7 @@ public class ClusterInfoAccessor { * @return List of child task configs */ public List<PinotTaskConfig> getTaskConfigs(String taskName) { - return _pinotHelixTaskResourceManager.getTaskConfigs(taskName); + return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index 04cb52481f..48e3717a72 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -434,7 +434,7 @@ public class PinotHelixTaskResourceManager { * @param taskName Task name * @return List of child task configs */ - public synchronized List<PinotTaskConfig> getTaskConfigs(String taskName) { + public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName) { Collection<TaskConfig> helixTaskConfigs = _taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values(); List<PinotTaskConfig> taskConfigs = new ArrayList<>(helixTaskConfigs.size()); @@ -444,6 +444,23 @@ public class PinotHelixTaskResourceManager { return taskConfigs; } + /** + * Get the task runtime config for the given task name. A task can have multiple subtasks, whose configs can be + * retrieved via the getSubtaskConfigs() method instead. + * + * @param taskName Task name + * @return Configs for the task returned as a Map. + */ + public synchronized Map<String, String> getTaskRuntimeConfig(String taskName) { + JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName)); + HashMap<String, String> configs = new HashMap<>(); + configs.put("ConcurrentTasksPerWorker", String.valueOf(jobConfig.getNumConcurrentTasksPerInstance())); + configs.put("TaskTimeoutMs", String.valueOf(jobConfig.getTimeoutPerTask())); + configs.put("TaskExpireTimeMs", String.valueOf(jobConfig.getExpiry())); + configs.put("MinionWorkerGroupTag", jobConfig.getInstanceGroupTag()); + return configs; + } + /** * Get configs of the specified sub task for a given task. * @@ -567,7 +584,7 @@ public class PinotHelixTaskResourceManager { String taskName = taskState.getKey(); // Iterate through all task configs associated with this task name - for (PinotTaskConfig taskConfig : getTaskConfigs(taskName)) { + for (PinotTaskConfig taskConfig : getSubtaskConfigs(taskName)) { Map<String, String> pinotConfigs = taskConfig.getConfigs(); // Filter task configs that matches this table name @@ -650,7 +667,7 @@ public class PinotHelixTaskResourceManager { String pinotTaskName = getPinotTaskName(helixJobName); // Iterate through all task configs associated with this task name - for (PinotTaskConfig taskConfig : getTaskConfigs(pinotTaskName)) { + for (PinotTaskConfig taskConfig : getSubtaskConfigs(pinotTaskName)) { Map<String, String> pinotConfigs = taskConfig.getConfigs(); // Filter task configs that matches this table name diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index e80e9cfbb9..5846eba2d3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -288,7 +288,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); tasks != null; tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { - assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); + assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task @@ -393,7 +393,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); tasks != null; tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { - assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), 1); + assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task @@ -541,7 +541,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE); tasks != null; tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) { - assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); + assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE))); // Will not schedule task if there's incomplete task diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java index 440b38a787..46befe39a8 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java @@ -62,6 +62,7 @@ public class MinionEventObservers { LOGGER.info("Configured to clean up task event observers immediately"); return; } + LOGGER.info("Configured to clean up task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs); _cleanupExecutor.submit(() -> { LOGGER.info("Start to cleanup task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs); while (!Thread.interrupted()) { @@ -97,7 +98,13 @@ public class MinionEventObservers { } public static MinionEventObservers getInstance() { - return _customInstance != null ? _customInstance : DEFAULT_INSTANCE; + if (_customInstance != null) { + return _customInstance; + } + // Test code might reach here, but this should never happen in prod case, as instance is created upon worker + // starts before any tasks can run. But log something for debugging just in case. + LOGGER.warn("Using default MinionEventObservers instance"); + return DEFAULT_INSTANCE; } public MinionEventObserver getMinionEventObserver(String taskId) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org