Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2617586921


##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;

Review Comment:
   The class name check uses reflection to verify PinotAdminClient 
availability, but this is fragile and could break if the class is relocated or 
renamed. Consider using a more robust mechanism such as checking for the 
presence of a specific method or using a feature flag/configuration property 
instead of relying on ClassNotFoundException.
   ```suggestion
       String adminClientEnabled = 
System.getProperty("pinot.admin.client.enabled");
       if (adminClientEnabled == null) {
         LOGGER.warn("System property 'pinot.admin.client.enabled' is not set. 
Defaulting PinotAdminClient availability to false.");
         IS_ADMIN_CLIENT_AVAILABLE = false;
       } else {
         IS_ADMIN_CLIENT_AVAILABLE = Boolean.parseBoolean(adminClientEnabled);
       }
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, request.getOffset());
+    int limit = request.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;

Review Comment:
   The initial capacity is set to 0 when there's no limit, which will cause the 
ArrayList to resize multiple times as elements are added. This is inefficient 
for large table lists. Set initialCapacity to sortedTableNames.size() when 
hasLimit is false to avoid unnecessary resizing.
   ```suggestion
       int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), 
limit) : sortedTableNames.size();
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, request.getOffset());
+    int limit = request.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, 
String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || 
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats 
stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, 
filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, 
filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), 
falling back to zeros",
+            tableNameWithType, rawTableName);
+      }
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.debug("system.tables: controller size response for {} via {} 
-> segments offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("system.tables: failed to fetch table size for {} via 
{}: status {}, body={}", tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("system.tables: error fetching table size for {} via {}", 
tableName, baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) 
{
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    List<String> controllers = getControllerBaseUrls();
+    if (controllers.isEmpty() || !isAdminClientAvailable()) {
+      return null;
+    }
+    return tableNameWithType -> fetchWithAdminClient(controllers, 
tableNameWithType);
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int idx = controllerId.lastIndexOf('_');
+        if (idx > 0 && idx < controllerId.length() - 1) {
+          String host = controllerId.substring(controllerId.indexOf('_') + 1, 
idx);
+          String port = controllerId.substring(idx + 1);
+          urls.add(host + ":" + port);
+        } else {
+          LOGGER.warn("Unable to parse controller address from instance id: 
{}", controllerId);
+        }
+      }

Review Comment:
   The parsing logic uses `indexOf('_')` for the host extraction but 
`lastIndexOf('_')` for validation and port extraction. If the controller ID has 
multiple underscores (e.g., "Controller_host_name_8080"), the host will be 
incorrectly extracted starting from the first underscore instead of after 
"Controller_". Use `idx` consistently or validate the controller ID format more 
carefully.
   ```suggestion
           int firstUnderscoreIdx = controllerId.indexOf('_');
           int lastUnderscoreIdx = controllerId.lastIndexOf('_');
           // Ensure the controllerId starts with "Controller_", and has at 
least two underscores
           if (firstUnderscoreIdx > 0 && lastUnderscoreIdx > firstUnderscoreIdx 
&& lastUnderscoreIdx < controllerId.length() - 1) {
             String host = controllerId.substring(firstUnderscoreIdx + 1, 
lastUnderscoreIdx);
             String port = controllerId.substring(lastUnderscoreIdx + 1);
             urls.add(host + ":" + port);
           } else {
             LOGGER.warn("Unable to parse controller address from instance id: 
{}", controllerId);
           }
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());

Review Comment:
   When offset or limit are negative in the PinotQuery, they are clamped to 0. 
However, a limit of 0 has special semantics (no results returned) which may not 
be the intended behavior when the original limit was negative (typically 
meaning "no limit"). Consider using -1 or Integer.MAX_VALUE to represent "no 
limit" rather than 0.
   ```suggestion
         int limit = pinotQuery.getLimit() < 0 ? -1 : pinotQuery.getLimit();
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -354,6 +357,12 @@ public void start()
     boolean caseInsensitive =
         _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, 
Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
     TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
+    if (!SystemTableRegistry.INSTANCE.isRegistered("system.tables")) {
+      SystemTableRegistry.INSTANCE.register(new 
TablesSystemTableProvider(tableCache, _helixAdmin, _clusterName));
+    }
+    if (!SystemTableRegistry.INSTANCE.isRegistered("system.instances")) {
+      SystemTableRegistry.INSTANCE.register(new 
InstancesSystemTableProvider());

Review Comment:
   Using hardcoded string literals "system.tables" and "system.instances" for 
registration checks creates a risk of typos and makes the code harder to 
maintain. Define constants for these table names (e.g., in the provider classes 
or a shared constants file) and reference them here.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, 
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = 
provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, 
projectionColumns, systemTableResponse,
+              requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, 
tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", 
tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || 
pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || 
pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema 
schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection 
list");
+    }
+    for (Expression expression : selectList) {
+      Identifier identifier = expression.getIdentifier();
+      if (identifier != null) {
+        if ("*".equals(identifier.getName())) {
+          hasStar = true;
+        } else {
+          projections.add(identifier.getName());
+        }
+        continue;
+      }
+      Function function = expression.getFunctionCall();
+      if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && 
!function.getOperands().isEmpty()) {
+        Identifier aliased = function.getOperands().get(0).getIdentifier();
+        if (aliased != null) {
+          projections.add(aliased.getName());
+          continue;
+        }
+      }
+      throw new BadQueryRequestException("System tables only support column 
projections or '*'");

Review Comment:
   The error message doesn't explain what was actually encountered that caused 
the failure. Include the problematic expression in the error message (e.g., 
"System tables only support column projections or '*', but found: " + 
expression) to help users debug their queries.
   ```suggestion
         throw new BadQueryRequestException("System tables only support column 
projections or '*', but found: " + expression);
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, request.getOffset());
+    int limit = request.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      if (limit == 0) {
+        continue;

Review Comment:
   When limit is 0, the loop continues to process remaining tables even though 
no more rows should be added. This is inefficient and wastes CPU cycles. Add a 
break statement here instead of continue to exit the loop early once the limit 
is reached.
   ```suggestion
           break;
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, 
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = 
provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, 
projectionColumns, systemTableResponse,
+              requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, 
tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", 
tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || 
pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || 
pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema 
schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection 
list");

Review Comment:
   The error message "System tables require a projection list" is unclear 
because empty select lists are already handled by the Pinot query parser and 
would fail earlier. If this check is defensive, clarify the message to indicate 
this is an internal error (e.g., "Internal error: system table query missing 
projection list").
   ```suggestion
         throw new BadQueryRequestException("Internal error: system table query 
missing projection list");
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, request.getOffset());
+    int limit = request.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");

Review Comment:
   The status determination logic is duplicated (both branches return "ONLINE" 
when their condition is true). Simplify this to `String status = (tableConfig 
!= null || segments > 0) ? "ONLINE" : "UNKNOWN";` for better readability and 
maintainability.
   ```suggestion
       String status = (tableConfig != null || segments > 0) ? "ONLINE" : 
"UNKNOWN";
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, request.getOffset());
+    int limit = request.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, 
String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || 
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats 
stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, 
filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, 
filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), 
falling back to zeros",

Review Comment:
   The warning message says "falling back to zeros" but the code actually 
returns null when size fetching fails, not zero values. Update the message to 
accurately reflect the behavior (e.g., "failed to fetch size, returning null") 
or change the code to return a TableSize object with zero values as the message 
suggests.
   ```suggestion
           LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), 
returning null",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to