Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2647110034


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -106,7 +115,27 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable SqlNodeAndOption
       }
     }
 
+    Set<String> tableNames = null;
+    if (_systemTableBrokerRequestHandler != null) {
+      try {
+        // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode 
(e.g. SqlOrderBy -> SqlSelect).
+        // Re-parse from raw SQL so the SqlNodeAndOptions passed to the 
handler is unchanged (important for
+        // Multi-Stage Engine (MSE)).
+        JsonNode sql = request.get(Request.SQL);
+        String sqlQuery =
+            (sql != null && sql.isTextual()) ? sql.asText() : 
sqlNodeAndOptions.getSqlNode().toString();
+        tableNames = 
RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery));

Review Comment:
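   This code re-parses the raw SQL on every request (whenever a system table 
handler is configured) just to determine whether the query targets a system 
table, which adds parsing overhead to all queries. Consider extracting the 
table names from the already-parsed `sqlNodeAndOptions.getSqlNode()` instead, 
for example with a read-only visitor, or check whether 
`RequestUtils.getTableNames` can accept a `SqlNode` directly. Either approach 
avoids the redundant parse, but it must not mutate the `SqlNode`: the code 
comment notes that `compileToPinotQuery(SqlNodeAndOptions)` rewrites it, which 
is why the re-parse was introduced.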



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import 
org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : 
_adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", 
entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public IndexSegment getDataSource() {
+    if (_tableCache == null) {
+      return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, 
Collections.emptyMap());
+    }
+
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+    class TableRow {
+      final String _tableNameWithType;
+      final TableType _tableType;
+      final String _rawTableName;
+      final @Nullable TableConfig _tableConfig;
+      private volatile @Nullable String _tableConfigJson;
+      private volatile @Nullable TableSize _tableSize;
+      private volatile boolean _tableSizeFetched;
+
+      private TableRow(String tableNameWithType, TableType tableType, String 
rawTableName,
+          @Nullable TableConfig tableConfig) {
+        _tableNameWithType = tableNameWithType;
+        _tableType = tableType;
+        _rawTableName = rawTableName;
+        _tableConfig = tableConfig;
+      }
+
+      @Nullable
+      private TableSize getTableSize() {
+        if (_tableSizeFetched) {
+          return _tableSize;
+        }
+        synchronized (this) {
+          if (_tableSizeFetched) {
+            return _tableSize;
+          }
+          _tableSize = fetchTableSize(_tableNameWithType, sizeFetcher, 
controllerBaseUrls);
+          _tableSizeFetched = true;
+          return _tableSize;
+        }
+      }
+
+      private String getStatus() {
+        if (_tableConfig != null) {
+          return "ONLINE";
+        }
+        TableSize sizeFromController = getTableSize();
+        int segments = sizeFromController != null ? 
getSegmentCount(sizeFromController, _tableType) : 0;
+        return segments > 0 ? "ONLINE" : "UNKNOWN";
+      }
+
+      private int getSegments() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? 
getSegmentCount(sizeFromController, _tableType) : 0;
+      }
+
+      private long getTotalDocs() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? 
TablesSystemTableProvider.this.getTotalDocs(sizeFromController, _tableType,
+            _tableNameWithType, controllerBaseUrls) : 0L;
+      }
+
+      private long getReportedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || 
sizeFromController._reportedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._reportedSizeInBytes;
+      }
+
+      private long getEstimatedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || 
sizeFromController._estimatedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._estimatedSizeInBytes;
+      }
+
+      private String getBrokerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getBroker();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private String getServerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getServer();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private int getReplicas() {
+        if (_tableConfig != null && _tableConfig.getValidationConfig() != 
null) {
+          Integer replicationNumber = 
_tableConfig.getValidationConfig().getReplicationNumber();
+          if (replicationNumber != null) {
+            return replicationNumber;
+          }
+        }
+        return 0;
+      }
+
+      private String getTableConfigJson() {
+        String cached = _tableConfigJson;
+        if (cached != null) {
+          return cached;
+        }
+        synchronized (this) {
+          cached = _tableConfigJson;
+          if (cached != null) {
+            return cached;
+          }
+          cached = "";
+          if (_tableConfig != null) {
+            try {
+              cached = JsonUtils.objectToString(_tableConfig);
+            } catch (Exception e) {
+              LOGGER.warn("Failed to serialize table config for {}: {}", 
_tableNameWithType, e.toString());
+              cached = _tableConfig.toString();
+            }
+          }
+          _tableConfigJson = cached;
+          return cached;
+        }
+      }
+    }
+
+    List<TableRow> tableRows = new ArrayList<>(sortedTableNames.size());
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+      tableRows.add(new TableRow(tableNameWithType, tableType, rawTableName, 
tableConfig));
+    }
+
+    Map<String, IntFunction<Object>> valueProviders = new 
java.util.HashMap<>();
+    valueProviders.put("tableName", docId -> 
tableRows.get(docId)._rawTableName);
+    valueProviders.put("type", docId -> 
tableRows.get(docId)._tableType.name());
+    valueProviders.put("status", docId -> tableRows.get(docId).getStatus());
+    valueProviders.put("segments", docId -> 
tableRows.get(docId).getSegments());
+    valueProviders.put("totalDocs", docId -> 
tableRows.get(docId).getTotalDocs());
+    valueProviders.put("reportedSize", docId -> 
tableRows.get(docId).getReportedSize());
+    valueProviders.put("estimatedSize", docId -> 
tableRows.get(docId).getEstimatedSize());
+    valueProviders.put("brokerTenant", docId -> 
tableRows.get(docId).getBrokerTenant());
+    valueProviders.put("serverTenant", docId -> 
tableRows.get(docId).getServerTenant());
+    valueProviders.put("replicas", docId -> 
tableRows.get(docId).getReplicas());
+    valueProviders.put("tableConfig", docId -> 
tableRows.get(docId).getTableConfigJson());
+
+    return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 
tableRows.size(), valueProviders);
+  }
+
+  @Nullable
+  private TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> 
controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers 
{} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, 
tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  @Nullable
+  private TableSize fetchTableSizeForName(List<String> controllerBaseUrls, 
String tableName) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        Object sizeNode;
+        try {
+          sizeNode = clientClass.getMethod("getTableSize", String.class, 
boolean.class, boolean.class)
+              .invoke(adminClient, tableName, true, false);
+        } catch (NoSuchMethodException e) {
+          Object tableClient = 
clientClass.getMethod("getTableClient").invoke(adminClient);
+          sizeNode = tableClient.getClass().getMethod("getTableSize", 
String.class, boolean.class, boolean.class)
+              .invoke(tableClient, tableName, true, false);
+        }
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), 
TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, 
baseUrl,
+            parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using 
admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocsFromSize(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType, 
String tableNameWithType,
+      List<String> controllerBaseUrls) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      if (sizeFromController._offlineTotalDocs >= 0) {
+        return sizeFromController._offlineTotalDocs;
+      }
+      long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, 
tableType);
+      if (totalDocsFromSize > 0) {
+        sizeFromController._offlineTotalDocs = totalDocsFromSize;
+        return totalDocsFromSize;
+      }
+      long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType, 
sizeFromController._offlineSegments._segments,
+          controllerBaseUrls);
+      sizeFromController._offlineTotalDocs = fetched;
+      return fetched;
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      if (sizeFromController._realtimeTotalDocs >= 0) {
+        return sizeFromController._realtimeTotalDocs;
+      }
+      long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, 
tableType);
+      if (totalDocsFromSize > 0) {
+        sizeFromController._realtimeTotalDocs = totalDocsFromSize;
+        return totalDocsFromSize;
+      }
+      long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType,
+          sizeFromController._realtimeSegments._segments, controllerBaseUrls);
+      sizeFromController._realtimeTotalDocs = fetched;
+      return fetched;
+    }
+    return 0;
+  }
+
+  private long fetchTotalDocsFromSegmentMetadata(String tableNameWithType, 
Map<String, SegmentSize> segments,
+      List<String> controllerBaseUrls) {
+    if (segments.isEmpty()) {
+      return 0;
+    }
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return 0;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+        Object segmentMetadataFetcher;
+        java.lang.reflect.Method getSegmentMetadataMethod;
+        boolean returnsJsonNode;
+        try {
+          segmentMetadataFetcher = 
clientClass.getMethod("getSegmentApiClient").invoke(adminClient);
+          getSegmentMetadataMethod =
+              
segmentMetadataFetcher.getClass().getMethod("getSegmentMetadata", String.class, 
String.class);
+          returnsJsonNode = true;
+        } catch (NoSuchMethodException e) {
+          segmentMetadataFetcher = 
clientClass.getMethod("getSegmentClient").invoke(adminClient);
+          getSegmentMetadataMethod = segmentMetadataFetcher.getClass()
+              .getMethod("getSegmentMetadata", String.class, String.class, 
List.class);
+          returnsJsonNode = false;
+        }
+
+        long totalDocs = 0;
+        if (returnsJsonNode) {
+          for (String segmentName : segments.keySet()) {
+            JsonNode segmentMetadata = (JsonNode) 
getSegmentMetadataMethod.invoke(segmentMetadataFetcher,
+                tableNameWithType, segmentName);
+            totalDocs += 
segmentMetadata.path(CommonConstants.Segment.TOTAL_DOCS).asLong(0);
+          }
+        } else {
+          for (String segmentName : segments.keySet()) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> segmentMetadata = (Map<String, Object>) 
getSegmentMetadataMethod
+                .invoke(segmentMetadataFetcher, tableNameWithType, 
segmentName, List.of());
+            Object totalDocsObj = 
segmentMetadata.get(CommonConstants.Segment.TOTAL_DOCS);
+            if (totalDocsObj != null) {
+              totalDocs += Long.parseLong(totalDocsObj.toString());
+            }
+          }
+        }
+        return totalDocs;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching segment metadata for {} via 
{}", TABLE_NAME, tableNameWithType, baseUrl,
+            e);
+      }
+    }
+    return 0;
+  }
+
+  @Nullable
+  private Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    return null;
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && 
lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, 
lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);
+          if (!host.isEmpty() && !port.isEmpty()) {
+            urls.add(host + ":" + port);
+          } else {
+            LOGGER.warn("Unable to parse controller address from instance id 
(empty host or port): {}", controllerId);
+          }
+        } else {
+          LOGGER.warn("Unable to parse controller address from instance id 
'{}' (expected 'Controller_<host>_<port>')",
+              controllerId);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to discover controllers from Helix", e);
+    }
+    return urls;
+  }
+
+  @Nullable
+  private AutoCloseable getOrCreateAdminClient(String controllerBaseUrl) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    String normalized = normalizeControllerUrl(controllerBaseUrl);
+    if (normalized == null) {
+      return null;
+    }
+    return _adminClientCache.computeIfAbsent(normalized, controller -> {
+      try {
+        String controllerAddress = stripScheme(controller);
+        Properties properties = new Properties();
+        properties.setProperty(ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, 
String.valueOf(CONTROLLER_TIMEOUT_MS));
+        properties.setProperty(ADMIN_TRANSPORT_SCHEME, 
controller.startsWith("https://") ? "https" : "http");
+        return (AutoCloseable) clientClass.getConstructor(String.class, 
Properties.class)
+            .newInstance(controllerAddress, properties);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to create admin client for {}: {}", controller, 
e.toString());
+        return null;
+      }
+    });
+  }
+
+  private static String normalizeControllerUrl(@Nullable String controllerUrl) 
{
+    if (controllerUrl == null || controllerUrl.isEmpty()) {
+      return null;
+    }
+    String normalized = controllerUrl;
+    if (!normalized.startsWith("http://") && 
!normalized.startsWith("https://")) {
+      normalized = "http://" + normalized;
+    }
+    if (normalized.endsWith("/")) {
+      normalized = normalized.substring(0, normalized.length() - 1);
+    }
+    return normalized;
+  }
+
+  private static String stripScheme(String controllerUrl) {
+    if (controllerUrl == null) {
+      return null;
+    }
+    if (controllerUrl.startsWith("http://")) {
+      return controllerUrl.substring("http://".length());
+    }
+    if (controllerUrl.startsWith("https://")) {
+      return controllerUrl.substring("https://".length());
+    }
+    return controllerUrl;
+  }
+
+  /**
+   * Minimal shape of controller table size response.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static final class TableSize {
+    @JsonProperty("reportedSizeInBytes")
+    public long _reportedSizeInBytes = -1;
+
+    @JsonProperty("estimatedSizeInBytes")
+    public long _estimatedSizeInBytes = -1;
+
+    @JsonProperty("offlineSegments")
+    public TableSubType _offlineSegments;
+
+    @JsonProperty("realtimeSegments")
+    public TableSubType _realtimeSegments;
+
+    public long _offlineTotalDocs = -1;
+    public long _realtimeTotalDocs = -1;
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static final class TableSubType {
+    @JsonProperty("segments")
+    public Map<String, SegmentSize> _segments;
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static final class SegmentSize {
+    @JsonProperty("totalDocs")
+    public long _totalDocs = 0;
+  }
+
+  private static @Nullable Class<?> loadAdminClientClass() {
+    try {
+      return Class.forName(PINOT_ADMIN_CLIENT_CLASS_NAME);
+    } catch (ClassNotFoundException e) {
+      return null;
+    }
+  }
+
+  private static long getPositiveLongProperty(String key, long defaultValue) {
+    long value = Long.getLong(key, defaultValue);
+    return value > 0 ? value : defaultValue;
+  }
+
+  private static long getNonNegativeLongProperty(String key, long 
defaultValue) {
+    long value = Long.getLong(key, defaultValue);
+    return value >= 0 ? value : defaultValue;
+  }
+
+  private void logSizeFetchFailure(String message, Object... args) {
+    if (_loggedSizeFetchFailure.compareAndSet(false, true)) {
+      LOGGER.warn(message, args);
+    } else {
+      LOGGER.debug(message, args);

Review Comment:
   This method suppresses repeated warnings by logging at WARN level only 
once and then switching to DEBUG. However, it relies on a single boolean flag 
that is never reset, so if failures occur intermittently (e.g., a controller 
goes down, recovers, and later fails again), only the first failure is visible 
at WARN level. Consider adding a time-based reset (e.g., log at WARN level at 
most once per hour) or tracking failures per table to improve observability.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/SystemTableRegistry.java:
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Registry to hold and lifecycle-manage system table data providers.
+ */
+public final class SystemTableRegistry {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SystemTableRegistry.class);
+
+  // Providers are registered once at broker startup.
+  private static final Map<String, SystemTableDataProvider> PROVIDERS = new 
HashMap<>();
+
+  private SystemTableRegistry() {
+    throw new UnsupportedOperationException("This is a utility class and 
cannot be instantiated");
+  }
+
+  public static void register(SystemTableDataProvider provider) {
+    synchronized (PROVIDERS) {
+      PROVIDERS.put(normalize(provider.getTableName()), provider);
+    }
+  }
+
+  @Nullable
+  public static SystemTableDataProvider get(String tableName) {
+    synchronized (PROVIDERS) {
+      return PROVIDERS.get(normalize(tableName));
+    }
+  }
+
+  public static boolean isRegistered(String tableName) {
+    synchronized (PROVIDERS) {
+      return PROVIDERS.containsKey(normalize(tableName));
+    }
+  }
+
+  public static Collection<SystemTableDataProvider> getProviders() {
+    synchronized (PROVIDERS) {
+      return Collections.unmodifiableCollection(new 
java.util.ArrayList<>(PROVIDERS.values()));
+    }
+  }
+
+  /**
+   * Discover and register providers marked with {@link SystemTable} using the 
available dependencies.
+   * Follows the ScalarFunction pattern: any class annotated with @SystemTable 
under a "*.systemtable.*" package
+   * will be discovered via reflection and registered.
+   */
+  public static void registerAnnotatedProviders(TableCache tableCache, 
HelixAdmin helixAdmin, String clusterName) {
+    Set<Class<?>> classes =
+        
PinotReflectionUtils.getClassesThroughReflection(".*\\.systemtable\\..*", 
SystemTable.class);
+    for (Class<?> clazz : classes) {
+      if (!SystemTableDataProvider.class.isAssignableFrom(clazz)) {
+        continue;
+      }
+      SystemTableDataProvider provider = instantiateProvider(
+          clazz.asSubclass(SystemTableDataProvider.class), tableCache, 
helixAdmin, clusterName);
+      if (provider == null) {
+        continue;
+      }
+      if (isRegistered(provider.getTableName())) {
+        continue;
+      }
+      LOGGER.info("Registering system table provider: {}", 
provider.getTableName());
+      register(provider);
+    }
+  }
+
+  public static void close()
+      throws Exception {
+    Exception firstException = null;
+    // Snapshot providers to avoid concurrent modifications and to close each 
provider at most once.
+    Map<SystemTableDataProvider, Boolean> providersToClose = new 
IdentityHashMap<>();
+    synchronized (PROVIDERS) {
+      for (SystemTableDataProvider provider : PROVIDERS.values()) {
+        providersToClose.put(provider, Boolean.TRUE);
+      }
+    }
+    try {
+      for (SystemTableDataProvider provider : providersToClose.keySet()) {
+        try {
+          provider.close();
+        } catch (Exception e) {
+          if (firstException == null) {
+            firstException = e;
+          } else {
+            firstException.addSuppressed(e);
+          }
+        }
+      }
+    } finally {
+      synchronized (PROVIDERS) {
+        PROVIDERS.clear();
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  /**
+   * Initialize and register all annotated system table providers.
+   */
+  public static void init(TableCache tableCache, HelixAdmin helixAdmin, String 
clusterName) {
+    registerAnnotatedProviders(tableCache, helixAdmin, clusterName);
+  }
+
+  private static String normalize(String tableName) {
+    return tableName.toLowerCase(Locale.ROOT);
+  }
+
+  private static @Nullable SystemTableDataProvider instantiateProvider(Class<? 
extends SystemTableDataProvider> clazz,
+      TableCache tableCache, HelixAdmin helixAdmin, String clusterName) {
+    try {
+      // Prefer the most specific constructor available.

Review Comment:
   The constructor fallback logic (lines 147-163) tries multiple constructor 
signatures but doesn't document which constructors are expected or why. Add a 
comment explaining that providers may declare any of these constructors: 
`(TableCache, HelixAdmin, String)`, `(TableCache, HelixAdmin)`, 
`(TableCache)`, or a no-arg constructor, and that the most specific available 
constructor is selected.
   ```suggestion
         // System table providers may declare any of the following constructors, in decreasing order of specificity:
         //   (TableCache, HelixAdmin, String), (TableCache, HelixAdmin), (TableCache), or a no-arg constructor.
         // The registry will select the most specific available constructor so that providers can opt in to the
         // dependencies they require without forcing all implementations to depend on Helix or cluster metadata.
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import 
org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : 
_adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", 
entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public IndexSegment getDataSource() {
+    if (_tableCache == null) {
+      return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, 
Collections.emptyMap());
+    }
+
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+    class TableRow {
+      final String _tableNameWithType;
+      final TableType _tableType;
+      final String _rawTableName;
+      final @Nullable TableConfig _tableConfig;
+      private volatile @Nullable String _tableConfigJson;
+      private volatile @Nullable TableSize _tableSize;
+      private volatile boolean _tableSizeFetched;
+
+      private TableRow(String tableNameWithType, TableType tableType, String 
rawTableName,
+          @Nullable TableConfig tableConfig) {
+        _tableNameWithType = tableNameWithType;
+        _tableType = tableType;
+        _rawTableName = rawTableName;
+        _tableConfig = tableConfig;
+      }
+
+      @Nullable
+      private TableSize getTableSize() {
+        if (_tableSizeFetched) {
+          return _tableSize;
+        }
+        synchronized (this) {
+          if (_tableSizeFetched) {
+            return _tableSize;
+          }
+          _tableSize = fetchTableSize(_tableNameWithType, sizeFetcher, 
controllerBaseUrls);
+          _tableSizeFetched = true;
+          return _tableSize;
+        }
+      }
+
+      private String getStatus() {
+        if (_tableConfig != null) {
+          return "ONLINE";
+        }
+        TableSize sizeFromController = getTableSize();
+        int segments = sizeFromController != null ? 
getSegmentCount(sizeFromController, _tableType) : 0;
+        return segments > 0 ? "ONLINE" : "UNKNOWN";
+      }
+
+      private int getSegments() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? 
getSegmentCount(sizeFromController, _tableType) : 0;
+      }
+
+      private long getTotalDocs() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? 
TablesSystemTableProvider.this.getTotalDocs(sizeFromController, _tableType,
+            _tableNameWithType, controllerBaseUrls) : 0L;
+      }
+
+      private long getReportedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || 
sizeFromController._reportedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._reportedSizeInBytes;
+      }
+
+      private long getEstimatedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || 
sizeFromController._estimatedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._estimatedSizeInBytes;
+      }
+
+      private String getBrokerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getBroker();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private String getServerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getServer();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private int getReplicas() {
+        if (_tableConfig != null && _tableConfig.getValidationConfig() != 
null) {
+          Integer replicationNumber = 
_tableConfig.getValidationConfig().getReplicationNumber();
+          if (replicationNumber != null) {
+            return replicationNumber;
+          }
+        }
+        return 0;
+      }
+
+      private String getTableConfigJson() {
+        String cached = _tableConfigJson;
+        if (cached != null) {
+          return cached;
+        }
+        synchronized (this) {
+          cached = _tableConfigJson;
+          if (cached != null) {
+            return cached;
+          }
+          cached = "";
+          if (_tableConfig != null) {
+            try {
+              cached = JsonUtils.objectToString(_tableConfig);
+            } catch (Exception e) {
+              LOGGER.warn("Failed to serialize table config for {}: {}", 
_tableNameWithType, e.toString());
+              cached = _tableConfig.toString();
+            }
+          }
+          _tableConfigJson = cached;
+          return cached;
+        }
+      }
+    }
+
+    List<TableRow> tableRows = new ArrayList<>(sortedTableNames.size());
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+      tableRows.add(new TableRow(tableNameWithType, tableType, rawTableName, 
tableConfig));
+    }
+
+    Map<String, IntFunction<Object>> valueProviders = new 
java.util.HashMap<>();
+    valueProviders.put("tableName", docId -> 
tableRows.get(docId)._rawTableName);
+    valueProviders.put("type", docId -> 
tableRows.get(docId)._tableType.name());
+    valueProviders.put("status", docId -> tableRows.get(docId).getStatus());
+    valueProviders.put("segments", docId -> 
tableRows.get(docId).getSegments());
+    valueProviders.put("totalDocs", docId -> 
tableRows.get(docId).getTotalDocs());
+    valueProviders.put("reportedSize", docId -> 
tableRows.get(docId).getReportedSize());
+    valueProviders.put("estimatedSize", docId -> 
tableRows.get(docId).getEstimatedSize());
+    valueProviders.put("brokerTenant", docId -> 
tableRows.get(docId).getBrokerTenant());
+    valueProviders.put("serverTenant", docId -> 
tableRows.get(docId).getServerTenant());
+    valueProviders.put("replicas", docId -> 
tableRows.get(docId).getReplicas());
+    valueProviders.put("tableConfig", docId -> 
tableRows.get(docId).getTableConfigJson());
+
+    return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 
tableRows.size(), valueProviders);
+  }
+
+  @Nullable
+  private TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> 
controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers 
{} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, 
tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  @Nullable
+  private TableSize fetchTableSizeForName(List<String> controllerBaseUrls, 
String tableName) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        Object sizeNode;
+        try {
+          sizeNode = clientClass.getMethod("getTableSize", String.class, 
boolean.class, boolean.class)
+              .invoke(adminClient, tableName, true, false);
+        } catch (NoSuchMethodException e) {
+          Object tableClient = 
clientClass.getMethod("getTableClient").invoke(adminClient);
+          sizeNode = tableClient.getClass().getMethod("getTableSize", 
String.class, boolean.class, boolean.class)
+              .invoke(tableClient, tableName, true, false);
+        }
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), 
TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, 
baseUrl,
+            parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using 
admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocsFromSize(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType, 
String tableNameWithType,
+      List<String> controllerBaseUrls) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      if (sizeFromController._offlineTotalDocs >= 0) {
+        return sizeFromController._offlineTotalDocs;
+      }
+      long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, 
tableType);
+      if (totalDocsFromSize > 0) {
+        sizeFromController._offlineTotalDocs = totalDocsFromSize;
+        return totalDocsFromSize;
+      }
+      long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType, 
sizeFromController._offlineSegments._segments,
+          controllerBaseUrls);
+      sizeFromController._offlineTotalDocs = fetched;
+      return fetched;
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      if (sizeFromController._realtimeTotalDocs >= 0) {
+        return sizeFromController._realtimeTotalDocs;
+      }
+      long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, 
tableType);
+      if (totalDocsFromSize > 0) {
+        sizeFromController._realtimeTotalDocs = totalDocsFromSize;
+        return totalDocsFromSize;
+      }
+      long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType,
+          sizeFromController._realtimeSegments._segments, controllerBaseUrls);
+      sizeFromController._realtimeTotalDocs = fetched;
+      return fetched;

Review Comment:
   The `TableSize` fields `_offlineTotalDocs` and `_realtimeTotalDocs` are 
mutated across multiple method calls (lines 454, 459, 469, 474) without 
synchronization, even though the same `TableSize` instance may be shared 
between threads via `_sizeCache` (a `ConcurrentHashMap`). This can lead to race 
conditions where threads reading and writing these cached values see 
inconsistent state. Consider using `AtomicLong` for these fields or 
synchronizing access when mutating cached objects. The suggestion below takes 
a third route: it stops writing to the shared `TableSize` object and 
recomputes the total on each call.
   ```suggestion
         long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, 
tableType);
         if (totalDocsFromSize > 0) {
           return totalDocsFromSize;
         }
         return fetchTotalDocsFromSegmentMetadata(tableNameWithType, 
sizeFromController._offlineSegments._segments,
             controllerBaseUrls);
       }
       if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
           && sizeFromController._realtimeSegments._segments != null) {
         long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, 
tableType);
         if (totalDocsFromSize > 0) {
           return totalDocsFromSize;
         }
         return fetchTotalDocsFromSegmentMetadata(tableNameWithType,
             sizeFromController._realtimeSegments._segments, 
controllerBaseUrls);
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java:
##########
@@ -52,20 +54,25 @@ public static class QueryParameters {
   private static final String SELECT_PATH = "/select";
   private static final String METADATA_PATH = "/metadata";
 
+  private static String encodePathSegment(String segment) {
+    return URLEncoder.encode(segment, StandardCharsets.UTF_8);

Review Comment:
   The parameter name `segment` is ambiguous—it could refer to either a URL 
path segment or a Pinot segment name. Consider renaming to `pathSegment` or 
`value` to clarify that this method encodes arbitrary path components, not 
specifically Pinot segments.
   ```suggestion
     private static String encodePathSegment(String pathSegment) {
       return URLEncoder.encode(pathSegment, StandardCharsets.UTF_8);
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.querylog.QueryLogger;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableRegistry;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.auth.AuthorizationResult;
+import org.apache.pinot.spi.auth.broker.RequesterIdentity;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broker request handler for system tables (handled entirely on the broker).
+ */
+public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class);
+
+  private final BrokerReduceService _brokerReduceService;
+  private final PlanMaker _planMaker;
+  private final ExecutorService _executorService;
+
+  public SystemTableBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
+      BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
+      ThreadAccountant threadAccountant) {
+    super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
+        threadAccountant);
+    _brokerReduceService = new BrokerReduceService(_config);
+    _planMaker = new InstancePlanMakerImplV2();
+    _planMaker.init(_config);
+    _executorService = 
QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(2));

Review Comment:
   The thread pool size is hardcoded to 2. This may be insufficient under load 
or wasteful for low-traffic deployments. Consider making this configurable via 
`PinotConfiguration` with a reasonable default, similar to how other executor 
pools are configured in Pinot.



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import 
org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  /**
+   * Creates a provider without a table cache; {@link #getDataSource()} then returns an empty segment.
+   */
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  /**
+   * Creates a provider that lists tables from the given cache, with no Helix admin and no controller URLs.
+   */
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  /**
+   * Creates a provider with a table cache and an optional Helix admin, but no cluster name.
+   */
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  /**
+   * Creates a provider with a table cache plus the Helix admin and cluster name used for controller discovery.
+   */
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  /**
+   * Full constructor that all other constructors delegate to.
+   *
+   * @param tableCache source of table names and table configs
+   * @param helixAdmin Helix admin used for controller discovery, or null
+   * @param clusterName Helix cluster name used for controller discovery, or null
+   * @param tableSizeFetcherOverride size lookup tried before the controllers are queried, or null
+   * @param controllerUrls fixed controller URLs; copied defensively, null is treated as empty
+   */
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    if (controllerUrls == null) {
+      _staticControllerUrls = List.of();
+    } else {
+      _staticControllerUrls = new ArrayList<>(controllerUrls);
+    }
+  }
+
+  /** Returns the fixed name of this system table, {@link #TABLE_NAME}. */
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  /** Returns the shared, statically built schema ({@code SCHEMA}) describing this table's columns. */
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  /**
+   * Closes every cached admin client and then empties the cache.
+   *
+   * <p>A failure to close one client is logged at debug level and does not stop the remaining clients from being
+   * closed.
+   */
+  @Override
+  public void close()
+      throws Exception {
+    _adminClientCache.forEach((controllerUrl, adminClient) -> {
+      try {
+        adminClient.close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", controllerUrl, e.toString());
+      }
+    });
+    _adminClientCache.clear();
+  }
+
+  /**
+   * Builds the in-memory segment backing this system table: one row per table name in the table cache for which
+   * {@link TableNameBuilder} resolves a table type, ordered by name.
+   *
+   * <p>Column values are produced lazily per doc id. All size-derived columns of a row share a single size lookup,
+   * and the serialized table config is computed at most once per row. Without a table cache the segment is empty.
+   */
+  @Override
+  public IndexSegment getDataSource() {
+    if (_tableCache == null) {
+      return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap());
+    }
+
+    // Keep only names that carry a table type, de-duplicated, then order them.
+    Set<String> typedNames = new LinkedHashSet<>();
+    for (String candidate : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(candidate) == null) {
+        continue;
+      }
+      typedNames.add(candidate);
+    }
+    List<String> orderedNames = new ArrayList<>(typedNames);
+    orderedNames.sort(Comparator.naturalOrder());
+
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+
+    /** Lazily evaluated view of one table; size and config JSON are memoized per instance. */
+    class Row {
+      final String _tableNameWithType;
+      final TableType _tableType;
+      final String _rawTableName;
+      final @Nullable TableConfig _tableConfig;
+      private volatile @Nullable String _tableConfigJson;
+      private volatile @Nullable TableSize _tableSize;
+      private volatile boolean _tableSizeFetched;
+
+      private Row(String tableNameWithType, TableType tableType, String rawTableName,
+          @Nullable TableConfig tableConfig) {
+        _tableNameWithType = tableNameWithType;
+        _tableType = tableType;
+        _rawTableName = rawTableName;
+        _tableConfig = tableConfig;
+      }
+
+      /** Looks the size up once; a null result is remembered via the fetched flag. */
+      @Nullable
+      private TableSize getTableSize() {
+        if (!_tableSizeFetched) {
+          synchronized (this) {
+            if (!_tableSizeFetched) {
+              _tableSize = fetchTableSize(_tableNameWithType, sizeFetcher, controllerBaseUrls);
+              _tableSizeFetched = true;
+            }
+          }
+        }
+        return _tableSize;
+      }
+
+      /** ONLINE when a table config exists or at least one segment is reported; UNKNOWN otherwise. */
+      private String getStatus() {
+        if (_tableConfig != null) {
+          return "ONLINE";
+        }
+        return getSegments() > 0 ? "ONLINE" : "UNKNOWN";
+      }
+
+      private int getSegments() {
+        TableSize size = getTableSize();
+        if (size == null) {
+          return 0;
+        }
+        return getSegmentCount(size, _tableType);
+      }
+
+      private long getTotalDocs() {
+        TableSize size = getTableSize();
+        if (size == null) {
+          return 0L;
+        }
+        return TablesSystemTableProvider.this.getTotalDocs(size, _tableType, _tableNameWithType,
+            controllerBaseUrls);
+      }
+
+      /** Negative or missing sizes are reported as 0. */
+      private long getReportedSize() {
+        TableSize size = getTableSize();
+        return (size != null && size._reportedSizeInBytes >= 0) ? size._reportedSizeInBytes : 0L;
+      }
+
+      /** Negative or missing sizes are reported as 0. */
+      private long getEstimatedSize() {
+        TableSize size = getTableSize();
+        return (size != null && size._estimatedSizeInBytes >= 0) ? size._estimatedSizeInBytes : 0L;
+      }
+
+      private String getBrokerTenant() {
+        if (_tableConfig == null || _tableConfig.getTenantConfig() == null) {
+          return "";
+        }
+        String broker = _tableConfig.getTenantConfig().getBroker();
+        return broker == null ? "" : broker;
+      }
+
+      private String getServerTenant() {
+        if (_tableConfig == null || _tableConfig.getTenantConfig() == null) {
+          return "";
+        }
+        String server = _tableConfig.getTenantConfig().getServer();
+        return server == null ? "" : server;
+      }
+
+      private int getReplicas() {
+        if (_tableConfig == null || _tableConfig.getValidationConfig() == null) {
+          return 0;
+        }
+        Integer replication = _tableConfig.getValidationConfig().getReplicationNumber();
+        return replication == null ? 0 : replication;
+      }
+
+      /** Serializes the config once; falls back to {@code toString()} if JSON serialization fails. */
+      private String getTableConfigJson() {
+        String json = _tableConfigJson;
+        if (json == null) {
+          synchronized (this) {
+            json = _tableConfigJson;
+            if (json == null) {
+              if (_tableConfig == null) {
+                json = "";
+              } else {
+                try {
+                  json = JsonUtils.objectToString(_tableConfig);
+                } catch (Exception e) {
+                  LOGGER.warn("Failed to serialize table config for {}: {}", _tableNameWithType, e.toString());
+                  json = _tableConfig.toString();
+                }
+              }
+              _tableConfigJson = json;
+            }
+          }
+        }
+        return json;
+      }
+    }
+
+    List<Row> rows = new ArrayList<>(orderedNames.size());
+    for (String tableNameWithType : orderedNames) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != null) {
+        rows.add(new Row(tableNameWithType, tableType, TableNameBuilder.extractRawTableName(tableNameWithType),
+            _tableCache.getTableConfig(tableNameWithType)));
+      }
+    }
+
+    // The doc id is the row's index in the sorted list.
+    IntFunction<Row> rowAt = rows::get;
+    Map<String, IntFunction<Object>> columns = new java.util.HashMap<>();
+    columns.put("tableName", docId -> rowAt.apply(docId)._rawTableName);
+    columns.put("type", docId -> rowAt.apply(docId)._tableType.name());
+    columns.put("status", docId -> rowAt.apply(docId).getStatus());
+    columns.put("segments", docId -> rowAt.apply(docId).getSegments());
+    columns.put("totalDocs", docId -> rowAt.apply(docId).getTotalDocs());
+    columns.put("reportedSize", docId -> rowAt.apply(docId).getReportedSize());
+    columns.put("estimatedSize", docId -> rowAt.apply(docId).getEstimatedSize());
+    columns.put("brokerTenant", docId -> rowAt.apply(docId).getBrokerTenant());
+    columns.put("serverTenant", docId -> rowAt.apply(docId).getServerTenant());
+    columns.put("replicas", docId -> rowAt.apply(docId).getReplicas());
+    columns.put("tableConfig", docId -> rowAt.apply(docId).getTableConfigJson());
+
+    return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, rows.size(), columns);
+  }
+
+  /**
+   * Resolves the size of a table, consulting in order: the size cache (when its TTL is positive), the supplied
+   * fetcher, and finally the controllers, first by raw table name and then by the type-suffixed name.
+   *
+   * @param tableNameWithType type-suffixed table name; also the cache key
+   * @param fetcher optional size lookup tried before the controllers; a null result or an exception falls through
+   * @param controllerBaseUrls controller URLs passed to the controller lookup
+   * @return the size, or null when no source produced one
+   */
+  @Nullable
+  private TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        // Best effort: a failing fetcher must not prevent the controller lookup below.
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString());
+      }
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers {} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  /**
+   * Asks each controller in turn for the size of {@code tableName} and returns the first response that parses.
+   *
+   * <p>The admin client is invoked reflectively: {@code getTableSize(String, boolean, boolean)} is looked up on the
+   * client class first and, if that method does not exist, on the object returned by {@code getTableClient()}.
+   * A controller that fails or returns nothing is skipped and the next one is tried.
+   *
+   * @param controllerBaseUrls controller URLs to try, in order
+   * @param tableName table name sent to the controller
+   * @return the parsed size, or null when the admin client class is unavailable or no controller answered
+   */
+  @Nullable
+  private TableSize fetchTableSizeForName(List<String> controllerBaseUrls, String tableName) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        Object sizeNode;
+        try {
+          sizeNode = clientClass.getMethod("getTableSize", String.class, boolean.class, boolean.class)
+              .invoke(adminClient, tableName, true, false);
+        } catch (NoSuchMethodException e) {
+          // The client class has no direct getTableSize; go through its table client instead.
+          Object tableClient = clientClass.getMethod("getTableClient").invoke(adminClient);
+          sizeNode = tableClient.getClass().getMethod("getTableSize", String.class, boolean.class, boolean.class)
+              .invoke(tableClient, tableName, true, false);
+        }
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl,
+            parsed._offlineSegments != null && parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the normalized controller URLs to query: controllers discovered through Helix (when a Helix admin is
+   * set) followed by the statically configured ones, de-duplicated with first-seen order preserved.
+   */
+  private List<String> getControllerBaseUrls() {
+    List<String> candidates = new ArrayList<>();
+    if (_helixAdmin != null) {
+      candidates.addAll(discoverControllersFromHelix());
+    }
+    candidates.addAll(_staticControllerUrls);
+
+    Set<String> uniqueUrls = new LinkedHashSet<>();
+    for (String candidate : candidates) {
+      String normalized = normalizeControllerUrl(candidate);
+      if (normalized == null) {
+        continue;
+      }
+      uniqueUrls.add(normalized);
+    }
+    return new ArrayList<>(uniqueUrls);
+  }
+
+  /**
+   * Returns the number of segments listed in the size response for the given table type, or 0 when the matching
+   * section (or its segment map) is absent.
+   */
+  private int getSegmentCount(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE) {
+      boolean present =
+          sizeFromController._offlineSegments != null && sizeFromController._offlineSegments._segments != null;
+      return present ? sizeFromController._offlineSegments._segments.size() : 0;
+    }
+    if (tableType == TableType.REALTIME) {
+      boolean present =
+          sizeFromController._realtimeSegments != null && sizeFromController._realtimeSegments._segments != null;
+      return present ? sizeFromController._realtimeSegments._segments.size() : 0;
+    }
+    return 0;
+  }
+
+  /**
+   * Sums the per-segment doc counts carried in the size response for the given table type; returns 0 when the
+   * matching section (or its segment map) is absent.
+   */
+  private long getTotalDocsFromSize(TableSize sizeFromController, TableType tableType) {
+    Map<String, SegmentSize> segments = null;
+    if (tableType == TableType.OFFLINE) {
+      if (sizeFromController._offlineSegments != null) {
+        segments = sizeFromController._offlineSegments._segments;
+      }
+    } else if (tableType == TableType.REALTIME) {
+      if (sizeFromController._realtimeSegments != null) {
+        segments = sizeFromController._realtimeSegments._segments;
+      }
+    }
+    if (segments == null) {
+      return 0;
+    }
+    long totalDocs = 0;
+    for (SegmentSize segmentSize : segments.values()) {
+      totalDocs += segmentSize._totalDocs;
+    }
+    return totalDocs;
+  }
+
+  /**
+   * Returns the total doc count for the given table type, memoizing it on the size object.
+   *
+   * <p>Resolution order: the value already stored on {@code sizeFromController} (when non-negative), the sum of
+   * per-segment counts in the size response (when positive), and finally per-segment metadata fetched from the
+   * controllers. Returns 0 when the size response has no segment map for the table type.
+   */
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType, String tableNameWithType,
+      List<String> controllerBaseUrls) {
+    boolean offline = tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null;
+    boolean realtime = !offline && tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null;
+    if (!offline && !realtime) {
+      return 0;
+    }
+
+    long memoized = offline ? sizeFromController._offlineTotalDocs : sizeFromController._realtimeTotalDocs;
+    if (memoized >= 0) {
+      return memoized;
+    }
+
+    long totalDocs = getTotalDocsFromSize(sizeFromController, tableType);
+    if (totalDocs <= 0) {
+      // The size response carried no usable counts; fall back to per-segment metadata.
+      Map<String, SegmentSize> segments =
+          offline ? sizeFromController._offlineSegments._segments : sizeFromController._realtimeSegments._segments;
+      totalDocs = fetchTotalDocsFromSegmentMetadata(tableNameWithType, segments, controllerBaseUrls);
+    }
+    if (offline) {
+      sizeFromController._offlineTotalDocs = totalDocs;
+    } else {
+      sizeFromController._realtimeTotalDocs = totalDocs;
+    }
+    return totalDocs;
+  }
+
+  /**
+   * Sums the total doc count of the given segments by fetching each segment's metadata from a controller.
+   *
+   * <p>The admin client is invoked reflectively. {@code getSegmentApiClient().getSegmentMetadata(String, String)}
+   * is tried first and its result is read as a {@link JsonNode}; if either method does not exist, the code falls
+   * back to {@code getSegmentClient().getSegmentMetadata(String, String, List)} and reads the result as a map.
+   * Controllers are tried in order; a failure on one controller discards its partial sum and moves to the next.
+   *
+   * @param tableNameWithType type-suffixed table name passed to the metadata call
+   * @param segments segments to look up; only the keys (segment names) are used
+   * @param controllerBaseUrls controller URLs to try, in order
+   * @return the summed doc count from the first controller that answered for every segment, or 0 when there are
+   *     no segments, the admin client class is unavailable, or every controller failed
+   */
+  private long fetchTotalDocsFromSegmentMetadata(String tableNameWithType, Map<String, SegmentSize> segments,
+      List<String> controllerBaseUrls) {
+    if (segments.isEmpty()) {
+      return 0;
+    }
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return 0;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+        Object segmentMetadataFetcher;
+        java.lang.reflect.Method getSegmentMetadataMethod;
+        boolean returnsJsonNode;
+        try {
+          segmentMetadataFetcher = clientClass.getMethod("getSegmentApiClient").invoke(adminClient);
+          getSegmentMetadataMethod =
+              segmentMetadataFetcher.getClass().getMethod("getSegmentMetadata", String.class, String.class);
+          returnsJsonNode = true;
+        } catch (NoSuchMethodException e) {
+          // Alternate client shape: three-argument getSegmentMetadata on the segment client.
+          segmentMetadataFetcher = clientClass.getMethod("getSegmentClient").invoke(adminClient);
+          getSegmentMetadataMethod = segmentMetadataFetcher.getClass()
+              .getMethod("getSegmentMetadata", String.class, String.class, List.class);
+          returnsJsonNode = false;
+        }
+
+        long totalDocs = 0;
+        if (returnsJsonNode) {
+          for (String segmentName : segments.keySet()) {
+            JsonNode segmentMetadata = (JsonNode) getSegmentMetadataMethod.invoke(segmentMetadataFetcher,
+                tableNameWithType, segmentName);
+            totalDocs += segmentMetadata.path(CommonConstants.Segment.TOTAL_DOCS).asLong(0);
+          }
+        } else {
+          for (String segmentName : segments.keySet()) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> segmentMetadata = (Map<String, Object>) getSegmentMetadataMethod
+                .invoke(segmentMetadataFetcher, tableNameWithType, segmentName, List.of());
+            Object totalDocsObj = segmentMetadata.get(CommonConstants.Segment.TOTAL_DOCS);
+            if (totalDocsObj != null) {
+              totalDocs += Long.parseLong(totalDocsObj.toString());
+            }
+          }
+        }
+        return totalDocs;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching segment metadata for {} via {}", TABLE_NAME, tableNameWithType, baseUrl,
+            e);
+      }
+    }
+    return 0;
+  }
+
+  /** Returns the size-fetcher override supplied at construction time, or null when none was given. */
+  @Nullable
+  private Function<String, TableSize> getSizeFetcher() {
+    return _tableSizeFetcherOverride;
+  }
+
+  /**
+   * Lists controller addresses ({@code host:port}) by scanning the Helix cluster's instances.
+   *
+   * <p>Only instance ids recognized by {@link InstanceTypeUtils#isController} are considered. The host is the text
+   * between the first and last underscore and the port is the text after the last underscore; ids that do not fit
+   * that shape are logged and skipped. Any exception is logged and whatever was collected so far is returned.
+   *
+   * @return the discovered addresses; empty when no cluster name is set
+   */
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : _helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);
+          if (!host.isEmpty() && !port.isEmpty()) {
+            urls.add(host + ":" + port);
+          } else {
+            LOGGER.warn("Unable to parse controller address from instance id (empty host or port): {}", controllerId);
+          }
+        } else {
+          LOGGER.warn("Unable to parse controller address from instance id '{}' (expected 'Controller_<host>_<port>')",
+              controllerId);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to discover controllers from Helix", e);
+    }
+    return urls;
+  }
+
+  /**
+   * Returns the admin client cached for a controller, creating it reflectively on first use.
+   *
+   * <p>The cache key is the normalized controller URL. The client is constructed through the
+   * {@code (String, Properties)} constructor of the admin client class, with the request timeout set from
+   * {@code CONTROLLER_TIMEOUT_MS} and the scheme derived from the URL prefix.
+   *
+   * @param controllerBaseUrl controller URL, with or without scheme
+   * @return the client, or null when the admin client class is unavailable, the URL is null/empty, or
+   *     construction failed
+   */
+  @Nullable
+  private AutoCloseable getOrCreateAdminClient(String controllerBaseUrl) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    String normalized = normalizeControllerUrl(controllerBaseUrl);
+    if (normalized == null) {
+      return null;
+    }
+    return _adminClientCache.computeIfAbsent(normalized, controller -> {
+      try {
+        String controllerAddress = stripScheme(controller);
+        Properties properties = new Properties();
+        properties.setProperty(ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, String.valueOf(CONTROLLER_TIMEOUT_MS));
+        properties.setProperty(ADMIN_TRANSPORT_SCHEME, controller.startsWith("https://") ? "https" : "http");
+        return (AutoCloseable) clientClass.getConstructor(String.class, Properties.class)
+            .newInstance(controllerAddress, properties);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to create admin client for {}: {}", controller, e.toString());
+        // A null result records no mapping, so creation is attempted again on the next call.
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Normalizes a controller URL so equivalent spellings map to the same string.
+   *
+   * <p>{@code http://} is prepended when the value starts with neither {@code http://} nor {@code https://}, and
+   * a single trailing {@code /} is removed.
+   *
+   * @param controllerUrl controller URL or {@code host:port}; may be null
+   * @return the normalized URL, or null when the input is null or empty
+   */
+  @Nullable
+  private static String normalizeControllerUrl(@Nullable String controllerUrl) {
+    if (controllerUrl == null || controllerUrl.isEmpty()) {
+      return null;
+    }
+    String normalized = controllerUrl;
+    if (!normalized.startsWith("http://") && !normalized.startsWith("https://")) {
+      normalized = "http://" + normalized;
+    }
+    if (normalized.endsWith("/")) {
+      normalized = normalized.substring(0, normalized.length() - 1);
+    }
+    return normalized;
+  }
+
+  private static String stripScheme(String controllerUrl) {
+    if (controllerUrl == null) {

Review Comment:
   This method can return null (line 618), but the caller at line 589 uses the 
result directly without null checking: `stripScheme(controller)`. If 
`controller` is somehow null (though unlikely given the computeIfAbsent logic), 
this would result in passing null to the PinotAdminClient constructor, 
potentially causing a NullPointerException.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to