This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8e1a74d2a2 Add Tabled Is Disabled Error (#14199) 8e1a74d2a2 is described below commit 8e1a74d2a22ab8b3087f5de97913bb4a59dfce57 Author: ashishjayamohan <46698969+ashishjayamo...@users.noreply.github.com> AuthorDate: Sun Oct 27 16:59:55 2024 -0700 Add Tabled Is Disabled Error (#14199) --- .../BaseSingleStageBrokerRequestHandler.java | 33 ++++++++++++++++++++-- .../pinot/broker/routing/BrokerRoutingManager.java | 27 ++++++++++++++++-- .../pinot/common/exception/QueryException.java | 5 ++++ .../response/broker/BrokerResponseNative.java | 2 ++ 4 files changed, 62 insertions(+), 5 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index c9862d1af0..42ae1d13ab 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -485,6 +485,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ if (realtimeTableName == null) { realtimeTableConfig = null; } + HandlerContext handlerContext = getHandlerContext(offlineTableConfig, realtimeTableConfig); if (handlerContext._disableGroovy) { rejectGroovyQuery(serverPinotQuery); @@ -620,9 +621,16 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null; List<String> unavailableSegments = new ArrayList<>(); int numPrunedSegmentsTotal = 0; + boolean offlineTableDisabled = false; + boolean realtimeTableDisabled = false; + List<ProcessingException> exceptions = new ArrayList<>(); if (offlineBrokerRequest != null) { + offlineTableDisabled = _routingManager.isTableDisabled(offlineTableName); // NOTE: Routing table might be null if table is just removed - RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId); + RoutingTable routingTable = null; + if (!offlineTableDisabled) { + routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId); + } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap = @@ -638,8 +646,12 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } } if (realtimeBrokerRequest != null) { + realtimeTableDisabled = _routingManager.isTableDisabled(realtimeTableName); // NOTE: Routing table might be null if table is just removed - RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId); + RoutingTable routingTable = null; + if (!realtimeTableDisabled) { + routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId); + } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap = @@ -654,10 +666,25 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ realtimeBrokerRequest = null; } } + + if (offlineTableDisabled || realtimeTableDisabled) { + String errorMessage = null; + if (((realtimeTableConfig != null && offlineTableConfig != null) && (offlineTableDisabled + && realtimeTableDisabled)) || (offlineTableConfig == null && realtimeTableDisabled) || ( + realtimeTableConfig == null && offlineTableDisabled)) { + requestContext.setErrorCode(QueryException.TABLE_IS_DISABLED_ERROR_CODE); + return BrokerResponseNative.TABLE_IS_DISABLED; + } else if ((realtimeTableConfig != null && offlineTableConfig != null) && realtimeTableDisabled) { + errorMessage = "Realtime table is disabled in hybrid table"; + } else if ((realtimeTableConfig != null && offlineTableConfig != null) && offlineTableDisabled) { + errorMessage = "Offline table is disabled in hybrid table"; + } + exceptions.add(QueryException.getException(QueryException.TABLE_IS_DISABLED_ERROR, errorMessage)); + } + int numUnavailableSegments = unavailableSegments.size(); requestContext.setNumUnavailableSegments(numUnavailableSegments); - List<ProcessingException> exceptions = new ArrayList<>(); if (numUnavailableSegments > 0) { String errorMessage; if (numUnavailableSegments > MAX_UNAVAILABLE_SEGMENTS_TO_PRINT_IN_QUERY_EXCEPTION) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index f2f65c91e8..b6f82e0705 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -533,7 +533,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle RoutingEntry routingEntry = new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector, segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher, - timeBoundaryManager, partitionMetadataManager, queryTimeoutMs); + timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, !idealState.isEnabled()); if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) { LOGGER.info("Built routing for table: {}", tableNameWithType); } else { @@ -603,6 +603,20 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle return _routingEntryMap.containsKey(tableNameWithType); } + /** + * Returns whether the given table is enabled + * @param tableNameWithType Table name with type + * @return Whether the given table is enabled + */ + public boolean isTableDisabled(String tableNameWithType) { + RoutingEntry routingEntry = _routingEntryMap.getOrDefault(tableNameWithType, null); + if (routingEntry == null) { + return false; + } else { + return routingEntry.isDisabled(); + } + } + /** * Returns the routing table (a map from server instance to list of segments hosted by the server, and a list of * unavailable segments) based on the broker request, or {@code null} if the routing does not exist. @@ -729,11 +743,14 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle // Time boundary manager is only available for the offline part of the hybrid table transient TimeBoundaryManager _timeBoundaryManager; + transient boolean _disabled; + RoutingEntry(String tableNameWithType, String idealStatePath, String externalViewPath, SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, List<SegmentPruner> segmentPruners, InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, int lastUpdateExternalViewVersion, SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable TimeBoundaryManager timeBoundaryManager, - @Nullable SegmentPartitionMetadataManager partitionMetadataManager, @Nullable Long queryTimeoutMs) { + @Nullable SegmentPartitionMetadataManager partitionMetadataManager, @Nullable Long queryTimeoutMs, + boolean disabled) { _tableNameWithType = tableNameWithType; _idealStatePath = idealStatePath; _externalViewPath = externalViewPath; @@ -747,6 +764,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle _partitionMetadataManager = partitionMetadataManager; _queryTimeoutMs = queryTimeoutMs; _segmentZkMetadataFetcher = segmentZkMetadataFetcher; + _disabled = disabled; } String getTableNameWithType() { @@ -779,6 +797,10 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle return _queryTimeoutMs; } + boolean isDisabled() { + return _disabled; + } + // NOTE: The change gets applied in sequence, and before change applied to all components, there could be some // inconsistency between components, which is fine because the inconsistency only exists for the newly changed // segments and only lasts for a very short time. @@ -793,6 +815,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle } _lastUpdateIdealStateVersion = idealState.getStat().getVersion(); _lastUpdateExternalViewVersion = externalView.getStat().getVersion(); + _disabled = !idealState.isEnabled(); } void onInstancesChange(Set<String> enabledInstances, List<String> changedInstances) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index 82ed026aca..f6e563f62c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -53,6 +53,7 @@ public class QueryException { public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170; public static final int ACCESS_DENIED_ERROR_CODE = 180; public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190; + public static final int TABLE_IS_DISABLED_ERROR_CODE = 191; public static final int QUERY_EXECUTION_ERROR_CODE = 200; public static final int QUERY_CANCELLATION_ERROR_CODE = 503; // TODO: Handle these errors in broker @@ -95,6 +96,8 @@ public class QueryException { new ProcessingException(COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE); public static final ProcessingException TABLE_DOES_NOT_EXIST_ERROR = new ProcessingException(TABLE_DOES_NOT_EXIST_ERROR_CODE); + public static final ProcessingException TABLE_IS_DISABLED_ERROR = + new ProcessingException(TABLE_IS_DISABLED_ERROR_CODE); public static final ProcessingException QUERY_EXECUTION_ERROR = new ProcessingException(QUERY_EXECUTION_ERROR_CODE); public static final ProcessingException QUERY_CANCELLATION_ERROR = new ProcessingException(QUERY_CANCELLATION_ERROR_CODE); @@ -146,6 +149,7 @@ public class QueryException { SEGMENT_PLAN_EXECUTION_ERROR.setMessage("SegmentPlanExecutionError"); COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR.setMessage("CombineSegmentPlanTimeoutError"); TABLE_DOES_NOT_EXIST_ERROR.setMessage("TableDoesNotExistError"); + TABLE_IS_DISABLED_ERROR.setMessage("TableIsDisabledError"); QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError"); QUERY_CANCELLATION_ERROR.setMessage("QueryCancellationError"); SERVER_SCHEDULER_DOWN_ERROR.setMessage("ServerShuttingDown"); @@ -230,6 +234,7 @@ public class QueryException { case QueryException.SQL_PARSING_ERROR_CODE: case QueryException.TOO_MANY_REQUESTS_ERROR_CODE: case QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE: + case QueryException.TABLE_IS_DISABLED_ERROR_CODE: case QueryException.UNKNOWN_COLUMN_ERROR_CODE: return true; default: diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 6cdfb3cc15..83a30a10a3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -60,6 +60,8 @@ public class BrokerResponseNative implements BrokerResponse { new BrokerResponseNative(QueryException.BROKER_RESOURCE_MISSING_ERROR); public static final BrokerResponseNative TABLE_DOES_NOT_EXIST = new BrokerResponseNative(QueryException.TABLE_DOES_NOT_EXIST_ERROR); + public static final BrokerResponseNative TABLE_IS_DISABLED = + new BrokerResponseNative(QueryException.TABLE_IS_DISABLED_ERROR); public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT = getBrokerResponseExplainPlanOutput(); private ResultTable _resultTable; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org