Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2640570977


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java:
##########
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.querylog.QueryLogger;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableRegistry;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.auth.AuthorizationResult;
+import org.apache.pinot.spi.auth.broker.RequesterIdentity;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broker request handler for system tables (handled entirely on the broker).
+ */
+public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class);
+
+  public SystemTableBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
+      BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
+      ThreadAccountant threadAccountant) {
+    super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
+        threadAccountant);
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutDown() {
+  }
+
+  public boolean canHandle(String tableName) {
+    return isSystemTable(tableName) && 
SystemTableRegistry.isRegistered(tableName);
+  }
+
+  @Override
+  protected BrokerResponse handleRequest(long requestId, String query, 
SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, 
RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+      throws Exception {
+    PinotQuery pinotQuery;
+    try {
+      pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+    } catch (Exception e) {
+      requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+      return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, 
e.getMessage());
+    }
+
+    Set<String> tableNames = RequestUtils.getTableNames(pinotQuery);
+    if (tableNames == null || tableNames.isEmpty()) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed 
to extract table name");
+    }
+    if (tableNames.size() != 1) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System 
tables do not support joins");
+    }
+    String tableName = tableNames.iterator().next();
+    if (!isSystemTable(tableName)) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not a 
system table query");
+    }
+    AuthorizationResult authorizationResult =
+        hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, 
httpHeaders);
+    if (!authorizationResult.hasAccess()) {
+      requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+      return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, 
authorizationResult.getFailureMessage());
+    }
+
+    return handleSystemTableQuery(pinotQuery, tableName, requestContext, 
requesterIdentity, query);
+  }
+
+  @Override
+  protected boolean handleCancel(long queryId, int timeoutMs, Executor 
executor,
+      HttpClientConnectionManager connMgr, Map<String, Integer> 
serverResponses) {
+    return false;
+  }
+
+  @Override
+  public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, 
Executor executor,
+      HttpClientConnectionManager connMgr, Map<String, Integer> 
serverResponses)
+      throws Exception {
+    return false;
+  }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public OptionalLong getRequestIdByClientId(String clientQueryId) {
+    return OptionalLong.empty();
+  }
+
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName, RequestContext requestContext,
+      @Nullable RequesterIdentity requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableDataProvider provider = SystemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");

Review Comment:
   The error message does not specify which unsupported features were used 
(e.g., GROUP BY, ORDER BY, HAVING). Include the specific unsupported clause(s) 
found to help users understand the issue.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -106,7 +113,25 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable SqlNodeAndOption
       }
     }
 
