Copilot commented on code in PR #17291: URL: https://github.com/apache/pinot/pull/17291#discussion_r2617600154
########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,603 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; 
+import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String 
_clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? 
new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public SystemTableResponse getRows(SystemTableRequest request) { + if (_tableCache == null) { + return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, request.getOffset()); + int limit = request.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(request.getFilter(), stats, rawTableName)) { + continue; + } + totalRows++; + if (offset > 0) { + offset--; + continue; + } Review Comment: When limit is 0, the provider should not collect any rows but should still count totalRows for the response. However, this condition is checked after totalRows++ but before offset handling completes. If offset > 0 and limit == 0, this logic can prematurely skip rows that should be counted in totalRows. The limit check should occur after offset handling is complete. 
```suggestion if (offset > 0) { offset--; continue; } totalRows++; ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java: ########## @@ -1033,6 +1046,247 @@ private void throwAccessDeniedError(long requestId, String query, RequestContext throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN); } + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName, + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableProvider provider = _systemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return BrokerResponseNative.TABLE_DOES_NOT_EXIST; + } + try { + if (!isSupportedSystemTableQuery(pinotQuery)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, + "System tables only support simple projection/filter/limit queries"); + } + Schema systemSchema = provider.getSchema(); + List<String> projectionColumns = extractProjectionColumns(pinotQuery, systemSchema); + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = Math.max(0, pinotQuery.getLimit()); + SystemTableRequest systemTableRequest = + new SystemTableRequest(projectionColumns, toSystemTableFilter(pinotQuery.getFilterExpression()), offset, + limit); + SystemTableResponse systemTableResponse = provider.getRows(systemTableRequest); + BrokerResponseNative brokerResponse = + buildSystemTableBrokerResponse(tableName, systemSchema, projectionColumns, systemTableResponse, + requestContext); + 
brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); + _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, + QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null)); + return brokerResponse; + } catch (BadQueryRequestException e) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e); + requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + + // System tables are broker-served virtual tables; they only support projection/filter/offset/limit today. + private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) { + return (pinotQuery.getGroupByList() == null || pinotQuery.getGroupByList().isEmpty()) + && pinotQuery.getHavingExpression() == null + && (pinotQuery.getOrderByList() == null || pinotQuery.getOrderByList().isEmpty()); + } + + private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema schema) + throws BadQueryRequestException { + List<String> projections = new ArrayList<>(); + boolean hasStar = false; + List<Expression> selectList = pinotQuery.getSelectList(); + if (CollectionUtils.isEmpty(selectList)) { + throw new BadQueryRequestException("System tables require a projection list"); + } + for (Expression expression : selectList) { + Identifier identifier = expression.getIdentifier(); + if (identifier != null) { + if ("*".equals(identifier.getName())) { + hasStar = true; + } else { + projections.add(identifier.getName()); + } + continue; + } + Function function = expression.getFunctionCall(); + if (function != null && 
"AS".equalsIgnoreCase(function.getOperator()) && !function.getOperands().isEmpty()) { + Identifier aliased = function.getOperands().get(0).getIdentifier(); + if (aliased != null) { + projections.add(aliased.getName()); + continue; + } + } + throw new BadQueryRequestException("System tables only support column projections or '*'"); + } + if (hasStar) { + projections = new ArrayList<>(schema.getColumnNames()); + } else if (projections.isEmpty()) { + throw new BadQueryRequestException("No valid projection columns found in SELECT list for system table"); Review Comment: This error condition appears unreachable based on the logic flow. If hasStar is true, projections gets reassigned to all schema columns. If hasStar is false and projections is empty, it means the select list had no identifiers and no AS functions with identifiers, in which case the loop above would already have thrown the "System tables only support column projections or '*'" exception (line 1128); an empty select list is likewise rejected with "System tables require a projection list" before the loop starts. Consider removing this check or adding a code comment explaining when this can occur. ```suggestion ``` ########## pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java: ########## @@ -67,6 +67,19 @@ public static String translateTableName(String tableName, @Nullable String datab case 2: Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName); String databasePrefix = tableSplit[0]; + /* + * Allow system tables to bypass database prefix validation so they can be queried regardless of the database + * header. System tables are intended to be globally accessible and are not subject to database-level access + * controls. Ensure system tables do not expose sensitive information because they can be queried without a + * matching database context. + * + * Security note: system tables are intentionally exempt from database-scoped access control; broader Pinot + * security is handled by AccessControl implementations (see org.apache.pinot.spi.security.AccessControl) and + * the Pinot security model documentation. 
New system tables should be vetted for sensitive content. + */ Review Comment: While this security note is helpful, consider adding it to the SystemTableProvider interface documentation as well, since that's where new providers are implemented. This ensures developers creating new system tables see the security guidance at the primary extension point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
