Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2575346399


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,246 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, 
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = 
provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, 
projectionColumns, systemTableResponse,
+              requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, 
tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", 
tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || 
pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || 
pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema 
schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection 
list");
+    }
+    for (Expression expression : selectList) {
+      Identifier identifier = expression.getIdentifier();
+      if (identifier != null) {
+        if ("*".equals(identifier.getName())) {
+          hasStar = true;
+        } else {
+          projections.add(identifier.getName());
+        }
+        continue;
+      }
+      Function function = expression.getFunctionCall();
+      if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && 
!function.getOperands().isEmpty()) {
+        Identifier aliased = function.getOperands().get(0).getIdentifier();
+        if (aliased != null) {
+          projections.add(aliased.getName());
+          continue;
+        }
+      }
+      throw new BadQueryRequestException("System tables only support column 
projections or '*'");
+    }
+    if (hasStar || projections.isEmpty()) {
+      projections = new ArrayList<>(schema.getColumnNames());
+    }
+    List<String> normalized = new ArrayList<>(projections.size());
+    for (String column : projections) {
+      if (schema.hasColumn(column)) {
+        normalized.add(column);
+        continue;
+      }
+      String matched = null;
+      for (String schemaColumn : schema.getColumnNames()) {
+        if (schemaColumn.equalsIgnoreCase(column)) {
+          matched = schemaColumn;
+          break;
+        }
+      }
+      if (matched == null) {
+        throw new BadQueryRequestException("Unknown column in system table: " 
+ column);
+      }
+      normalized.add(matched);
+    }
+    return normalized;
+  }
+
+  private BrokerResponseNative buildSystemTableBrokerResponse(String 
tableName, Schema schema,
+      List<String> projectionColumns, SystemTableResponse response, 
RequestContext requestContext) {
+    DataSchema dataSchema = buildSystemTableDataSchema(schema, 
projectionColumns);
+    List<GenericRow> rows = response != null ? response.getRows() : 
Collections.emptyList();
+    List<Object[]> resultRows = new ArrayList<>();
+    if (rows != null) {
+      for (GenericRow row : rows) {
+        Object[] values = new Object[projectionColumns.size()];
+        for (int i = 0; i < projectionColumns.size(); i++) {
+          values[i] = row != null ? row.getValue(projectionColumns.get(i)) : 
null;

Review Comment:
   The null check on each row element (line 1162) is redundant: rows is already verified to be non-null, and the provider is not expected to return null elements. If null rows are in fact possible, filter them out when building the list rather than handling them during iteration.
   ```suggestion
             values[i] = row.getValue(projectionColumns.get(i));
   ```
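
   If null elements are genuinely possible, a minimal sketch of the filter-at-build-time approach (illustrative only, reusing the types already imported in this file):
   ```java
   // Hedged sketch: drop null rows once while assembling the list, so the
   // projection loop below can assume every row is non-null.
   List<GenericRow> rows = new ArrayList<>();
   if (response != null && response.getRows() != null) {
     for (GenericRow row : response.getRows()) {
       if (row != null) {
         rows.add(row);
       }
     }
   }
   ```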



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+    int offset = request.getOffset();
+    int limit = request.getLimit();
+    if (limit == 0) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      rows.add(row);
+      if (limit > 0 && rows.size() >= limit) {
+        break;
+      }
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
rows.size());

Review Comment:
   The totalRows returned should be the count of all matching rows before applying offset/limit, but this returns rows.size(), which is the count after offset/limit have been applied. This produces incorrect pagination metadata whenever offset/limit are used.
   ```suggestion
       return new SystemTableResponse(rows, System.currentTimeMillis(), sortedTableNames.size());
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+    int offset = request.getOffset();
+    int limit = request.getLimit();
+    if (limit == 0) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      if (offset > 0) {
+        offset--;
+        continue;
+      }

Review Comment:
   The offset is applied after the filter check, so it skips rows that have already passed the filter. However, the totalRows count returned in line 171 is rows.size() after pagination, which makes the pagination metadata inconsistent: totalRows should reflect all filter-matching rows before offset/limit are applied, but currently it only reflects the rows left after the limit.
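
   A minimal sketch of one way to keep the metadata consistent (variable and helper names are illustrative, not from the PR; it assumes the existing limit == 0 early return still applies):
   ```java
   // Hedged sketch: totalMatched counts every filter-passing row; offset/limit
   // only decide which of those rows end up in the returned page.
   int totalMatched = 0;
   for (String tableNameWithType : sortedTableNames) {
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     if (tableType == null) {
       continue;
     }
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     TableStats stats = buildStats(tableNameWithType, tableType);
     if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
       continue;
     }
     totalMatched++;
     if (totalMatched <= offset) {
       continue;  // rows before the requested page are counted but not returned
     }
     if (rows.size() < limit) {
       rows.add(buildRow(rawTableName, stats));  // buildRow(): hypothetical helper wrapping the putValue calls
     }
   }
   return new SystemTableResponse(rows, System.currentTimeMillis(), totalMatched);
   ```
   Counting the full match set means the loop no longer breaks early once the page is full, which is the price of accurate totalRows.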



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+    int offset = request.getOffset();
+    int limit = request.getLimit();
+    if (limit == 0) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      rows.add(row);
+      if (limit > 0 && rows.size() >= limit) {
+        break;
+      }
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
rows.size());
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, 
String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || 
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats 
stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, 
filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, 
filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), 
falling back to zeros",
+            tableNameWithType, rawTableName);
+      }
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.info("system.tables: controller size response for {} via {} 
-> segments offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);

Review Comment:
   [nitpick] This info-level log on every table size fetch could be noisy in 
production with many tables. Consider using debug level or adding a sample rate 
to reduce log volume.
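
   A minimal sketch of the debug-level option (keeping the existing fields; the guard is the only addition):
   ```java
   // Hedged sketch: only emit per-table size details when debug logging is enabled,
   // so routine system.tables scans stay quiet in production.
   if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("system.tables: controller size response for {} via {} -> reportedSize={}, estimatedSize={}",
         tableName, baseUrl, parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
   }
   ```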



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+    int offset = request.getOffset();
+    int limit = request.getLimit();
+    if (limit == 0) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      rows.add(row);
+      if (limit > 0 && rows.size() >= limit) {
+        break;
+      }
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
rows.size());
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, 
String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || 
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats 
stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, 
filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, 
filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), 
falling back to zeros",
+            tableNameWithType, rawTableName);
+      }
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.info("system.tables: controller size response for {} via {} 
-> segments offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("system.tables: failed to fetch table size for {} via 
{}: status {}, body={}", tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("system.tables: error fetching table size for {} via {}: 
{}", tableName, baseUrl, e.toString(), e);

Review Comment:
   The format string has 3 placeholders but 4 arguments are provided (tableName, baseUrl, e.toString(), e). SLF4J treats the trailing Throwable as the exception and prints its stack trace rather than substituting it into a placeholder, so the call still works, but passing both e.toString() and e is redundant. Dropping e.toString() keeps the message concise while preserving the stack trace.
   ```suggestion
           LOGGER.warn("system.tables: error fetching table size for {} via 
{}", tableName, baseUrl, e);
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,246 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, 
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = 
provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, 
projectionColumns, systemTableResponse,
+              requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, 
tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", 
tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || 
pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || 
pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema 
schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection 
list");
+    }
+    for (Expression expression : selectList) {
+      Identifier identifier = expression.getIdentifier();
+      if (identifier != null) {
+        if ("*".equals(identifier.getName())) {
+          hasStar = true;
+        } else {
+          projections.add(identifier.getName());
+        }
+        continue;
+      }
+      Function function = expression.getFunctionCall();
+      if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && 
!function.getOperands().isEmpty()) {
+        Identifier aliased = function.getOperands().get(0).getIdentifier();
+        if (aliased != null) {
+          projections.add(aliased.getName());
+          continue;
+        }
+      }
+      throw new BadQueryRequestException("System tables only support column 
projections or '*'");
+    }
+    if (hasStar || projections.isEmpty()) {
+      projections = new ArrayList<>(schema.getColumnNames());
+    }
+    List<String> normalized = new ArrayList<>(projections.size());
+    for (String column : projections) {
+      if (schema.hasColumn(column)) {
+        normalized.add(column);
+        continue;
+      }
+      String matched = null;
+      for (String schemaColumn : schema.getColumnNames()) {
+        if (schemaColumn.equalsIgnoreCase(column)) {
+          matched = schemaColumn;
+          break;
+        }
+      }
+      if (matched == null) {
+        throw new BadQueryRequestException("Unknown column in system table: " 
+ column);
+      }
+      normalized.add(matched);
+    }
+    return normalized;
+  }
+
+  private BrokerResponseNative buildSystemTableBrokerResponse(String 
tableName, Schema schema,
+      List<String> projectionColumns, SystemTableResponse response, 
RequestContext requestContext) {
+    DataSchema dataSchema = buildSystemTableDataSchema(schema, 
projectionColumns);
+    List<GenericRow> rows = response != null ? response.getRows() : 
Collections.emptyList();
+    List<Object[]> resultRows = new ArrayList<>();
+    if (rows != null) {
+      for (GenericRow row : rows) {
+        Object[] values = new Object[projectionColumns.size()];
+        for (int i = 0; i < projectionColumns.size(); i++) {
+          values[i] = row != null ? row.getValue(projectionColumns.get(i)) : 
null;
+        }
+        resultRows.add(values);
+      }
+    }
+    BrokerResponseNative brokerResponse = new BrokerResponseNative();
+    brokerResponse.setResultTable(new ResultTable(dataSchema, resultRows));
+    brokerResponse.setNumDocsScanned(resultRows.size());
+    brokerResponse.setNumEntriesScannedPostFilter(resultRows.size());

Review Comment:
   When response is null (line 1156), resultRows is empty and numEntriesScannedPostFilter correctly defaults to 0. However, when response is non-null, resultRows.size() can be misleading (e.g. if the row list contains nulls). Consider using response.getTotalRows() when response is not null, for consistency with totalDocs.
   ```suggestion
       brokerResponse.setNumEntriesScannedPostFilter(response != null ? response.getTotalRows() : 0);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