+    Set<String> tableNames = null;
+    if (_systemTableBrokerRequestHandler != null) {
+      try {
+        // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode 
(e.g. SqlOrderBy -> SqlSelect).
+        // Re-parse from raw SQL so the SqlNodeAndOptions passed to the 
handler is unchanged (important for MSE).

Review Comment:
   The comment references 'MSE' without defining the acronym. Add '(Multi-Stage 
Engine)' after the first usage to clarify for future maintainers unfamiliar 
with this shorthand.
   ```suggestion
           // Re-parse from raw SQL so the SqlNodeAndOptions passed to the 
handler is unchanged (important for MSE (Multi-Stage Engine)).
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -67,6 +67,19 @@ public static String translateTableName(String tableName, 
@Nullable String datab
       case 2:
         Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table 
name '%s'", tableName);
         String databasePrefix = tableSplit[0];
+        /*
+         * Allow system tables to bypass database prefix validation so they 
can be queried regardless of the database
+         * header. System tables are intended to be globally accessible and 
are not subject to database-level access
+         * controls. Ensure system tables do not expose sensitive information 
because they can be queried without a
+         * matching database context.
+         *
+         * Security note: system tables are intentionally exempt from 
database-scoped access control; broader Pinot
+         * security is handled by AccessControl implementations (see 
org.apache.pinot.spi.security.AccessControl) and
+         * the Pinot security model documentation. New system tables should be 
vetted for sensitive content.
+         */
+        if ("system".equalsIgnoreCase(databasePrefix)) {
+          return tableName;

Review Comment:
   The newly added system table bypass logic lacks corresponding test coverage. 
Add a test case verifying that system.* tables are correctly exempted from 
database prefix validation.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = pinotQuery.getSelectList() != null
+        ? pinotQuery.getSelectList().stream().map(expr -> 
expr.getIdentifier().getName()).collect(Collectors.toList())
+        : List.of();
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(pinotQuery.getFilterExpression(), stats, 
rawTableName)) {
+        continue;
+      }
+      if (offset > 0) {
+        offset--;
+        totalRows++;
+        continue;
+      }
+      totalRows++;
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    TableSize cached = getCachedSize(tableNameWithType);
+    if (cached != null) {
+      return cached;
+    }
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          cacheSize(tableNameWithType, fetched);
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning 
null", TABLE_NAME, tableNameWithType,
+            rawTableName);
+      }
+    }
+    if (size != null) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", TABLE_NAME, 
tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("{}: failed to fetch table size for {} via {}: status 
{}, body={}", TABLE_NAME, tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("{}: error fetching table size for {} via {}", TABLE_NAME, 
tableName, baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) 
{
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    List<String> controllers = getControllerBaseUrls();
+    if (controllers.isEmpty() || !isAdminClientAvailable()) {
+      return null;
+    }
+    return tableNameWithType -> fetchWithAdminClient(controllers, 
tableNameWithType);
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && 
lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, 
lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);
+          if (!host.isEmpty() && !port.isEmpty()) {
+            urls.add(host + ":" + port);
+          } else {
+            LOGGER.warn("Unable to parse controller address from instance id 
(empty host/port): {}", controllerId);
+          }
+        } else {
+          LOGGER.warn("Unable to parse controller address from instance id: 
{}", controllerId);

Review Comment:
   The error message does not explain *why* parsing failed. Consider including 
details about the expected format or the actual structure encountered to aid 
debugging.
   ```suggestion
             LOGGER.warn(
                 "Unable to parse controller address from instance id '{}': 
expected format '<prefix>_<host>_<port>' "
                     + "but got firstUnderscore={}, lastUnderscore={}, 
length={}",
                 controllerId, firstUnderscore, lastUnderscore, 
controllerId.length());
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = pinotQuery.getSelectList() != null
+        ? pinotQuery.getSelectList().stream().map(expr -> 
expr.getIdentifier().getName()).collect(Collectors.toList())
+        : List.of();
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(pinotQuery.getFilterExpression(), stats, 
rawTableName)) {
+        continue;
+      }
+      if (offset > 0) {
+        offset--;
+        totalRows++;
+        continue;
+      }
+      totalRows++;
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    TableSize cached = getCachedSize(tableNameWithType);
+    if (cached != null) {
+      return cached;
+    }
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          cacheSize(tableNameWithType, fetched);
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning 
null", TABLE_NAME, tableNameWithType,
+            rawTableName);
+      }
+    }
+    if (size != null) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", TABLE_NAME, 
tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);

Review Comment:
   The nested null checks for segment counts make the logging statement 
