Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2649183195


##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker {@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements SystemTableDataProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = "org.apache.pinot.client.admin.PinotAdminClient";

Review Comment:
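   Hard-coding the fully qualified class name as a string literal creates a
hidden coupling to `PinotAdminClient` and is error-prone: a rename or package
move only surfaces at runtime, as a failed class lookup. If pinot-common can
take a compile-time dependency on the client, use
`PinotAdminClient.class.getName()` instead to get compile-time validation and
refactoring safety. If the class is loaded reflectively on purpose because it
might not be on the classpath in some deployments, document this assumption
clearly.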
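
   If the reflective lookup is intentional, one way to document it is to
centralize the lookup in a helper that treats a missing class as "feature
disabled". A minimal sketch, assuming the client is an optional runtime
dependency; `AdminClientClassLoaderSketch` and `loadOptionalClass` are
hypothetical names, since the body of `loadAdminClientClass()` is not shown in
this hunk:

   ```java
import java.util.Optional;

public final class AdminClientClassLoaderSketch {
  // Loaded reflectively on purpose: pinot-java-client is assumed to be an
  // optional runtime dependency, so PinotAdminClient.class cannot be referenced here.
  static final String PINOT_ADMIN_CLIENT_CLASS_NAME = "org.apache.pinot.client.admin.PinotAdminClient";

  private AdminClientClassLoaderSketch() {
  }

  /** Returns the class if it is on the classpath, or empty so callers can disable the feature. */
  static Optional<Class<?>> loadOptionalClass(String className) {
    try {
      return Optional.of(Class.forName(className));
    } catch (ClassNotFoundException | LinkageError e) {
      // Expected when the optional dependency is not deployed; not treated as a failure.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println("admin client available: "
        + loadOptionalClass(PINOT_ADMIN_CLIENT_CLASS_NAME).isPresent());
  }
}
   ```

A unit test in a module that does depend on the client could additionally
assert that the constant equals `PinotAdminClient.class.getName()`, which
restores some refactoring safety without a compile-time dependency from
pinot-common.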



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentAdminClient.java:
##########
@@ -36,6 +38,10 @@ public class PinotSegmentAdminClient {
   private final String _controllerAddress;
   private final Map<String, String> _headers;
 
+  private static String encodePathSegment(String segment) {
+    return URLEncoder.encode(segment, StandardCharsets.UTF_8);

Review Comment:
   The method name `encodePathSegment` suggests path-segment encoding (RFC
3986), but `URLEncoder.encode` performs form encoding
(application/x-www-form-urlencoded), which encodes spaces as `+` instead of
`%20`. For correct path-segment encoding, consider
`URLEncoder.encode(...).replace("+", "%20")`; this is safe because
`URLEncoder` already encodes a literal `+` as `%2B`, so every `+` left in the
output came from a space. Alternatively, use a URI builder utility that
handles path segments correctly.
   ```suggestion
       return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
   ```
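
   A minimal sketch of the suggested fix in a hypothetical standalone class
(`URLEncoder.encode(String, Charset)` requires Java 10 or later):

   ```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public final class PathSegmentEncoding {
  private PathSegmentEncoding() {
  }

  /**
   * Percent-encodes a single URL path segment.
   * URLEncoder produces application/x-www-form-urlencoded output, where a space
   * becomes '+'. A literal '+' is already encoded as "%2B", so every '+' left in
   * the output stands for a space and can be rewritten as "%20".
   */
  static String encodePathSegment(String segment) {
    return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
  }

  public static void main(String[] args) {
    // Spaces, plus signs, and slashes are all escaped, so the value stays one segment.
    System.out.println(encodePathSegment("my table+1")); // prints my%20table%2B1
    System.out.println(encodePathSegment("a/b"));        // prints a%2Fb
  }
}
   ```

`URLEncoder` leaves `*` unencoded and escapes `~` as `%7E`; both are
acceptable inside a path segment.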



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -43,27 +44,32 @@
 
 /**
  * {@code BrokerRequestHandlerDelegate} delegates the inbound broker request to one of the enabled
- * {@link BrokerRequestHandler} based on the requested handle type.
- *
- * {@see: @CommonConstant
+ * {@link BrokerRequestHandler} implementations based on the request type.
  */
 public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
+  private static final Pattern SYSTEM_TABLE_QUERY_PATTERN =
+      Pattern.compile("\\b(from|join)\\s+system\\.", Pattern.CASE_INSENSITIVE);

Review Comment:
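   Detecting system table queries with a regex on raw SQL text is fragile. It
can produce false positives (e.g., `from system.` inside a string literal or
comment) and false negatives (e.g., a comma-separated FROM list such as
`FROM t1, system.tables`, quoted identifiers, or whitespace around the dot).
Consider extracting the table names from the parsed
PinotQuery/SqlNodeAndOptions instead of pattern matching on raw SQL text for
more reliable detection. If a regex is kept, a tighter pattern such as the one
below narrows the match, but it still cannot tell a table reference from the
same text inside a literal or comment.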
   ```suggestion
         Pattern.compile("(?i)(?<!['\"-])\\b(from|join)\\s+system\\s*\\.\\s*[a-zA-Z_][a-zA-Z0-9_]*\\b");
   ```
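
   To make the fragility concrete, a small self-contained check of the pattern
as written in the diff; the class name is hypothetical and the sample queries
are illustrative:

   ```java
import java.util.regex.Pattern;

public final class SystemTableRegexPitfalls {
  // The pattern from the diff under review.
  static final Pattern SYSTEM_TABLE_QUERY_PATTERN =
      Pattern.compile("\\b(from|join)\\s+system\\.", Pattern.CASE_INSENSITIVE);

  private SystemTableRegexPitfalls() {
  }

  static boolean looksLikeSystemTableQuery(String sql) {
    return SYSTEM_TABLE_QUERY_PATTERN.matcher(sql).find();
  }

  public static void main(String[] args) {
    // True positive: a direct system table reference.
    System.out.println(looksLikeSystemTableQuery("SELECT * FROM system.tables")); // true
    // False positive: the text only appears inside a string literal.
    System.out.println(looksLikeSystemTableQuery("SELECT 'from system.tables' FROM myTable")); // true
    // False negative: the system table follows a comma, not FROM/JOIN.
    System.out.println(looksLikeSystemTableQuery("SELECT * FROM myTable, system.tables")); // false
  }
}
   ```

A parser-based check avoids both failure modes because it inspects the table
references in the query tree rather than the raw text.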



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,740 @@
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = "pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final long SIZE_FETCH_FAILURE_WARN_INTERVAL_MS = Duration.ofHours(1).toMillis();
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = "pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new ConcurrentHashMap<>();
+  private final AtomicLong _lastSizeFetchFailureWarnLogMs = new AtomicLong();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : _adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public IndexSegment getDataSource() {
+    if (_tableCache == null) {
+      return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap());
+    }
+
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+    class TableRow {
+      final String _tableNameWithType;
+      final TableType _tableType;
+      final String _rawTableName;
+      final @Nullable TableConfig _tableConfig;
+      private volatile @Nullable String _tableConfigJson;
+      private volatile @Nullable TableSize _tableSize;
+      private volatile boolean _tableSizeFetched;
+
+      private TableRow(String tableNameWithType, TableType tableType, String rawTableName,
+          @Nullable TableConfig tableConfig) {
+        _tableNameWithType = tableNameWithType;
+        _tableType = tableType;
+        _rawTableName = rawTableName;
+        _tableConfig = tableConfig;
+      }
+
+      @Nullable
+      private TableSize getTableSize() {
+        if (_tableSizeFetched) {
+          return _tableSize;
+        }
+        synchronized (this) {
+          if (_tableSizeFetched) {
+            return _tableSize;
+          }
+          _tableSize = fetchTableSize(_tableNameWithType, sizeFetcher, controllerBaseUrls);
+          _tableSizeFetched = true;
+          return _tableSize;
+        }
+      }
+
+      private String getStatus() {
+        if (_tableConfig != null) {
+          return "ONLINE";
+        }
+        TableSize sizeFromController = getTableSize();
+        int segments = sizeFromController != null ? getSegmentCount(sizeFromController, _tableType) : 0;
+        return segments > 0 ? "ONLINE" : "UNKNOWN";
+      }
+
+      private int getSegments() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? getSegmentCount(sizeFromController, _tableType) : 0;
+      }
+
+      private long getTotalDocs() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? TablesSystemTableProvider.this.getTotalDocs(sizeFromController, _tableType,
+            _tableNameWithType, controllerBaseUrls) : 0L;
+      }
+
+      private long getReportedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || sizeFromController._reportedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._reportedSizeInBytes;
+      }
+
+      private long getEstimatedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || sizeFromController._estimatedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._estimatedSizeInBytes;
+      }
+
+      private String getBrokerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getBroker();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private String getServerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getServer();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private int getReplicas() {
+        if (_tableConfig != null && _tableConfig.getValidationConfig() != null) {
+          Integer replicationNumber = _tableConfig.getValidationConfig().getReplicationNumber();
+          if (replicationNumber != null) {
+            return replicationNumber;
+          }
+        }
+        return 0;
+      }
+
+      private String getTableConfigJson() {
+        String cached = _tableConfigJson;
+        if (cached != null) {
+          return cached;
+        }
+        synchronized (this) {
+          cached = _tableConfigJson;
+          if (cached != null) {
+            return cached;
+          }
+          cached = "";
+          if (_tableConfig != null) {
+            try {
+              cached = JsonUtils.objectToString(_tableConfig);
+            } catch (Exception e) {
+              LOGGER.warn("Failed to serialize table config for {}: {}", _tableNameWithType, e.toString());
+              cached = _tableConfig.toString();
+            }
+          }
+          _tableConfigJson = cached;
+          return cached;
+        }
+      }
+    }
+
+    List<TableRow> tableRows = new ArrayList<>(sortedTableNames.size());
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+      tableRows.add(new TableRow(tableNameWithType, tableType, rawTableName, tableConfig));
+    }
+
+    Map<String, IntFunction<Object>> valueProviders = new java.util.HashMap<>();
+    valueProviders.put("tableName", docId -> tableRows.get(docId)._rawTableName);
+    valueProviders.put("type", docId -> tableRows.get(docId)._tableType.name());
+    valueProviders.put("status", docId -> tableRows.get(docId).getStatus());
+    valueProviders.put("segments", docId -> tableRows.get(docId).getSegments());
+    valueProviders.put("totalDocs", docId -> tableRows.get(docId).getTotalDocs());
+    valueProviders.put("reportedSize", docId -> tableRows.get(docId).getReportedSize());
+    valueProviders.put("estimatedSize", docId -> tableRows.get(docId).getEstimatedSize());
+    valueProviders.put("brokerTenant", docId -> tableRows.get(docId).getBrokerTenant());
+    valueProviders.put("serverTenant", docId -> tableRows.get(docId).getServerTenant());
+    valueProviders.put("replicas", docId -> tableRows.get(docId).getReplicas());
+    valueProviders.put("tableConfig", docId -> tableRows.get(docId).getTableConfigJson());
+
+    return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, tableRows.size(), valueProviders);
+  }
+
+  @Nullable
+  private TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString());
+      }
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers {} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  @Nullable
+  private TableSize fetchTableSizeForName(List<String> controllerBaseUrls, String tableName) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        Object sizeNode;
+        try {
+          sizeNode = clientClass.getMethod("getTableSize", String.class, boolean.class, boolean.class)
+              .invoke(adminClient, tableName, true, false);
+        } catch (NoSuchMethodException e) {
+          Object tableClient = clientClass.getMethod("getTableClient").invoke(adminClient);
+          sizeNode = tableClient.getClass().getMethod("getTableSize", String.class, boolean.class, boolean.class)
+              .invoke(tableClient, tableName, true, false);
+        }
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl,
+            parsed._offlineSegments != null && parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocsFromSize(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType, String tableNameWithType,
+      List<String> controllerBaseUrls) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      long cached = sizeFromController._offlineTotalDocs;
+      if (cached >= 0) {
+        return cached;
+      }
+      long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, tableType);
+      if (totalDocsFromSize > 0) {
+        synchronized (sizeFromController) {
+          if (sizeFromController._offlineTotalDocs < 0) {
+            sizeFromController._offlineTotalDocs = totalDocsFromSize;
+          }
+          return sizeFromController._offlineTotalDocs;
+        }
+      }
+      long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType, sizeFromController._offlineSegments._segments,
+          controllerBaseUrls);
+      synchronized (sizeFromController) {
+        if (sizeFromController._offlineTotalDocs < 0) {
+          sizeFromController._offlineTotalDocs = fetched;
+        }
+        return sizeFromController._offlineTotalDocs;
+      }
+    }
+    if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      long cached = sizeFromController._realtimeTotalDocs;
+      if (cached >= 0) {
+        return cached;
+      }
+      long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, tableType);
+      if (totalDocsFromSize > 0) {
+        synchronized (sizeFromController) {
+          if (sizeFromController._realtimeTotalDocs < 0) {
+            sizeFromController._realtimeTotalDocs = totalDocsFromSize;
+          }
+          return sizeFromController._realtimeTotalDocs;
+        }
+      }
+      long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType,
+          sizeFromController._realtimeSegments._segments, controllerBaseUrls);
+      synchronized (sizeFromController) {
+        if (sizeFromController._realtimeTotalDocs < 0) {
+          sizeFromController._realtimeTotalDocs = fetched;
+        }
+        return sizeFromController._realtimeTotalDocs;
+      }
+    }
+    return 0;
+  }
+
+  private long fetchTotalDocsFromSegmentMetadata(String tableNameWithType, Map<String, SegmentSize> segments,
+      List<String> controllerBaseUrls) {
+    if (segments.isEmpty()) {
+      return 0;
+    }
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return 0;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+        Object segmentMetadataFetcher;
+        java.lang.reflect.Method getSegmentMetadataMethod;
+        boolean returnsJsonNode;
+        try {
+          segmentMetadataFetcher = clientClass.getMethod("getSegmentApiClient").invoke(adminClient);
+          getSegmentMetadataMethod =
+              segmentMetadataFetcher.getClass().getMethod("getSegmentMetadata", String.class, String.class);
+          returnsJsonNode = true;
+        } catch (NoSuchMethodException e) {
+          segmentMetadataFetcher = clientClass.getMethod("getSegmentClient").invoke(adminClient);
+          getSegmentMetadataMethod = segmentMetadataFetcher.getClass()
+              .getMethod("getSegmentMetadata", String.class, String.class, List.class);
+          returnsJsonNode = false;
+        }
+
+        long totalDocs = 0;
+        if (returnsJsonNode) {
+          for (String segmentName : segments.keySet()) {
+            JsonNode segmentMetadata = (JsonNode) getSegmentMetadataMethod.invoke(segmentMetadataFetcher,
+                tableNameWithType, segmentName);
+            totalDocs += segmentMetadata.path(CommonConstants.Segment.TOTAL_DOCS).asLong(0);
+          }
+        } else {
+          for (String segmentName : segments.keySet()) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> segmentMetadata = (Map<String, Object>) getSegmentMetadataMethod
+                .invoke(segmentMetadataFetcher, tableNameWithType, segmentName, List.of());
+            Object totalDocsObj = segmentMetadata.get(CommonConstants.Segment.TOTAL_DOCS);
+            if (totalDocsObj != null) {
+              totalDocs += Long.parseLong(totalDocsObj.toString());
+            }
+          }
+        }
+        return totalDocs;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching segment metadata for {} via {}", TABLE_NAME, tableNameWithType, baseUrl,
+            e);
+      }
+    }
+    return 0;
+  }
+
+  @Nullable
+  private Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    return null;
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : _helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);

Review Comment:
   The controller instance ID parsing logic assumes a specific format
(`Controller_<host>_<port>`) and fails silently for IDs that deviate from it.
Consider using a regex pattern with capture groups for more robust parsing and
clearer validation (for example, checking that the port is numeric), or
extracting this logic into a dedicated utility method with proper error
handling.
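
   A minimal sketch of the regex-based parsing, assuming the
`Controller_<host>_<port>` format described above; `ControllerInstanceIds`,
`HostPort`, and the port-range check are hypothetical additions, not existing
Pinot APIs. Returning an empty `Optional` lets the caller log unparseable IDs
instead of skipping them silently:

   ```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class ControllerInstanceIds {
  /** Host and port parsed from a controller instance ID. */
  static final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
      this.host = host;
      this.port = port;
    }

    @Override
    public String toString() {
      return host + ":" + port;
    }
  }

  // Assumed format: Controller_<host>_<port>. The greedy host group allows
  // underscores inside the host; the port group accepts only 1 to 5 digits.
  private static final Pattern CONTROLLER_ID = Pattern.compile("Controller_(.+)_(\\d{1,5})");

  private ControllerInstanceIds() {
  }

  /** Parses the ID, or returns empty if it does not follow the assumed format. */
  static Optional<HostPort> parse(String instanceId) {
    Matcher matcher = CONTROLLER_ID.matcher(instanceId);
    if (!matcher.matches()) { // matches() requires the whole ID to fit the pattern
      return Optional.empty();
    }
    int port = Integer.parseInt(matcher.group(2));
    if (port < 1 || port > 65535) {
      return Optional.empty();
    }
    return Optional.of(new HostPort(matcher.group(1), port));
  }

  public static void main(String[] args) {
    String[] ids = {"Controller_host1.example.com_9000", "Controller_my_host_9000", "Controller_host_http"};
    for (String id : ids) {
      Optional<HostPort> parsed = parse(id);
      if (parsed.isPresent()) {
        System.out.println(id + " -> " + parsed.get());
      } else {
        // The caller decides how loudly to report this, instead of skipping silently.
        System.out.println(id + " -> unparseable");
      }
    }
  }
}
   ```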



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
