This is an automated email from the ASF dual-hosted git repository. cpsoman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new a251c42 Adding ability to check ingestion status for Offline Pinot table (#7070) a251c42 is described below commit a251c423183a0460dd4bf1e6dd644a935003725a Author: icefury71 <chinmay.cere...@gmail.com> AuthorDate: Wed Jul 7 10:31:50 2021 -0700 Adding ability to check ingestion status for Offline Pinot table (#7070) * Adding ability to check ingestion status for Offline Pinot table * - Adding table ingestion status to debug endpoint - Error handling for offline table * Fixing unused import violation * Fixing formatting violation (unused import) * Addressing review feedback (TABLE_NAME constant) --- .../pinot/controller/api/debug/TableDebugInfo.java | 11 ++- .../api/resources/PinotTableRestletResource.java | 34 ++++++--- .../api/resources/PinotTaskRestletResource.java | 9 +++ .../api/resources/TableDebugResource.java | 28 ++++++- .../core/minion/PinotHelixTaskResourceManager.java | 35 +++++++++ .../util/TableIngestionStatusHelper.java | 89 ++++++++++++++++++++++ .../apache/pinot/spi/config/table/TableStatus.java | 3 +- .../apache/pinot/spi/utils/CommonConstants.java | 2 + 8 files changed, 198 insertions(+), 13 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java index 3242894..54d13da 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.spi.config.table.TableStatus; /** @@ -44,6 +45,9 @@ public class TableDebugInfo { @JsonProperty("tableName") private final String _tableName; + @JsonProperty("ingestionStatus") + private final TableStatus.IngestionStatus _ingestionStatus; + @JsonProperty("tableSize") private final TableSizeSummary _tableSizeSummary; @@ -66,10 +70,11 @@ public class TableDebugInfo { private final List<BrokerDebugInfo> _brokerDebugInfos; @JsonCreator - public TableDebugInfo(String tableName, TableSizeSummary tableSizeSummary, int numBrokers, int numServers, + public TableDebugInfo(String tableName, TableStatus.IngestionStatus ingestionStatus, TableSizeSummary tableSizeSummary, int numBrokers, int numServers, int numSegments, List<SegmentDebugInfo> segmentDebugInfos, List<ServerDebugInfo> serverDebugInfos, List<BrokerDebugInfo> brokerDebugInfos) { _tableName = tableName; + _ingestionStatus = ingestionStatus; _tableSizeSummary = tableSizeSummary; _numBrokers = numBrokers; @@ -85,6 +90,10 @@ public class TableDebugInfo { return _tableName; } + public TableStatus.IngestionStatus getIngestionStatus() { + return _ingestionStatus; + } + public TableSizeSummary getTableSize() { return _tableSizeSummary; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index a0bf714..bb47d77 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -62,11 +62,12 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.recommender.RecommenderDriver; import org.apache.pinot.controller.tuner.TableConfigTunerUtils; -import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; +import org.apache.pinot.controller.util.TableIngestionStatusHelper; import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableStats; @@ -107,6 +108,9 @@ public class PinotTableRestletResource { PinotHelixResourceManager _pinotHelixResourceManager; @Inject + PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; + + @Inject ControllerConf _controllerConf; @Inject @@ -606,17 +610,27 @@ public class PinotTableRestletResource { @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) { try { - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + TableType tableType = Constants.validateTableType(tableTypeStr); + if (tableType == null) { + throw new ControllerApplicationException(LOGGER, "Table type should either be realtime|offline", + Response.Status.BAD_REQUEST); + } + String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName); + if (!_pinotHelixResourceManager.hasTable(tableNameWithType)) { + throw new ControllerApplicationException(LOGGER, + "Specified table name: " + tableName + " of type: " + tableTypeStr + " does not exist.", + Response.Status.BAD_REQUEST); + } + TableStatus.IngestionStatus ingestionStatus = null; if (TableType.OFFLINE == tableType) { - // TODO: Support table status for offline table. Currently only supported for realtime. - throw new UnsupportedOperationException( - "Table status for OFFLINE table: " + tableName + " is currently unsupported"); + ingestionStatus = TableIngestionStatusHelper + .getOfflineTableIngestionStatus(tableNameWithType, _pinotHelixResourceManager, + _pinotHelixTaskResourceManager); + } else { + ingestionStatus = TableIngestionStatusHelper.getRealtimeTableIngestionStatus(tableNameWithType, + _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, _executor, _connectionManager, + _pinotHelixResourceManager); } - String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); - ConsumingSegmentInfoReader consumingSegmentInfoReader = - new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager); - TableStatus.IngestionStatus ingestionStatus = consumingSegmentInfoReader - .getIngestionStatus(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); TableStatus tableStatus = new TableStatus(ingestionStatus); return JsonUtils.objectToPrettyString(tableStatus); } catch (Exception e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java index 096ab3b..e573be2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java @@ -122,6 +122,15 @@ public class PinotTaskRestletResource { return _pinotHelixTaskResourceManager.getTasks(taskType); } + @GET + @Path("/tasks/{taskType}/{tableNameWithType}/state") + @ApiOperation("List all tasks for the given task type") + public Map<String, TaskState> getTaskStatesByTable( + @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType, + @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") String tableNameWithType) { + return _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableNameWithType); + } + @Deprecated @GET @Path("/tasks/tasks/{taskType}") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java index f42afc2..8bbab53 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java @@ -61,8 +61,11 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.debug.TableDebugInfo; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.controller.util.TableIngestionStatusHelper; import org.apache.pinot.controller.util.TableSizeReader; +import org.apache.pinot.spi.config.table.TableStatus; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -87,7 +90,11 @@ public class TableDebugResource { PinotHelixResourceManager _pinotHelixResourceManager; @Inject + PinotHelixTaskResourceManager _pinotHelixTaskResourceManager; + + @Inject Executor _executor; + @Inject HttpConnectionManager _connectionManager; @@ -152,16 +159,35 @@ public class TableDebugResource { // Table size summary. TableDebugInfo.TableSizeSummary tableSizeSummary = getTableSize(tableNameWithType); + TableStatus.IngestionStatus ingestionStatus = getIngestionStatus(tableNameWithType, tableType); + // Number of segments in the table. IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); int numSegments = (idealState != null) ? idealState.getPartitionSet().size() : 0; - return new TableDebugInfo(tableNameWithType, tableSizeSummary, + return new TableDebugInfo(tableNameWithType, ingestionStatus, tableSizeSummary, _pinotHelixResourceManager.getBrokerInstancesForTable(tableName, tableType).size(), _pinotHelixResourceManager.getServerInstancesForTable(tableName, tableType).size(), numSegments, segmentDebugInfos, serverDebugInfos, brokerDebugInfos); } + private TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType, TableType tableType) { + try { + switch (tableType) { + case OFFLINE: + return TableIngestionStatusHelper.getOfflineTableIngestionStatus(tableNameWithType, _pinotHelixResourceManager, + _pinotHelixTaskResourceManager); + case REALTIME: + return TableIngestionStatusHelper.getRealtimeTableIngestionStatus(tableNameWithType, + _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, _executor, _connectionManager, + _pinotHelixResourceManager); + } + } catch (Exception e) { + return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNKNOWN, e.getMessage()); + } + return null; + } + private TableDebugInfo.TableSizeSummary getTableSize(String tableNameWithType) { TableSizeReader tableSizeReader = new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index 265d9f3..f687b36 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -42,6 +42,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.spi.utils.CommonConstants.TABLE_NAME; + /** * The class <code>PinotHelixTaskResourceManager</code> manages all the task resources in Pinot cluster. @@ -347,6 +349,39 @@ public class PinotHelixTaskResourceManager { } /** + * Helper method to return a map of task names to corresponding task state + * where the task corresponds to the given Pinot table name. This is used to + * check status of all tasks for a given table. + * @param taskType Task Name + * @param tableNameWithType table name with type to filter on + * @return Map of filtered task name to corresponding state + */ + public synchronized Map<String, TaskState> getTaskStatesByTable(String taskType, String tableNameWithType) { + Map<String, TaskState> filteredTaskStateMap = new HashMap<>(); + Map<String, TaskState> taskStateMap = getTaskStates(taskType); + + for (Map.Entry<String, TaskState> taskState : taskStateMap.entrySet()) { + String taskName = taskState.getKey(); + + // Iterate through all task configs associated with this task name + for (PinotTaskConfig taskConfig: getTaskConfigs(taskName)) { + Map<String, String> pinotConfigs = taskConfig.getConfigs(); + + // Filter task configs that matches this table name + if (pinotConfigs != null) { + String tableNameConfig = pinotConfigs.get(TABLE_NAME); + if (tableNameConfig != null && tableNameConfig.equals(tableNameWithType)) { + // Found a match ! Track state for this particular task in the final result map + filteredTaskStateMap.put(taskName, taskStateMap.get(taskName)); + break; + } + } + } + } + return filteredTaskStateMap; + } + + /** * Helper method to convert task type to Helix JobQueue name. * <p>E.g. DummyTask -> TaskQueue_DummyTask * diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java new file mode 100644 index 0000000..3ca4625 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import javax.ws.rs.core.Response; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.helix.task.TaskState; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableStatus; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.slf4j.LoggerFactory; + + +/** + * Helper class to fetch ingestion status for realtime and offline table + */ +public class TableIngestionStatusHelper { + public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableIngestionStatusHelper.class); + + public static TableStatus.IngestionStatus getRealtimeTableIngestionStatus(String tableNameWithType, int timeoutMs, + Executor executor, HttpConnectionManager connectionManager, PinotHelixResourceManager pinotHelixResourceManager) { + ConsumingSegmentInfoReader consumingSegmentInfoReader = + new ConsumingSegmentInfoReader(executor, connectionManager, pinotHelixResourceManager); + return consumingSegmentInfoReader.getIngestionStatus(tableNameWithType, timeoutMs); + } + + public static TableStatus.IngestionStatus getOfflineTableIngestionStatus(String tableNameWithType, + PinotHelixResourceManager pinotHelixResourceManager, + PinotHelixTaskResourceManager pinotHelixTaskResourceManager) { + // Check if this offline table uses the built-in segment generation and push task type + // Offline table ingestion status for ingestion via other task types is not supported. + TableConfig tableConfig = pinotHelixResourceManager.getTableConfig(tableNameWithType); + TableTaskConfig taskConfig = tableConfig.getTaskConfig(); + if (taskConfig == null + || taskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) == null) { + throw new ControllerApplicationException(LOGGER, + "Cannot retrieve ingestion status for Table : " + tableNameWithType + + " since it does not use the built-in SegmentGenerationAndPushTask task", Response.Status.BAD_REQUEST); + } + + TableStatus.IngestionState ingestionState = TableStatus.IngestionState.HEALTHY; + String errorMessage = ""; + + // Retrieve all the Minion tasks and corresponding states for this table + Map<String, TaskState> taskStateMap = pinotHelixTaskResourceManager + .getTaskStatesByTable(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, tableNameWithType); + List<String> failedTasks = new ArrayList<>(); + + // Check if any of the tasks are in error state + for (Map.Entry<String, TaskState> taskStateEntry : taskStateMap.entrySet()) { + switch (taskStateEntry.getValue()) { + case FAILED: + case ABORTED: + failedTasks.add(taskStateEntry.getKey()); + default: + continue; + } + } + if (failedTasks.size() > 0) { + ingestionState = TableStatus.IngestionState.UNHEALTHY; + errorMessage = "Follow ingestion tasks have failed: " + failedTasks.toString(); + } + return new TableStatus.IngestionStatus(ingestionState.toString(), errorMessage); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java index 9c0ea16..8be22d5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java @@ -33,7 +33,8 @@ public class TableStatus { public enum IngestionState { HEALTHY, - UNHEALTHY + UNHEALTHY, + UNKNOWN } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 7cec52d..7a43e06 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -33,6 +33,8 @@ public class CommonConstants { public static final String KEY_OF_AUTH_TOKEN = "auth.token"; + public static final String TABLE_NAME = "tableName"; + /** * The state of the consumer for a given segment */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org