difficult to read. Extract these calculations into helper methods like 
getOfflineSegmentCount() and getRealtimeSegmentCount() to improve readability.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = pinotQuery.getSelectList() != null
+        ? pinotQuery.getSelectList().stream().map(expr -> 
expr.getIdentifier().getName()).collect(Collectors.toList())
+        : List.of();
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(pinotQuery.getFilterExpression(), stats, 
rawTableName)) {
+        continue;
+      }
+      if (offset > 0) {
+        offset--;
+        totalRows++;
+        continue;
+      }
+      totalRows++;
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    TableSize cached = getCachedSize(tableNameWithType);
+    if (cached != null) {
+      return cached;
+    }
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          cacheSize(tableNameWithType, fetched);
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning 
null", TABLE_NAME, tableNameWithType,
+            rawTableName);
+      }
+    }
+    if (size != null) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", TABLE_NAME, 
tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("{}: failed to fetch table size for {} via {}: status 
{}, body={}", TABLE_NAME, tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("{}: error fetching table size for {} via {}", TABLE_NAME, 
tableName, baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) 
{
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    List<String> controllers = getControllerBaseUrls();
+    if (controllers.isEmpty() || !isAdminClientAvailable()) {
+      return null;
+    }
+    return tableNameWithType -> fetchWithAdminClient(controllers, 
tableNameWithType);
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && 
lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, 
lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);
+          if (!host.isEmpty() && !port.isEmpty()) {
+            urls.add(host + ":" + port);
+          } else {
+            LOGGER.warn("Unable to parse controller address from instance id 
(empty host/port): {}", controllerId);

Review Comment:
   The error message is unclear—'empty host/port' suggests both are missing, 
but the condition checks if *either* is empty. Reword to 'empty host or port' 
for accuracy.
   ```suggestion
               LOGGER.warn("Unable to parse controller address from instance id 
(empty host or port): {}", controllerId);
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -106,7 +113,25 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable SqlNodeAndOption
       }
     }
 
+    Set<String> tableNames = null;
+    if (_systemTableBrokerRequestHandler != null) {
+      try {
+        // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode 
(e.g. SqlOrderBy -> SqlSelect).
+        // Re-parse from raw SQL so the SqlNodeAndOptions passed to the 
handler is unchanged (important for MSE).
+        JsonNode sql = request.get(Request.SQL);
+        String sqlQuery =
+            (sql != null && sql.isTextual()) ? sql.asText() : 
sqlNodeAndOptions.getSqlNode().toString();
+        tableNames = 
RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery));
+      } catch (Exception e) {
+        // Ignore compilation exceptions here; the selected request handler 
will surface them appropriately.

Review Comment:
   The comment claims the handler will surface errors, but swallowing 
exceptions without logging may hide issues during debugging. Consider logging 
at debug level or explain in the comment why silent failure is acceptable.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();

Review Comment:
   The 5-second connect timeout is a magic number and could vary based on 
deployment. Extract it as a named constant or configuration property to improve 
maintainability.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();

Review Comment:
   The 1-minute cache TTL may not suit all deployment scenarios. Consider 
making this configurable via a system property or constructor parameter for 
flexibility.
   ```suggestion
     private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.system.tables.sizeCacheTtlMs";
     private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
     private static final long SIZE_CACHE_TTL_MS =
         Long.getLong(SIZE_CACHE_TTL_MS_PROPERTY, DEFAULT_SIZE_CACHE_TTL_MS);
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");

Review Comment:
   The hardcoded class name 'org.apache.pinot.client.admin.PinotAdminClient' 
appears both in the static initializer and in fetchWithAdminClient (line 586). 
Define it as a constant to avoid duplication and ease future refactoring.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final long SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = pinotQuery.getSelectList() != null
+        ? pinotQuery.getSelectList().stream().map(expr -> 
expr.getIdentifier().getName()).collect(Collectors.toList())
+        : List.of();
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(pinotQuery.getFilterExpression(), stats, 
rawTableName)) {
+        continue;
+      }
+      if (offset > 0) {
+        offset--;
+        totalRows++;
+        continue;
+      }
+      totalRows++;
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    TableSize cached = getCachedSize(tableNameWithType);
+    if (cached != null) {
+      return cached;
+    }
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          cacheSize(tableNameWithType, fetched);
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning 
null", TABLE_NAME, tableNameWithType,
+            rawTableName);

Review Comment:
   The message states 'returning null' but does not explain *why* both raw and 
type-suffixed names failed or what the user can do. Consider adding context 
about potential causes (e.g., table not found, controller unreachable).
   ```suggestion
           LOGGER.warn(
               "{}: failed to fetch table size for {} (raw: {}); no successful 
response from any controller. This may "
                   + "indicate that the table does not exist or that the 
controller is unreachable; returning null size",
               TABLE_NAME, tableNameWithType, rawTableName);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to