Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2575346399
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,246 @@ private void throwAccessDeniedError(long requestId,
String query, RequestContext
throw new WebApplicationException("Permission denied." + failureMessage,
Response.Status.FORBIDDEN);
}
+ private boolean isSystemTable(String tableName) {
+ return tableName != null &&
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+ }
+
+ private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String
tableName,
+ RequestContext requestContext, @Nullable RequesterIdentity
requesterIdentity, String query) {
+ if (pinotQuery.isExplain()) {
+ return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+ }
+ SystemTableProvider provider = _systemTableRegistry.get(tableName);
+ if (provider == null) {
+ requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+ return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+ }
+ try {
+ if (!isSupportedSystemTableQuery(pinotQuery)) {
+ requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+ return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+ "System tables only support simple projection/filter/limit
queries");
+ }
+ Schema systemSchema = provider.getSchema();
+ List<String> projectionColumns = extractProjectionColumns(pinotQuery,
systemSchema);
+ int offset = Math.max(0, pinotQuery.getOffset());
+ int limit = Math.max(0, pinotQuery.getLimit());
+ SystemTableRequest systemTableRequest =
+ new SystemTableRequest(projectionColumns,
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+ limit);
+ SystemTableResponse systemTableResponse =
provider.getRows(systemTableRequest);
+ BrokerResponseNative brokerResponse =
+ buildSystemTableBrokerResponse(tableName, systemSchema,
projectionColumns, systemTableResponse,
+ requestContext);
+ brokerResponse.setTimeUsedMs(System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis());
+ _queryLogger.log(new QueryLogger.QueryLogParams(requestContext,
tableName, brokerResponse,
+ QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE,
requesterIdentity, null));
+ return brokerResponse;
+ } catch (BadQueryRequestException e) {
+ requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS,
1);
+ return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
e.getMessage());
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while handling system table query {}: {}",
tableName, e.getMessage(), e);
+ requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+ return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION,
e.getMessage());
+ }
+ }
+
+ private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+ return (pinotQuery.getGroupByList() == null ||
pinotQuery.getGroupByList().isEmpty())
+ && pinotQuery.getHavingExpression() == null
+ && (pinotQuery.getOrderByList() == null ||
pinotQuery.getOrderByList().isEmpty());
+ }
+
+ private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema
schema)
+ throws BadQueryRequestException {
+ List<String> projections = new ArrayList<>();
+ boolean hasStar = false;
+ List<Expression> selectList = pinotQuery.getSelectList();
+ if (CollectionUtils.isEmpty(selectList)) {
+ throw new BadQueryRequestException("System tables require a projection
list");
+ }
+ for (Expression expression : selectList) {
+ Identifier identifier = expression.getIdentifier();
+ if (identifier != null) {
+ if ("*".equals(identifier.getName())) {
+ hasStar = true;
+ } else {
+ projections.add(identifier.getName());
+ }
+ continue;
+ }
+ Function function = expression.getFunctionCall();
+ if (function != null && "AS".equalsIgnoreCase(function.getOperator()) &&
!function.getOperands().isEmpty()) {
+ Identifier aliased = function.getOperands().get(0).getIdentifier();
+ if (aliased != null) {
+ projections.add(aliased.getName());
+ continue;
+ }
+ }
+ throw new BadQueryRequestException("System tables only support column
projections or '*'");
+ }
+ if (hasStar || projections.isEmpty()) {
+ projections = new ArrayList<>(schema.getColumnNames());
+ }
+ List<String> normalized = new ArrayList<>(projections.size());
+ for (String column : projections) {
+ if (schema.hasColumn(column)) {
+ normalized.add(column);
+ continue;
+ }
+ String matched = null;
+ for (String schemaColumn : schema.getColumnNames()) {
+ if (schemaColumn.equalsIgnoreCase(column)) {
+ matched = schemaColumn;
+ break;
+ }
+ }
+ if (matched == null) {
+ throw new BadQueryRequestException("Unknown column in system table: "
+ column);
+ }
+ normalized.add(matched);
+ }
+ return normalized;
+ }
+
+ private BrokerResponseNative buildSystemTableBrokerResponse(String
tableName, Schema schema,
+ List<String> projectionColumns, SystemTableResponse response,
RequestContext requestContext) {
+ DataSchema dataSchema = buildSystemTableDataSchema(schema,
projectionColumns);
+ List<GenericRow> rows = response != null ? response.getRows() :
Collections.emptyList();
+ List<Object[]> resultRows = new ArrayList<>();
+ if (rows != null) {
+ for (GenericRow row : rows) {
+ Object[] values = new Object[projectionColumns.size()];
+ for (int i = 0; i < projectionColumns.size(); i++) {
+ values[i] = row != null ? row.getValue(projectionColumns.get(i)) :
null;
Review Comment:
The null check for individual row elements (line 1162) is redundant only if
providers never put null elements in the list; the `rows != null` check guards
the list itself, not its elements. If null rows are possible, they should be
filtered when building the list rather than handled during iteration.
```suggestion
values[i] = row.getValue(projectionColumns.get(i));
```
##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+ private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName("system.tables")
+ .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+ .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+ .addMetric("reportedSize", FieldSpec.DataType.LONG)
+ .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+ .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+ .build();
+
+ private final TableCache _tableCache;
+ private final @Nullable HelixAdmin _helixAdmin;
+ private final @Nullable String _clusterName;
+ private final HttpClient _httpClient;
+ private final @Nullable Function<String, TableSize>
_tableSizeFetcherOverride;
+ private final List<String> _staticControllerUrls;
+
+ public TablesSystemTableProvider() {
+ this(null, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache) {
+ this(tableCache, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin) {
+ this(tableCache, helixAdmin, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin,
+ @Nullable String clusterName) {
+ this(tableCache, helixAdmin, clusterName, null, null);
+ }
+
+ TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin, @Nullable String clusterName,
+ @Nullable Function<String, TableSize> tableSizeFetcherOverride,
@Nullable List<String> controllerUrls) {
+ _tableCache = tableCache;
+ _helixAdmin = helixAdmin;
+ _clusterName = clusterName;
+ _httpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+ _tableSizeFetcherOverride = tableSizeFetcherOverride;
+ _staticControllerUrls = controllerUrls != null ? new
ArrayList<>(controllerUrls) : List.of();
+ }
+
+ @Override
+ public String getTableName() {
+ return "system.tables";
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+
+ @Override
+ public SystemTableResponse getRows(SystemTableRequest request) {
+ if (_tableCache == null) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ Set<String> tableNamesWithType = new LinkedHashSet<>();
+ for (String tableName : _tableCache.getTableNameMap().values()) {
+ if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+ tableNamesWithType.add(tableName);
+ }
+ }
+ List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+ sortedTableNames.sort(Comparator.naturalOrder());
+
+ List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+ int offset = request.getOffset();
+ int limit = request.getLimit();
+ if (limit == 0) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ for (String tableNameWithType : sortedTableNames) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ continue;
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableStats stats = buildStats(tableNameWithType, tableType);
+ if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+ continue;
+ }
+ GenericRow row = new GenericRow();
+ row.putValue("tableName", rawTableName);
+ row.putValue("type", stats._type);
+ row.putValue("status", stats._status);
+ row.putValue("segments", stats._segments);
+ row.putValue("totalDocs", stats._totalDocs);
+ row.putValue("reportedSize", stats._reportedSizeInBytes);
+ row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+ row.putValue("storageTier", stats._storageTier);
+ row.putValue("brokerTenant", stats._brokerTenant);
+ row.putValue("serverTenant", stats._serverTenant);
+ row.putValue("replicas", stats._replicas);
+ row.putValue("tableConfig", stats._tableConfig);
+ if (offset > 0) {
+ offset--;
+ continue;
+ }
+ rows.add(row);
+ if (limit > 0 && rows.size() >= limit) {
+ break;
+ }
+ }
+ return new SystemTableResponse(rows, System.currentTimeMillis(),
rows.size());
Review Comment:
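The totalRows returned should be the count of all matching rows before
applying offset/limit, but this returns rows.size(), which is the count after
offset and limit have been applied. This will cause incorrect pagination
metadata when offset/limit are used. Note that `sortedTableNames.size()`
(suggested below) counts tables before `matchesFilter` is applied. An exact
value requires counting every row that passes the filter, including rows
skipped by the offset and rows past the limit, so the loop could not break early.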
```suggestion
return new SystemTableResponse(rows, System.currentTimeMillis(),
sortedTableNames.size());
```
##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+ private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName("system.tables")
+ .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+ .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+ .addMetric("reportedSize", FieldSpec.DataType.LONG)
+ .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+ .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+ .build();
+
+ private final TableCache _tableCache;
+ private final @Nullable HelixAdmin _helixAdmin;
+ private final @Nullable String _clusterName;
+ private final HttpClient _httpClient;
+ private final @Nullable Function<String, TableSize>
_tableSizeFetcherOverride;
+ private final List<String> _staticControllerUrls;
+
+ public TablesSystemTableProvider() {
+ this(null, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache) {
+ this(tableCache, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin) {
+ this(tableCache, helixAdmin, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin,
+ @Nullable String clusterName) {
+ this(tableCache, helixAdmin, clusterName, null, null);
+ }
+
+ TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin, @Nullable String clusterName,
+ @Nullable Function<String, TableSize> tableSizeFetcherOverride,
@Nullable List<String> controllerUrls) {
+ _tableCache = tableCache;
+ _helixAdmin = helixAdmin;
+ _clusterName = clusterName;
+ _httpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+ _tableSizeFetcherOverride = tableSizeFetcherOverride;
+ _staticControllerUrls = controllerUrls != null ? new
ArrayList<>(controllerUrls) : List.of();
+ }
+
+ @Override
+ public String getTableName() {
+ return "system.tables";
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+
+ @Override
+ public SystemTableResponse getRows(SystemTableRequest request) {
+ if (_tableCache == null) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ Set<String> tableNamesWithType = new LinkedHashSet<>();
+ for (String tableName : _tableCache.getTableNameMap().values()) {
+ if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+ tableNamesWithType.add(tableName);
+ }
+ }
+ List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+ sortedTableNames.sort(Comparator.naturalOrder());
+
+ List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+ int offset = request.getOffset();
+ int limit = request.getLimit();
+ if (limit == 0) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ for (String tableNameWithType : sortedTableNames) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ continue;
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableStats stats = buildStats(tableNameWithType, tableType);
+ if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+ continue;
+ }
+ GenericRow row = new GenericRow();
+ row.putValue("tableName", rawTableName);
+ row.putValue("type", stats._type);
+ row.putValue("status", stats._status);
+ row.putValue("segments", stats._segments);
+ row.putValue("totalDocs", stats._totalDocs);
+ row.putValue("reportedSize", stats._reportedSizeInBytes);
+ row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+ row.putValue("storageTier", stats._storageTier);
+ row.putValue("brokerTenant", stats._brokerTenant);
+ row.putValue("serverTenant", stats._serverTenant);
+ row.putValue("replicas", stats._replicas);
+ row.putValue("tableConfig", stats._tableConfig);
+ if (offset > 0) {
+ offset--;
+ continue;
+ }
Review Comment:
The offset logic is applied after the filter check, so the offset counts only
rows that pass the filter. However, the totalRows count returned in line 171
uses rows.size() after pagination, which makes pagination metadata inconsistent.
totalRows should reflect all filtered rows before offset/limit, but it is
currently the number of rows left after the offset and limit are applied.
##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+ private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName("system.tables")
+ .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+ .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+ .addMetric("reportedSize", FieldSpec.DataType.LONG)
+ .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+ .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+ .build();
+
+ private final TableCache _tableCache;
+ private final @Nullable HelixAdmin _helixAdmin;
+ private final @Nullable String _clusterName;
+ private final HttpClient _httpClient;
+ private final @Nullable Function<String, TableSize>
_tableSizeFetcherOverride;
+ private final List<String> _staticControllerUrls;
+
+ public TablesSystemTableProvider() {
+ this(null, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache) {
+ this(tableCache, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin) {
+ this(tableCache, helixAdmin, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin,
+ @Nullable String clusterName) {
+ this(tableCache, helixAdmin, clusterName, null, null);
+ }
+
+ TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin, @Nullable String clusterName,
+ @Nullable Function<String, TableSize> tableSizeFetcherOverride,
@Nullable List<String> controllerUrls) {
+ _tableCache = tableCache;
+ _helixAdmin = helixAdmin;
+ _clusterName = clusterName;
+ _httpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+ _tableSizeFetcherOverride = tableSizeFetcherOverride;
+ _staticControllerUrls = controllerUrls != null ? new
ArrayList<>(controllerUrls) : List.of();
+ }
+
+ @Override
+ public String getTableName() {
+ return "system.tables";
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+
+ @Override
+ public SystemTableResponse getRows(SystemTableRequest request) {
+ if (_tableCache == null) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ Set<String> tableNamesWithType = new LinkedHashSet<>();
+ for (String tableName : _tableCache.getTableNameMap().values()) {
+ if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+ tableNamesWithType.add(tableName);
+ }
+ }
+ List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+ sortedTableNames.sort(Comparator.naturalOrder());
+
+ List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+ int offset = request.getOffset();
+ int limit = request.getLimit();
+ if (limit == 0) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ for (String tableNameWithType : sortedTableNames) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ continue;
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableStats stats = buildStats(tableNameWithType, tableType);
+ if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+ continue;
+ }
+ GenericRow row = new GenericRow();
+ row.putValue("tableName", rawTableName);
+ row.putValue("type", stats._type);
+ row.putValue("status", stats._status);
+ row.putValue("segments", stats._segments);
+ row.putValue("totalDocs", stats._totalDocs);
+ row.putValue("reportedSize", stats._reportedSizeInBytes);
+ row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+ row.putValue("storageTier", stats._storageTier);
+ row.putValue("brokerTenant", stats._brokerTenant);
+ row.putValue("serverTenant", stats._serverTenant);
+ row.putValue("replicas", stats._replicas);
+ row.putValue("tableConfig", stats._tableConfig);
+ if (offset > 0) {
+ offset--;
+ continue;
+ }
+ rows.add(row);
+ if (limit > 0 && rows.size() >= limit) {
+ break;
+ }
+ }
+ return new SystemTableResponse(rows, System.currentTimeMillis(),
rows.size());
+ }
+
+ private TableStats buildStats(String tableNameWithType, TableType tableType)
{
+ TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+ int segments = 0;
+ long totalDocs = 0;
+ long reportedSize = 0;
+ long estimatedSize = 0;
+ String tierValue = "";
+ String brokerTenant = "";
+ String serverTenant = "";
+ int replicas = 0;
+ if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+ brokerTenant = tableConfig.getTenantConfig().getBroker();
+ serverTenant = tableConfig.getTenantConfig().getServer();
+ }
+ if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+ Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+ replicas = repl != null ? repl : replicas;
+ }
+ // Use controller API only
+ TableSize sizeFromController = fetchTableSize(tableNameWithType);
+ if (sizeFromController != null) {
+ if (sizeFromController._reportedSizeInBytes >= 0) {
+ reportedSize = sizeFromController._reportedSizeInBytes;
+ }
+ if (sizeFromController._estimatedSizeInBytes >= 0) {
+ estimatedSize = sizeFromController._estimatedSizeInBytes;
+ }
+ segments = getSegmentCount(sizeFromController, tableType);
+ totalDocs = getTotalDocs(sizeFromController, tableType);
+ }
+ String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE"
: "UNKNOWN");
+ String tableConfigJson = "";
+ if (tableConfig != null) {
+ try {
+ tableConfigJson = JsonUtils.objectToString(tableConfig);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to serialize table config for {}: {}",
tableNameWithType, e.toString());
+ tableConfigJson = tableConfig.toString();
+ }
+ }
+ return new TableStats(tableType.name(), status, segments, totalDocs,
reportedSize, estimatedSize, tierValue,
+ tableConfigJson, brokerTenant, serverTenant, replicas);
+ }
+
+ private boolean matchesFilter(SystemTableFilter filter, TableStats stats,
String rawTableName) {
+ if (filter == null) {
+ return true;
+ }
+ if (filter.getChildren().isEmpty()) {
+ return matchesLeafFilter(filter, stats, rawTableName);
+ }
+ switch (filter.getOperator()) {
+ case AND:
+ for (SystemTableFilter child : filter.getChildren()) {
+ if (!matchesFilter(child, stats, rawTableName)) {
+ return false;
+ }
+ }
+ return true;
+ case OR:
+ for (SystemTableFilter child : filter.getChildren()) {
+ if (matchesFilter(child, stats, rawTableName)) {
+ return true;
+ }
+ }
+ return false;
+ case NOT:
+ return filter.getChildren().isEmpty() ||
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+ default:
+ return true;
+ }
+ }
+
+ private boolean matchesLeafFilter(SystemTableFilter filter, TableStats
stats, String rawTableName) {
+ String column = filter.getColumn();
+ if (column == null) {
+ return true;
+ }
+ List<String> values = filter.getValues();
+ if (values == null || values.isEmpty()) {
+ return true;
+ }
+ switch (column.toLowerCase()) {
+ case "tablename":
+ return matchesString(values, rawTableName, filter.getOperator());
+ case "type":
+ return matchesString(values, stats._type, filter.getOperator());
+ case "status":
+ return matchesString(values, stats._status, filter.getOperator());
+ case "segments":
+ return matchesNumber(values, stats._segments, filter.getOperator());
+ case "reportedsize":
+ return matchesNumber(values, stats._reportedSizeInBytes,
filter.getOperator());
+ case "estimatedsize":
+ return matchesNumber(values, stats._estimatedSizeInBytes,
filter.getOperator());
+ default:
+ return true;
+ }
+ }
+
+ private boolean matchesString(List<String> candidates, String actual,
SystemTableFilter.Operator operator) {
+ switch (operator) {
+ case EQ:
+ return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+ case IN:
+ return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+ case NEQ:
+ return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+ default:
+ return true;
+ }
+ }
+
+ private boolean matchesNumber(List<String> candidates, long actual,
SystemTableFilter.Operator operator) {
+ try {
+ switch (operator) {
+ case EQ:
+ return candidates.stream().anyMatch(v -> Long.parseLong(v) ==
actual);
+ case NEQ:
+ return candidates.stream().noneMatch(v -> Long.parseLong(v) ==
actual);
+ case GT:
+ return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+ case GTE:
+ return candidates.stream().anyMatch(v -> actual >=
Long.parseLong(v));
+ case LT:
+ return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+ case LTE:
+ return candidates.stream().anyMatch(v -> actual <=
Long.parseLong(v));
+ case IN:
+ for (String candidate : candidates) {
+ if (actual == Long.parseLong(candidate)) {
+ return true;
+ }
+ }
+ return false;
+ default:
+ return true;
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates,
e.toString());
+ return true;
+ }
+ }
+
+ private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+ Function<String, TableSize> fetcher = getSizeFetcher();
+ if (fetcher != null) {
+ try {
+ TableSize fetched = fetcher.apply(tableNameWithType);
+ if (fetched != null) {
+ return fetched;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType,
e.toString());
+ }
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableSize size = fetchTableSizeForName(rawTableName);
+ if (size == null) {
+ size = fetchTableSizeForName(tableNameWithType);
+ if (size == null) {
+ LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}),
falling back to zeros",
+ tableNameWithType, rawTableName);
+ }
+ }
+ return size;
+ }
+
+ private @Nullable TableSize fetchTableSizeForName(String tableName) {
+ for (String baseUrl : getControllerBaseUrls()) {
+ try {
+ String url = baseUrl + "/tables/" + tableName +
"/size?verbose=true&includeReplacedSegments=false";
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .timeout(Duration.ofSeconds(5))
+ .GET()
+ .header("Accept", "application/json")
+ .build();
+ HttpResponse<String> response = _httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
+ if (response.statusCode() >= 200 && response.statusCode() < 300) {
+ TableSize parsed = JsonUtils.stringToObject(response.body(),
TableSize.class);
+ LOGGER.info("system.tables: controller size response for {} via {}
-> segments offline={}, realtime={}, "
+ + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+ parsed._offlineSegments != null &&
parsed._offlineSegments._segments != null
+ ? parsed._offlineSegments._segments.size() : 0,
+ parsed._realtimeSegments != null &&
parsed._realtimeSegments._segments != null
+ ? parsed._realtimeSegments._segments.size() : 0,
+ parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
Review Comment:
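[nitpick] This info-level log on every table size fetch could be noisy in
production with many tables. getRows() fetches the size once per table, so each
system.tables query emits an INFO line for every table whose size comes back
from the controller. Consider using debug level or adding a sample rate to
reduce log volume.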
##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+ private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName("system.tables")
+ .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+ .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+ .addMetric("reportedSize", FieldSpec.DataType.LONG)
+ .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+ .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+ .build();
+
+ private final TableCache _tableCache;
+ private final @Nullable HelixAdmin _helixAdmin;
+ private final @Nullable String _clusterName;
+ private final HttpClient _httpClient;
+ private final @Nullable Function<String, TableSize>
_tableSizeFetcherOverride;
+ private final List<String> _staticControllerUrls;
+
+ public TablesSystemTableProvider() {
+ this(null, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache) {
+ this(tableCache, null, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin) {
+ this(tableCache, helixAdmin, null, null, null);
+ }
+
+ public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin,
+ @Nullable String clusterName) {
+ this(tableCache, helixAdmin, clusterName, null, null);
+ }
+
+ TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin
helixAdmin, @Nullable String clusterName,
+ @Nullable Function<String, TableSize> tableSizeFetcherOverride,
@Nullable List<String> controllerUrls) {
+ _tableCache = tableCache;
+ _helixAdmin = helixAdmin;
+ _clusterName = clusterName;
+ _httpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+ _tableSizeFetcherOverride = tableSizeFetcherOverride;
+ _staticControllerUrls = controllerUrls != null ? new
ArrayList<>(controllerUrls) : List.of();
+ }
+
+ @Override
+ public String getTableName() {
+ return "system.tables";
+ }
+
+ @Override
+ public Schema getSchema() {
+ return SCHEMA;
+ }
+
+ @Override
+ public SystemTableResponse getRows(SystemTableRequest request) {
+ if (_tableCache == null) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ Set<String> tableNamesWithType = new LinkedHashSet<>();
+ for (String tableName : _tableCache.getTableNameMap().values()) {
+ if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+ tableNamesWithType.add(tableName);
+ }
+ }
+ List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+ sortedTableNames.sort(Comparator.naturalOrder());
+
+ List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+ int offset = request.getOffset();
+ int limit = request.getLimit();
+ if (limit == 0) {
+ return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+ }
+ for (String tableNameWithType : sortedTableNames) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ continue;
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableStats stats = buildStats(tableNameWithType, tableType);
+ if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+ continue;
+ }
+ GenericRow row = new GenericRow();
+ row.putValue("tableName", rawTableName);
+ row.putValue("type", stats._type);
+ row.putValue("status", stats._status);
+ row.putValue("segments", stats._segments);
+ row.putValue("totalDocs", stats._totalDocs);
+ row.putValue("reportedSize", stats._reportedSizeInBytes);
+ row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+ row.putValue("storageTier", stats._storageTier);
+ row.putValue("brokerTenant", stats._brokerTenant);
+ row.putValue("serverTenant", stats._serverTenant);
+ row.putValue("replicas", stats._replicas);
+ row.putValue("tableConfig", stats._tableConfig);
+ if (offset > 0) {
+ offset--;
+ continue;
+ }
+ rows.add(row);
+ if (limit > 0 && rows.size() >= limit) {
+ break;
+ }
+ }
+ return new SystemTableResponse(rows, System.currentTimeMillis(),
rows.size());
+ }
+
+ private TableStats buildStats(String tableNameWithType, TableType tableType)
{
+ TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+ int segments = 0;
+ long totalDocs = 0;
+ long reportedSize = 0;
+ long estimatedSize = 0;
+ String tierValue = "";
+ String brokerTenant = "";
+ String serverTenant = "";
+ int replicas = 0;
+ if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+ brokerTenant = tableConfig.getTenantConfig().getBroker();
+ serverTenant = tableConfig.getTenantConfig().getServer();
+ }
+ if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+ Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+ replicas = repl != null ? repl : replicas;
+ }
+ // Use controller API only
+ TableSize sizeFromController = fetchTableSize(tableNameWithType);
+ if (sizeFromController != null) {
+ if (sizeFromController._reportedSizeInBytes >= 0) {
+ reportedSize = sizeFromController._reportedSizeInBytes;
+ }
+ if (sizeFromController._estimatedSizeInBytes >= 0) {
+ estimatedSize = sizeFromController._estimatedSizeInBytes;
+ }
+ segments = getSegmentCount(sizeFromController, tableType);
+ totalDocs = getTotalDocs(sizeFromController, tableType);
+ }
+ String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE"
: "UNKNOWN");
+ String tableConfigJson = "";
+ if (tableConfig != null) {
+ try {
+ tableConfigJson = JsonUtils.objectToString(tableConfig);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to serialize table config for {}: {}",
tableNameWithType, e.toString());
+ tableConfigJson = tableConfig.toString();
+ }
+ }
+ return new TableStats(tableType.name(), status, segments, totalDocs,
reportedSize, estimatedSize, tierValue,
+ tableConfigJson, brokerTenant, serverTenant, replicas);
+ }
+
+ private boolean matchesFilter(SystemTableFilter filter, TableStats stats,
String rawTableName) {
+ if (filter == null) {
+ return true;
+ }
+ if (filter.getChildren().isEmpty()) {
+ return matchesLeafFilter(filter, stats, rawTableName);
+ }
+ switch (filter.getOperator()) {
+ case AND:
+ for (SystemTableFilter child : filter.getChildren()) {
+ if (!matchesFilter(child, stats, rawTableName)) {
+ return false;
+ }
+ }
+ return true;
+ case OR:
+ for (SystemTableFilter child : filter.getChildren()) {
+ if (matchesFilter(child, stats, rawTableName)) {
+ return true;
+ }
+ }
+ return false;
+ case NOT:
+ return filter.getChildren().isEmpty() ||
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+ default:
+ return true;
+ }
+ }
+
+ private boolean matchesLeafFilter(SystemTableFilter filter, TableStats
stats, String rawTableName) {
+ String column = filter.getColumn();
+ if (column == null) {
+ return true;
+ }
+ List<String> values = filter.getValues();
+ if (values == null || values.isEmpty()) {
+ return true;
+ }
+ switch (column.toLowerCase()) {
+ case "tablename":
+ return matchesString(values, rawTableName, filter.getOperator());
+ case "type":
+ return matchesString(values, stats._type, filter.getOperator());
+ case "status":
+ return matchesString(values, stats._status, filter.getOperator());
+ case "segments":
+ return matchesNumber(values, stats._segments, filter.getOperator());
+ case "reportedsize":
+ return matchesNumber(values, stats._reportedSizeInBytes,
filter.getOperator());
+ case "estimatedsize":
+ return matchesNumber(values, stats._estimatedSizeInBytes,
filter.getOperator());
+ default:
+ return true;
+ }
+ }
+
+ private boolean matchesString(List<String> candidates, String actual,
SystemTableFilter.Operator operator) {
+ switch (operator) {
+ case EQ:
+ return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+ case IN:
+ return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+ case NEQ:
+ return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+ default:
+ return true;
+ }
+ }
+
+ private boolean matchesNumber(List<String> candidates, long actual,
SystemTableFilter.Operator operator) {
+ try {
+ switch (operator) {
+ case EQ:
+ return candidates.stream().anyMatch(v -> Long.parseLong(v) ==
actual);
+ case NEQ:
+ return candidates.stream().noneMatch(v -> Long.parseLong(v) ==
actual);
+ case GT:
+ return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+ case GTE:
+ return candidates.stream().anyMatch(v -> actual >=
Long.parseLong(v));
+ case LT:
+ return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+ case LTE:
+ return candidates.stream().anyMatch(v -> actual <=
Long.parseLong(v));
+ case IN:
+ for (String candidate : candidates) {
+ if (actual == Long.parseLong(candidate)) {
+ return true;
+ }
+ }
+ return false;
+ default:
+ return true;
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates,
e.toString());
+ return true;
+ }
+ }
+
+ private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+ Function<String, TableSize> fetcher = getSizeFetcher();
+ if (fetcher != null) {
+ try {
+ TableSize fetched = fetcher.apply(tableNameWithType);
+ if (fetched != null) {
+ return fetched;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType,
e.toString());
+ }
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableSize size = fetchTableSizeForName(rawTableName);
+ if (size == null) {
+ size = fetchTableSizeForName(tableNameWithType);
+ if (size == null) {
+ LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}),
falling back to zeros",
+ tableNameWithType, rawTableName);
+ }
+ }
+ return size;
+ }
+
+ private @Nullable TableSize fetchTableSizeForName(String tableName) {
+ for (String baseUrl : getControllerBaseUrls()) {
+ try {
+ String url = baseUrl + "/tables/" + tableName +
"/size?verbose=true&includeReplacedSegments=false";
+ HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+ .timeout(Duration.ofSeconds(5))
+ .GET()
+ .header("Accept", "application/json")
+ .build();
+ HttpResponse<String> response = _httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
+ if (response.statusCode() >= 200 && response.statusCode() < 300) {
+ TableSize parsed = JsonUtils.stringToObject(response.body(),
TableSize.class);
+ LOGGER.info("system.tables: controller size response for {} via {}
-> segments offline={}, realtime={}, "
+ + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+ parsed._offlineSegments != null &&
parsed._offlineSegments._segments != null
+ ? parsed._offlineSegments._segments.size() : 0,
+ parsed._realtimeSegments != null &&
parsed._realtimeSegments._segments != null
+ ? parsed._realtimeSegments._segments.size() : 0,
+ parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+ return parsed;
+ } else {
+ LOGGER.warn("system.tables: failed to fetch table size for {} via
{}: status {}, body={}", tableName,
+ baseUrl, response.statusCode(), response.body());
+ }
+ } catch (Exception e) {
+ LOGGER.warn("system.tables: error fetching table size for {} via {}:
{}", tableName, baseUrl, e.toString(), e);
Review Comment:
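The format string has 3 placeholders but 4 arguments are provided (tableName,
baseUrl, e.toString(), e). SLF4J treats a trailing Throwable that has no
matching placeholder as the exception to log. This therefore does not break
formatting, and the stack trace is still printed. However, the `e.toString()`
argument and its `: {}` placeholder are redundant, because the exception's
message already appears in the logged stack trace. Dropping them keeps both the
message and the stack trace: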
```suggestion
LOGGER.warn("system.tables: error fetching table size for {} via
{}", tableName, baseUrl, e);
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,246 @@ private void throwAccessDeniedError(long requestId,
String query, RequestContext
throw new WebApplicationException("Permission denied." + failureMessage,
Response.Status.FORBIDDEN);
}
+ private boolean isSystemTable(String tableName) {
+ return tableName != null &&
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+ }
+
+ private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String
tableName,
+ RequestContext requestContext, @Nullable RequesterIdentity
requesterIdentity, String query) {
+ if (pinotQuery.isExplain()) {
+ return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+ }
+ SystemTableProvider provider = _systemTableRegistry.get(tableName);
+ if (provider == null) {
+ requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+ return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+ }
+ try {
+ if (!isSupportedSystemTableQuery(pinotQuery)) {
+ requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+ return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+ "System tables only support simple projection/filter/limit
queries");
+ }
+ Schema systemSchema = provider.getSchema();
+ List<String> projectionColumns = extractProjectionColumns(pinotQuery,
systemSchema);
+ int offset = Math.max(0, pinotQuery.getOffset());
+ int limit = Math.max(0, pinotQuery.getLimit());
+ SystemTableRequest systemTableRequest =
+ new SystemTableRequest(projectionColumns,
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+ limit);
+ SystemTableResponse systemTableResponse =
provider.getRows(systemTableRequest);
+ BrokerResponseNative brokerResponse =
+ buildSystemTableBrokerResponse(tableName, systemSchema,
projectionColumns, systemTableResponse,
+ requestContext);
+ brokerResponse.setTimeUsedMs(System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis());
+ _queryLogger.log(new QueryLogger.QueryLogParams(requestContext,
tableName, brokerResponse,
+ QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE,
requesterIdentity, null));
+ return brokerResponse;
+ } catch (BadQueryRequestException e) {
+ requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS,
1);
+ return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
e.getMessage());
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while handling system table query {}: {}",
tableName, e.getMessage(), e);
+ requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+ return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION,
e.getMessage());
+ }
+ }
+
+ private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+ return (pinotQuery.getGroupByList() == null ||
pinotQuery.getGroupByList().isEmpty())
+ && pinotQuery.getHavingExpression() == null
+ && (pinotQuery.getOrderByList() == null ||
pinotQuery.getOrderByList().isEmpty());
+ }
+
+ private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema
schema)
+ throws BadQueryRequestException {
+ List<String> projections = new ArrayList<>();
+ boolean hasStar = false;
+ List<Expression> selectList = pinotQuery.getSelectList();
+ if (CollectionUtils.isEmpty(selectList)) {
+ throw new BadQueryRequestException("System tables require a projection
list");
+ }
+ for (Expression expression : selectList) {
+ Identifier identifier = expression.getIdentifier();
+ if (identifier != null) {
+ if ("*".equals(identifier.getName())) {
+ hasStar = true;
+ } else {
+ projections.add(identifier.getName());
+ }
+ continue;
+ }
+ Function function = expression.getFunctionCall();
+ if (function != null && "AS".equalsIgnoreCase(function.getOperator()) &&
!function.getOperands().isEmpty()) {
+ Identifier aliased = function.getOperands().get(0).getIdentifier();
+ if (aliased != null) {
+ projections.add(aliased.getName());
+ continue;
+ }
+ }
+ throw new BadQueryRequestException("System tables only support column
projections or '*'");
+ }
+ if (hasStar || projections.isEmpty()) {
+ projections = new ArrayList<>(schema.getColumnNames());
+ }
+ List<String> normalized = new ArrayList<>(projections.size());
+ for (String column : projections) {
+ if (schema.hasColumn(column)) {
+ normalized.add(column);
+ continue;
+ }
+ String matched = null;
+ for (String schemaColumn : schema.getColumnNames()) {
+ if (schemaColumn.equalsIgnoreCase(column)) {
+ matched = schemaColumn;
+ break;
+ }
+ }
+ if (matched == null) {
+ throw new BadQueryRequestException("Unknown column in system table: "
+ column);
+ }
+ normalized.add(matched);
+ }
+ return normalized;
+ }
+
+ private BrokerResponseNative buildSystemTableBrokerResponse(String
tableName, Schema schema,
+ List<String> projectionColumns, SystemTableResponse response,
RequestContext requestContext) {
+ DataSchema dataSchema = buildSystemTableDataSchema(schema,
projectionColumns);
+ List<GenericRow> rows = response != null ? response.getRows() :
Collections.emptyList();
+ List<Object[]> resultRows = new ArrayList<>();
+ if (rows != null) {
+ for (GenericRow row : rows) {
+ Object[] values = new Object[projectionColumns.size()];
+ for (int i = 0; i < projectionColumns.size(); i++) {
+ values[i] = row != null ? row.getValue(projectionColumns.get(i)) :
null;
+ }
+ resultRows.add(values);
+ }
+ }
+ BrokerResponseNative brokerResponse = new BrokerResponseNative();
+ brokerResponse.setResultTable(new ResultTable(dataSchema, resultRows));
+ brokerResponse.setNumDocsScanned(resultRows.size());
+ brokerResponse.setNumEntriesScannedPostFilter(resultRows.size());
Review Comment:
When `response` is null (line 1156), `resultRows` is empty, so
numEntriesScannedPostFilter is already 0, as expected. The value can be
misleading, however, when `response` is non-null but `response.getRows()`
returns null. In that case `resultRows` is empty and this reports 0 regardless
of the total row count the provider returned. Consider using
`response.getTotalRows()` whenever `response` is non-null, for consistency with
totalDocs.
```suggestion
brokerResponse.setNumEntriesScannedPostFilter(response != null ?
response.getTotalRows() : 0);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]