Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2651350768


##########
pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.client.admin.PinotAdminTransport;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.systemtable.SystemTable;
+import org.apache.pinot.common.systemtable.SystemTableProvider;
+import org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker {@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = "pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final long SIZE_FETCH_FAILURE_WARN_INTERVAL_MS = Duration.ofHours(1).toMillis();
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride;
+  private final List<String> _configuredControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, PinotAdminClient> _adminClientCache = new ConcurrentHashMap<>();
+  private final AtomicLong _lastSizeFetchFailureWarnLogMs = new AtomicLong();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _configuredControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, PinotAdminClient> entry : _adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public IndexSegment getDataSource() {
+    if (_tableCache == null) {
+      return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap());
+    }
+
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+    class TableRow {
+      final String _tableNameWithType;
+      final TableType _tableType;
+      final String _rawTableName;
+      final @Nullable TableConfig _tableConfig;
+      private volatile @Nullable String _tableConfigJson;
+      private volatile @Nullable TableSize _tableSize;
+      private volatile boolean _tableSizeFetched;
+
+      private TableRow(String tableNameWithType, TableType tableType, String rawTableName,
+          @Nullable TableConfig tableConfig) {
+        _tableNameWithType = tableNameWithType;
+        _tableType = tableType;
+        _rawTableName = rawTableName;
+        _tableConfig = tableConfig;
+      }
+
+      @Nullable
+      private TableSize getTableSize() {
+        if (_tableSizeFetched) {
+          return _tableSize;
+        }
+        synchronized (this) {
+          if (_tableSizeFetched) {
+            return _tableSize;
+          }
+          _tableSize = fetchTableSize(_tableNameWithType, sizeFetcher, controllerBaseUrls);
+          _tableSizeFetched = true;
+          return _tableSize;
+        }
+      }
+
+      private String getStatus() {
+        if (_tableConfig != null) {
+          return "ONLINE";
+        }
+        TableSize sizeFromController = getTableSize();
+        int segments = sizeFromController != null ? getSegmentCount(sizeFromController, _tableType) : 0;
+        return segments > 0 ? "ONLINE" : "UNKNOWN";
+      }
+
+      private int getSegments() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? getSegmentCount(sizeFromController, _tableType) : 0;
+      }
+
+      private long getTotalDocs() {
+        TableSize sizeFromController = getTableSize();
+        return sizeFromController != null ? TablesSystemTableProvider.this.getTotalDocs(sizeFromController, _tableType,
+            _tableNameWithType, controllerBaseUrls) : 0L;
+      }
+
+      private long getReportedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || sizeFromController._reportedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._reportedSizeInBytes;
+      }
+
+      private long getEstimatedSize() {
+        TableSize sizeFromController = getTableSize();
+        if (sizeFromController == null || sizeFromController._estimatedSizeInBytes < 0) {
+          return 0L;
+        }
+        return sizeFromController._estimatedSizeInBytes;
+      }
+
+      private String getBrokerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getBroker();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private String getServerTenant() {
+        if (_tableConfig != null && _tableConfig.getTenantConfig() != null) {
+          String tenant = _tableConfig.getTenantConfig().getServer();
+          return tenant != null ? tenant : "";
+        }
+        return "";
+      }
+
+      private int getReplicas() {
+        if (_tableConfig != null && _tableConfig.getValidationConfig() != null) {
+          Integer replicationNumber = _tableConfig.getValidationConfig().getReplicationNumber();
+          if (replicationNumber != null) {
+            return replicationNumber;
+          }
+        }
+        return 0;
+      }
+
+      private String getTableConfigJson() {
+        String cached = _tableConfigJson;
+        if (cached != null) {
+          return cached;
+        }
+        synchronized (this) {
+          cached = _tableConfigJson;
+          if (cached != null) {
+            return cached;
+          }
+          cached = "";
+          if (_tableConfig != null) {
+            try {
+              cached = JsonUtils.objectToString(_tableConfig);
+            } catch (Exception e) {
+              LOGGER.warn("Failed to serialize table config for {}: {}", _tableNameWithType, e.toString());
+              cached = _tableConfig.toString();
+            }
+          }
+          _tableConfigJson = cached;
+          return cached;
+        }
+      }
+    }
+
+    List<TableRow> tableRows = new ArrayList<>(sortedTableNames.size());
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+      tableRows.add(new TableRow(tableNameWithType, tableType, rawTableName, tableConfig));
+    }
+
+    Map<String, IntFunction<Object>> valueProviders = new java.util.HashMap<>();
+    valueProviders.put("tableName", docId -> tableRows.get(docId)._rawTableName);
+    valueProviders.put("type", docId -> tableRows.get(docId)._tableType.name());
+    valueProviders.put("status", docId -> tableRows.get(docId).getStatus());
+    valueProviders.put("segments", docId -> tableRows.get(docId).getSegments());
+    valueProviders.put("totalDocs", docId -> tableRows.get(docId).getTotalDocs());
+    valueProviders.put("reportedSize", docId -> tableRows.get(docId).getReportedSize());
+    valueProviders.put("estimatedSize", docId -> tableRows.get(docId).getEstimatedSize());
+    valueProviders.put("brokerTenant", docId -> tableRows.get(docId).getBrokerTenant());
+    valueProviders.put("serverTenant", docId -> tableRows.get(docId).getServerTenant());
+    valueProviders.put("replicas", docId -> tableRows.get(docId).getReplicas());
+    valueProviders.put("tableConfig", docId -> tableRows.get(docId).getTableConfigJson());
+
+    return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, tableRows.size(), valueProviders);
+  }
+
+  @Nullable
+  private TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers 
{} "
+                + "(tried raw table name '{}' and table name with type '{}')",
+            TABLE_NAME, tableNameWithType, controllerBaseUrls, rawTableName, 
tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  @Nullable
+  private TableSize fetchTableSizeForName(List<String> controllerBaseUrls, String tableName) {
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        PinotAdminClient adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        JsonNode sizeNode = adminClient.getTableSize(tableName, true, false);
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl,
+            parsed._offlineSegments != null && parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _configuredControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocsFromSize(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();

Review Comment:
   The stream operation iterates over all segment sizes to compute the sum. For 
tables with thousands of segments, this could be inefficient. Consider caching 
the computed sum or computing it once during the controller API response 
deserialization.
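
   For example, the sum could be memoized on the deserialized sub-type holder so repeated row accessors stop re-streaming the segment map. A minimal sketch, using the Jackson annotations this file already imports (`SubTypeSize`/`SegmentSize` are stand-ins for the PR's actual inner classes):

```java
@JsonIgnoreProperties(ignoreUnknown = true)
static class SegmentSize {
  @JsonProperty("totalDocs")
  long _totalDocs;
}

@JsonIgnoreProperties(ignoreUnknown = true)
static class SubTypeSize {
  @JsonProperty("segments")
  Map<String, SegmentSize> _segments;

  // Computed at most once per deserialized response; -1 marks "not yet computed".
  private volatile long _cachedTotalDocs = -1;

  long totalDocs() {
    long cached = _cachedTotalDocs;
    if (cached < 0) {
      cached = _segments == null ? 0L
          : _segments.values().stream().mapToLong(s -> s._totalDocs).sum();
      _cachedTotalDocs = cached; // benign race: concurrent callers compute the same value
    }
    return cached;
  }
}
```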



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/SystemTableRegistry.java:
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Registry to hold and lifecycle-manage system table data providers.
+ */
+public final class SystemTableRegistry {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableRegistry.class);
+
+  // Providers are registered once at broker startup.
+  private static final Map<String, SystemTableProvider> PROVIDERS = new HashMap<>();
+
+  private SystemTableRegistry() {
+    throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
+  }
+
+  public static void register(SystemTableProvider provider) {
+    synchronized (PROVIDERS) {
+      PROVIDERS.put(normalize(provider.getTableName()), provider);
+    }
+  }
+
+  @Nullable
+  public static SystemTableProvider get(String tableName) {
+    synchronized (PROVIDERS) {
+      return PROVIDERS.get(normalize(tableName));
+    }
+  }
+
+  public static boolean isRegistered(String tableName) {
+    synchronized (PROVIDERS) {
+      return PROVIDERS.containsKey(normalize(tableName));
+    }
+  }
+
+  public static Collection<SystemTableProvider> getProviders() {
+    synchronized (PROVIDERS) {
+      return Collections.unmodifiableCollection(new java.util.ArrayList<>(PROVIDERS.values()));
+    }
+  }
+
+  /**
+   * Discover and register providers marked with {@link SystemTable} using the available dependencies.
+   * Follows the ScalarFunction pattern: any class annotated with @SystemTable under a "*.systemtable.*" package
+   * will be discovered via reflection and registered.
+   */
+  public static void registerAnnotatedProviders(TableCache tableCache, HelixAdmin helixAdmin, String clusterName,
+      @Nullable PinotConfiguration config) {
+    Set<Class<?>> classes =
+        PinotReflectionUtils.getClassesThroughReflection(".*\\.systemtable\\..*", SystemTable.class);
+    for (Class<?> clazz : classes) {
+      if (!SystemTableProvider.class.isAssignableFrom(clazz)) {
+        continue;
+      }
+      SystemTableProvider provider =
+          instantiateProvider(clazz.asSubclass(SystemTableProvider.class), tableCache, helixAdmin, clusterName, config);
+      if (provider == null) {
+        continue;
+      }
+      if (isRegistered(provider.getTableName())) {
+        continue;
+      }
+      LOGGER.info("Registering system table provider: {}", provider.getTableName());
+      register(provider);
+    }
+  }
+
+  public static void close()
+      throws Exception {
+    Exception firstException = null;
+    // Snapshot providers to avoid concurrent modifications and to close each provider at most once.
+    Map<SystemTableProvider, Boolean> providersToClose = new IdentityHashMap<>();
+    synchronized (PROVIDERS) {
+      for (SystemTableProvider provider : PROVIDERS.values()) {
+        providersToClose.put(provider, Boolean.TRUE);
+      }
+    }
+    try {
+      for (SystemTableProvider provider : providersToClose.keySet()) {
+        try {
+          provider.close();
+        } catch (Exception e) {
+          if (firstException == null) {
+            firstException = e;
+          } else {
+            firstException.addSuppressed(e);
+          }
+        }
+      }
+    } finally {
+      synchronized (PROVIDERS) {
+        PROVIDERS.clear();
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  /**
+   * Initialize and register all annotated system table providers.
+   */
+  public static void init(TableCache tableCache, HelixAdmin helixAdmin, String clusterName) {
+    init(tableCache, helixAdmin, clusterName, null);
+  }
+
+  /**
+   * Initialize and register all annotated system table providers.
+   */
+  public static void init(TableCache tableCache, HelixAdmin helixAdmin, String clusterName,
+      @Nullable PinotConfiguration config) {
+    registerAnnotatedProviders(tableCache, helixAdmin, clusterName, config);
+  }
+
+  private static String normalize(String tableName) {
+    return tableName.toLowerCase(Locale.ROOT);
+  }
+
+  @Nullable
+  private static SystemTableProvider instantiateProvider(Class<? extends SystemTableProvider> clazz,
+      TableCache tableCache, HelixAdmin helixAdmin, String clusterName, @Nullable PinotConfiguration config) {
+    try {
+      // Prefer the most specific constructor available.
+      // System table providers may declare any of the following constructors, in decreasing order of specificity:
+      //   (TableCache, HelixAdmin, String, PinotConfiguration), (TableCache, HelixAdmin, String),
+      //   (TableCache, HelixAdmin), (TableCache), (PinotConfiguration), or a no-arg constructor.
+      // The registry will select the most specific available constructor so providers can opt in to the dependencies
+      // they need without forcing all implementations to depend on Helix or cluster metadata.
+      if (config != null) {
+        try {
+          return clazz.getConstructor(TableCache.class, HelixAdmin.class, String.class, PinotConfiguration.class)
+              .newInstance(tableCache, helixAdmin, clusterName, config);
+        } catch (NoSuchMethodException ignored) {
+          // fall through
+        }
+      }

Review Comment:
   The constructor resolution logic contains deeply nested try-catch blocks, 
making it difficult to follow. Consider extracting the constructor lookup logic 
into a separate method or using a more declarative approach with an ordered 
list of constructor signatures to try.
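
   A sketch of that more declarative shape (illustrative only, reusing `org.apache.commons.lang3.tuple.Pair`; the candidate list mirrors the constructor set documented in the PR's own comment):

```java
@Nullable
private static SystemTableProvider instantiateProvider(Class<? extends SystemTableProvider> clazz,
    TableCache tableCache, HelixAdmin helixAdmin, String clusterName, @Nullable PinotConfiguration config) {
  // Ordered from most to least specific; the first constructor the class declares wins.
  List<Pair<Class<?>[], Object[]>> candidates = new ArrayList<>();
  if (config != null) {
    candidates.add(Pair.of(new Class<?>[]{TableCache.class, HelixAdmin.class, String.class, PinotConfiguration.class},
        new Object[]{tableCache, helixAdmin, clusterName, config}));
  }
  candidates.add(Pair.of(new Class<?>[]{TableCache.class, HelixAdmin.class, String.class},
      new Object[]{tableCache, helixAdmin, clusterName}));
  candidates.add(Pair.of(new Class<?>[]{TableCache.class, HelixAdmin.class}, new Object[]{tableCache, helixAdmin}));
  candidates.add(Pair.of(new Class<?>[]{TableCache.class}, new Object[]{tableCache}));
  if (config != null) {
    candidates.add(Pair.of(new Class<?>[]{PinotConfiguration.class}, new Object[]{config}));
  }
  candidates.add(Pair.of(new Class<?>[0], new Object[0]));
  for (Pair<Class<?>[], Object[]> candidate : candidates) {
    try {
      return clazz.getConstructor(candidate.getLeft()).newInstance(candidate.getRight());
    } catch (NoSuchMethodException ignored) {
      // Fall through to the next, less specific signature.
    } catch (ReflectiveOperationException e) {
      LOGGER.warn("Failed to instantiate system table provider {}", clazz.getName(), e);
      return null;
    }
  }
  return null;
}
```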



##########
pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/common/systemtable/datasource/InMemorySystemTableSegment.java:
##########
@@ -0,0 +1,724 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.datasource;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
+import org.apache.pinot.segment.spi.index.multicolumntext.MultiColumnTextMetadata;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
+import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+import org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+
+/**
+ * In-memory {@link IndexSegment} implementation intended for system table queries.
+ * <p>
+ * This segment is backed by per-column value functions (docId -&gt; value) and exposes raw forward indexes (no
+ * dictionaries/inverted indexes) so that the standard v1 query engine can operate on it.
+ */
+public final class InMemorySystemTableSegment implements IndexSegment {
+  private final String _segmentName;
+  private final Schema _schema;
+  private final int _numDocs;
+  private final Map<String, IntFunction<Object>> _valueProvidersByColumn;
+  private final Map<String, DataSource> _dataSourcesByColumn;
+  private final SegmentMetadata _segmentMetadata;
+
+  public InMemorySystemTableSegment(String segmentName, Schema schema, int numDocs,
+      Map<String, IntFunction<Object>> valueProvidersByColumn) {
+    _segmentName = segmentName;
+    _schema = schema;
+    _numDocs = numDocs;
+    _valueProvidersByColumn = new HashMap<>(valueProvidersByColumn);
+    _dataSourcesByColumn = new HashMap<>(schema.getColumnNames().size());
+    for (String column : schema.getColumnNames()) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      if (fieldSpec == null) {
+        continue;
+      }
+      IntFunction<Object> provider = _valueProvidersByColumn.get(column);
+      if (provider == null) {
+        provider = docId -> defaultValue(fieldSpec.getDataType());
+        _valueProvidersByColumn.put(column, provider);
+      }
+      _dataSourcesByColumn.put(column, new FunctionBasedDataSource(fieldSpec, numDocs, provider));
+    }
+    _segmentMetadata = new InMemorySegmentMetadata(segmentName, schema, numDocs);
+  }
+
+  @Override
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  @Override
+  public SegmentMetadata getSegmentMetadata() {
+    return _segmentMetadata;
+  }
+
+  @Override
+  public Set<String> getColumnNames() {
+    return _schema.getColumnNames();
+  }
+
+  @Override
+  public Set<String> getPhysicalColumnNames() {
+    return _schema.getPhysicalColumnNames();
+  }
+
+  @Nullable
+  @Override
+  public DataSource getDataSourceNullable(String column) {
+    return _dataSourcesByColumn.get(column);
+  }
+
+  @Override
+  public DataSource getDataSource(String column, Schema schema) {
+    DataSource dataSource = getDataSourceNullable(column);
+    if (dataSource != null) {
+      return dataSource;
+    }
+    throw new IllegalStateException("Failed to find data source for column: " + column);
+  }
+
+  @Nullable
+  @Override
+  public List<StarTreeV2> getStarTrees() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public TextIndexReader getMultiColumnTextIndex() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+    return null;
+  }
+
+  @Override
+  public GenericRow getRecord(int docId, GenericRow reuse) {
+    GenericRow row = reuse != null ? reuse : new GenericRow();
+    row.getFieldToValueMap().clear();
+    for (String column : _schema.getColumnNames()) {
+      row.putValue(column, getValue(docId, column));
+    }
+    return row;
+  }
+
+  @Override
+  public Object getValue(int docId, String column) {
+    IntFunction<Object> provider = _valueProvidersByColumn.get(column);
+    if (provider == null) {
+      return null;
+    }
+    return provider.apply(docId);
+  }
+
+  @Override
+  public void offload() {
+  }
+
+  @Override
+  public void destroy() {
+  }
+
+  private static Object defaultValue(FieldSpec.DataType dataType) {
+    switch (dataType) {
+      case INT:
+        return 0;
+      case LONG:
+        return 0L;
+      case FLOAT:
+        return 0.0f;
+      case DOUBLE:
+        return 0.0d;
+      case BIG_DECIMAL:
+        return BigDecimal.ZERO;
+      case BOOLEAN:
+        return false;
+      case STRING:
+        return "";
+      case BYTES:
+        return new byte[0];
+      default:
+        return null;
+    }
+  }
+
+  private static final class InMemorySegmentMetadata implements SegmentMetadata {
+    private final String _segmentName;
+    private final Schema _schema;
+    private final int _totalDocs;
+    private final TreeMap<String, ColumnMetadata> _columnMetadataMap;
+
+    private InMemorySegmentMetadata(String segmentName, Schema schema, int totalDocs) {
+      _segmentName = segmentName;
+      _schema = schema;
+      _totalDocs = totalDocs;
+      _columnMetadataMap = new TreeMap<>();
+      for (String column : schema.getColumnNames()) {
+        FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+        if (fieldSpec != null) {
+          _columnMetadataMap.put(column, new InMemoryColumnMetadata(fieldSpec, totalDocs));
+        }
+      }
+    }
+
+    @Deprecated
+    @Override
+    public String getTableName() {
+      return _schema.getSchemaName();
+    }
+
+    @Override
+    public String getName() {
+      return _segmentName;
+    }
+
+    @Override
+    public String getTimeColumn() {
+      return "";
+    }
+
+    @Override
+    public long getStartTime() {
+      return 0;
+    }
+
+    @Override
+    public long getEndTime() {
+      return 0;
+    }
+
+    @Override
+    public TimeUnit getTimeUnit() {
+      return TimeUnit.MILLISECONDS;
+    }
+
+    @Override
+    public Duration getTimeGranularity() {
+      return Duration.ZERO;
+    }
+
+    @Override
+    public Interval getTimeInterval() {
+      return new Interval(0L, 0L);
+    }
+
+    @Override
+    public String getCrc() {
+      return "";
+    }
+
+    @Override
+    public String getDataCrc() {
+      return "";
+    }
+
+    @Override
+    public SegmentVersion getVersion() {
+      return SegmentVersion.v3;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return _schema;
+    }
+
+    @Override
+    public int getTotalDocs() {
+      return _totalDocs;
+    }
+
+    @Override
+    public File getIndexDir() {
+      return new File("");
+    }
+
+    @Nullable
+    @Override
+    public String getCreatorName() {
+      return "systemtable";
+    }
+
+    @Override
+    public long getIndexCreationTime() {
+      return 0;
+    }
+
+    @Override
+    public long getLastIndexedTimestamp() {
+      return 0;
+    }
+
+    @Override
+    public long getLatestIngestionTimestamp() {
+      return Long.MIN_VALUE;
+    }
+
+    @Nullable
+    @Override
+    public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public MultiColumnTextMetadata getMultiColumnTextMetadata() {
+      return null;
+    }
+
+    @Override
+    public Map<String, String> getCustomMap() {
+      return Collections.emptyMap();
+    }
+
+    @Override
+    public String getStartOffset() {
+      return "";
+    }
+
+    @Override
+    public String getEndOffset() {
+      return "";
+    }
+
+    @Override
+    public TreeMap<String, ColumnMetadata> getColumnMetadataMap() {
+      return _columnMetadataMap;
+    }
+
+    @Override
+    public void removeColumn(String column) {
+      _columnMetadataMap.remove(column);
+    }
+
+    @Override
+    public JsonNode toJson(@Nullable Set<String> columnFilter) {
+      return JsonNodeFactory.instance.objectNode();
+    }
+  }
+
+  private static final class InMemoryColumnMetadata implements ColumnMetadata {
+    private final FieldSpec _fieldSpec;
+    private final int _totalDocs;
+
+    private InMemoryColumnMetadata(FieldSpec fieldSpec, int totalDocs) {
+      _fieldSpec = fieldSpec;
+      _totalDocs = totalDocs;
+    }
+
+    @Override
+    public FieldSpec getFieldSpec() {
+      return _fieldSpec;
+    }
+
+    @Override
+    public int getTotalDocs() {
+      return _totalDocs;
+    }
+
+    @Override
+    public int getCardinality() {
+      return Constants.UNKNOWN_CARDINALITY;
+    }
+
+    @Override
+    public boolean isSorted() {
+      return false;
+    }
+
+    @Override
+    public Comparable getMinValue() {
+      return defaultComparableValue(_fieldSpec.getDataType());
+    }
+
+    @Override
+    public Comparable getMaxValue() {
+      return defaultComparableValue(_fieldSpec.getDataType());
+    }
+
+    @Override
+    public boolean hasDictionary() {
+      return false;
+    }
+
+    @Override
+    public int getColumnMaxLength() {
+      return 0;
+    }
+
+    @Override
+    public int getBitsPerElement() {
+      return 0;
+    }
+
+    @Override
+    public int getMaxNumberOfMultiValues() {
+      return 0;
+    }
+
+    @Override
+    public int getTotalNumberOfEntries() {
+      return _totalDocs;
+    }
+
+    @Nullable
+    @Override
+    public PartitionFunction getPartitionFunction() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public Set<Integer> getPartitions() {
+      return null;
+    }
+
+    @Override
+    public boolean isAutoGenerated() {
+      return false;
+    }
+
+    @Override
+    public Map<IndexType<?, ?, ?>, Long> getIndexSizeMap() {
+      return Collections.emptyMap();
+    }
+
+    private static Comparable defaultComparableValue(FieldSpec.DataType dataType) {
+      switch (dataType) {
+        case INT:
+          return 0;
+        case LONG:
+          return 0L;
+        case FLOAT:
+          return 0.0f;
+        case DOUBLE:
+          return 0.0d;
+        case BIG_DECIMAL:
+          return BigDecimal.ZERO;
+        case BOOLEAN:
+          return false;
+        case STRING:
+          return "";
+        default:
+          return 0;
+      }
+    }
+  }
+
+  private static final class FunctionBasedDataSource implements DataSource {
+    private final DataSourceMetadata _metadata;
+    private final ColumnIndexContainer _indexContainer;
+    private final ForwardIndexReader<?> _forwardIndex;
+
+    private FunctionBasedDataSource(FieldSpec fieldSpec, int numDocs, IntFunction<Object> valueProvider) {
+      _metadata = new FunctionBasedDataSourceMetadata(fieldSpec, numDocs);
+      _indexContainer = ColumnIndexContainer.Empty.INSTANCE;
+      _forwardIndex = new FunctionBasedForwardIndexReader(fieldSpec.getDataType(), valueProvider);
+    }
+
+    @Override
+    public DataSourceMetadata getDataSourceMetadata() {
+      return _metadata;
+    }
+
+    @Override
+    public ColumnIndexContainer getIndexContainer() {
+      return _indexContainer;
+    }
+
+    @Override
+    public <R extends IndexReader> R getIndex(IndexType<?, R, ?> type) {
+      if (type == null) {
+        return null;
+      }
+      if (StandardIndexes.FORWARD_ID.equals(type.getId())) {
+        return (R) _forwardIndex;
+      }
+      return null;
+    }
+
+    @Override
+    public ForwardIndexReader<?> getForwardIndex() {
+      return _forwardIndex;
+    }
+
+    @Nullable
+    @Override
+    public Dictionary getDictionary() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public InvertedIndexReader<?> getInvertedIndex() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public RangeIndexReader<?> getRangeIndex() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public TextIndexReader getTextIndex() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public TextIndexReader getFSTIndex() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public TextIndexReader getIFSTIndex() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public JsonIndexReader getJsonIndex() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public H3IndexReader getH3Index() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public BloomFilterReader getBloomFilter() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public NullValueVectorReader getNullValueVector() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public VectorIndexReader getVectorIndex() {
+      return null;
+    }
+  }
+
+  private static final class FunctionBasedDataSourceMetadata implements DataSourceMetadata {
+    private final FieldSpec _fieldSpec;
+    private final int _numDocs;
+
+    private FunctionBasedDataSourceMetadata(FieldSpec fieldSpec, int numDocs) {
+      _fieldSpec = fieldSpec;
+      _numDocs = numDocs;
+    }
+
+    @Override
+    public FieldSpec getFieldSpec() {
+      return _fieldSpec;
+    }
+
+    @Override
+    public boolean isSorted() {
+      return false;
+    }
+
+    @Override
+    public int getNumDocs() {
+      return _numDocs;
+    }
+
+    @Override
+    public int getNumValues() {
+      return _numDocs;
+    }
+
+    @Override
+    public int getMaxNumValuesPerMVEntry() {
+      return -1;
+    }
+
+    @Nullable
+    @Override
+    public Comparable getMinValue() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public Comparable getMaxValue() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public PartitionFunction getPartitionFunction() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public Set<Integer> getPartitions() {
+      return null;
+    }
+
+    @Override
+    public int getCardinality() {
+      return -1;
+    }
+  }
+
+  private static final class FunctionBasedForwardIndexReader implements ForwardIndexReader<ForwardIndexReaderContext> {
+    private final FieldSpec.DataType _storedType;
+    private final IntFunction<Object> _valueProvider;
+
+    private FunctionBasedForwardIndexReader(FieldSpec.DataType storedType, IntFunction<Object> valueProvider) {
+      _storedType = storedType;
+      _valueProvider = valueProvider;
+    }
+
+    @Override
+    public boolean isDictionaryEncoded() {
+      return false;
+    }
+
+    @Override
+    public boolean isSingleValue() {
+      return true;
+    }
+
+    @Override
+    public FieldSpec.DataType getStoredType() {
+      return _storedType;
+    }
+
+    @Override
+    public int getInt(int docId, ForwardIndexReaderContext context) {
+      Object value = _valueProvider.apply(docId);
+      return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(String.valueOf(value));

Review Comment:
   The type conversion pattern with `String.valueOf()` fallback is repeated 
across multiple methods (getInt, getLong, getFloat, getDouble). Extract this 
into a reusable helper method to reduce duplication.
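
   For instance, a single generic helper could centralize the fallback (a sketch; `toNumber` is a hypothetical name and needs a `java.util.function.Function` import):

```java
// Shared fallback: cast when the provider already returned a Number, otherwise parse its string form.
private static <T extends Number> T toNumber(Object value, Function<Number, T> cast, Function<String, T> parse) {
  return value instanceof Number ? cast.apply((Number) value) : parse.apply(String.valueOf(value));
}

@Override
public int getInt(int docId, ForwardIndexReaderContext context) {
  return toNumber(_valueProvider.apply(docId), Number::intValue, Integer::parseInt);
}

@Override
public long getLong(int docId, ForwardIndexReaderContext context) {
  return toNumber(_valueProvider.apply(docId), Number::longValue, Long::parseLong);
}
```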



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java:
##########
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpPost;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.util.Timeout;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.querylog.QueryLogger;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV4;
+import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableProvider;
+import org.apache.pinot.common.systemtable.SystemTableRegistry;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.auth.AuthorizationResult;
+import org.apache.pinot.spi.auth.broker.RequesterIdentity;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broker request handler for system tables (handled entirely on the broker).
+ */
+public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class);
+  private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost";
+  private static final int SYSTEM_TABLE_PSEUDO_PORT = 0;
+  private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable";
+
+  private final BrokerReduceService _brokerReduceService;
+  private final PlanMaker _planMaker;
+  private final ExecutorService _executorService;
+  private final ExecutorService _scatterGatherExecutorService;
+  private final PoolingHttpClientConnectionManager _scatterGatherConnMgr;
+  private final CloseableHttpClient _scatterGatherHttpClient;
+  @Nullable
+  private final HelixManager _helixManager;
+
+  public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId,
+      BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      ThreadAccountant threadAccountant, @Nullable HelixManager helixManager) {
+    super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
+        threadAccountant);
+    _brokerReduceService = new BrokerReduceService(_config);
+    _planMaker = new InstancePlanMakerImplV2();
+    _planMaker.init(_config);
+    _helixManager = helixManager;
+    int executorPoolSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_SYSTEM_TABLE_EXECUTOR_POOL_SIZE,
+        CommonConstants.Broker.DEFAULT_SYSTEM_TABLE_EXECUTOR_POOL_SIZE);
+    executorPoolSize = Math.max(1, executorPoolSize);
+    _executorService = QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize,
+        new NamedThreadFactory("system-table-query-executor")));
+    _scatterGatherExecutorService =
+        QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize,
+            new NamedThreadFactory("system-table-scatter-gather-executor")));
+    _scatterGatherConnMgr = PoolingHttpClientConnectionManagerHelper.createWithSocketFactory();
+    Timeout timeout = Timeout.of(_brokerTimeoutMs, TimeUnit.MILLISECONDS);
+    RequestConfig defaultRequestConfig =
+        RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build();
+    _scatterGatherHttpClient =
+        HttpClients.custom().setConnectionManager(_scatterGatherConnMgr).setDefaultRequestConfig(defaultRequestConfig)
+            .build();
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutDown() {
+    _executorService.shutdownNow();
+    _scatterGatherExecutorService.shutdownNow();
+    try {
+      _scatterGatherHttpClient.close();
+    } catch (Exception e) {
+      LOGGER.debug("Failed to close system table scatter-gather http client: 
{}", e.toString());
+    }
+    try {
+      _scatterGatherConnMgr.close();
+    } catch (Exception e) {
+      LOGGER.debug("Failed to close system table scatter-gather connection 
manager: {}", e.toString());
+    }
+    _brokerReduceService.shutDown();
+  }
+
+  public boolean canHandle(String tableName) {
+    return isSystemTable(tableName) && SystemTableRegistry.isRegistered(tableName);
+  }
+
+  @Override
+  protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+      throws Exception {
+    long startTimeMs = requestContext.getRequestArrivalTimeMillis();
+    long deadlineMs = startTimeMs + _brokerTimeoutMs;
+    QueryExecutionContext executionContext =
+        new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId),
+            QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs,
+            _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH);
+    try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) {
+      PinotQuery pinotQuery;
+      try {
+        pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+      } catch (Exception e) {
+        requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+        return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage());
+      }
+
+      Set<String> tableNames = RequestUtils.getTableNames(pinotQuery);
+      if (tableNames == null || tableNames.isEmpty()) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name");
+      }
+      if (tableNames.size() != 1) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins");
+      }
+      String tableName = tableNames.iterator().next();
+      if (!isSystemTable(tableName)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not 
a system table query");
+      }
+      AuthorizationResult authorizationResult =
+          hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders);
+      if (!authorizationResult.hasAccess()) {
+        requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+        return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage());
+      }
+
+      return handleSystemTableQuery(request, pinotQuery, tableName, requestContext, requesterIdentity, query,
+          httpHeaders);
+    }
+  }
+
+  @Override
+  protected boolean handleCancel(long queryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) {
+    return false;
+  }
+
+  @Override
+  public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
+      throws Exception {
+    return false;
+  }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public OptionalLong getRequestIdByClientId(String clientQueryId) {
+    return OptionalLong.empty();
+  }
+
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  /**
+   * Executes a system table query against the local broker and returns the raw {@link DataTable} results.
+   * <p>
+   * This method is used by the internal broker-to-broker scatter-gather endpoint and must never perform fanout.
+   */
+  public DataTable handleSystemTableDataTableRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
+      RequestContext requestContext, @Nullable HttpHeaders httpHeaders) {
+    long startTimeMs = requestContext.getRequestArrivalTimeMillis();
+    if (startTimeMs <= 0) {
+      startTimeMs = System.currentTimeMillis();
+      requestContext.setRequestArrivalTimeMillis(startTimeMs);
+    }
+    long requestId = _requestIdGenerator.get();
+    long deadlineMs = startTimeMs + _brokerTimeoutMs;
+
+    JsonNode sql = request.get(CommonConstants.Broker.Request.SQL);
+    if (sql == null || !sql.isTextual()) {
+      return exceptionDataTable(QueryErrorCode.JSON_PARSING, "Failed to find 'sql' in the request: " + request);
+    }
+    String query = sql.textValue();
+    requestContext.setQuery(query);
+
+    SqlNodeAndOptions sqlNodeAndOptions;
+    try {
+      sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
+    } catch (Exception e) {
+      requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+      return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage());
+    }
+
+    QueryExecutionContext executionContext =
+        new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId),
+            QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs,
+            _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH);
+    try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) {
+      AccessControl accessControl = _accessControlFactory.create();
+      AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity);
+      if (!authorizationResult.hasAccess()) {
+        requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+        return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, 
authorizationResult.getFailureMessage());
+      }
+
+      PinotQuery pinotQuery;
+      try {
+        pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+      } catch (Exception e) {
+        requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+        return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage());
+      }
+
+      Set<String> tableNames = RequestUtils.getTableNames(pinotQuery);
+      if (tableNames == null || tableNames.isEmpty()) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Failed to 
extract table name");
+      }
+      if (tableNames.size() != 1) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "System 
tables do not support joins");
+      }
+      String tableName = tableNames.iterator().next();
+      if (!isSystemTable(tableName)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Not a 
system table query");
+      }
+
+      AuthorizationResult tableAuthorizationResult =
+          hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, 
httpHeaders);
+      if (!tableAuthorizationResult.hasAccess()) {
+        requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+        return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, 
tableAuthorizationResult.getFailureMessage());
+      }
+
+      SystemTableProvider provider = SystemTableRegistry.get(tableName);
+      if (provider == null) {
+        requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+        return exceptionDataTable(QueryErrorCode.TABLE_DOES_NOT_EXIST, "System 
table does not exist: " + tableName);
+      }
+
+      try {
+        return executeLocalSystemTableQuery(pinotQuery, provider);
+      } catch (BadQueryRequestException e) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while handling system table datatable 
query {}: {}", tableName, e.getMessage(),
+            e);
+        requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+        return exceptionDataTable(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+      }
+    }
+  }
+
+  private BrokerResponse handleSystemTableQuery(JsonNode request, PinotQuery pinotQuery, String tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query,
+      @Nullable HttpHeaders httpHeaders) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = SystemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      Map<ServerRoutingInstance, DataTable> dataTableMap;
+      if (provider.getExecutionMode() == SystemTableProvider.ExecutionMode.BROKER_SCATTER_GATHER) {
+        dataTableMap = scatterGatherSystemTableDataTables(provider, pinotQuery, tableName, request, httpHeaders);
+      } else {
+        dataTableMap = new HashMap<>(1);
+        // Use a synthetic routing instance for broker-local execution of system table queries.
+        dataTableMap.put(new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT,
+            TableType.OFFLINE), executeLocalSystemTableQuery(pinotQuery, provider));
+      }
+
+      BrokerResponseNative brokerResponse;
+      BrokerRequest brokerRequest = new BrokerRequest();
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(tableName);
+      brokerRequest.setQuerySource(querySource);
+      brokerRequest.setPinotQuery(pinotQuery);
+      brokerResponse = _brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap,
+          _brokerTimeoutMs, _brokerMetrics);
+      brokerResponse.setTablesQueried(Set.of(TableNameBuilder.extractRawTableName(tableName)));
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
+    }
+  }
+
+  private DataTable executeLocalSystemTableQuery(PinotQuery pinotQuery, SystemTableProvider provider)
+      throws Exception {
+    IndexSegment dataSource = provider.getDataSource();
+    try {
+      QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
+      queryContext.setSchema(provider.getSchema());
+      queryContext.setEndTimeMs(System.currentTimeMillis() + _brokerTimeoutMs);
+
+      // Pass null for serverMetrics because system table queries run broker-local against an in-memory IndexSegment.
+      Plan plan = _planMaker.makeInstancePlan(List.of(new SegmentContext(dataSource)), queryContext, _executorService,
+          null);
+      InstanceResponseBlock instanceResponse = plan.execute();
+      return instanceResponse.toDataTable();
+    } finally {
+      dataSource.destroy();
+    }
+  }
+
+  static final class BrokerTarget {
+    final ServerRoutingInstance _routingInstance;
+    final String _dataTableUrl;
+
+    BrokerTarget(ServerRoutingInstance routingInstance, String dataTableUrl) {
+      _routingInstance = routingInstance;
+      _dataTableUrl = dataTableUrl;
+    }
+  }
+
+  @com.google.common.annotations.VisibleForTesting
+  protected Map<ServerRoutingInstance, DataTable> scatterGatherSystemTableDataTables(SystemTableProvider provider,
+      PinotQuery pinotQuery, String tableName, JsonNode request, @Nullable HttpHeaders httpHeaders) {
+    if (_helixManager == null) {
+      throw new IllegalStateException(
+          "HelixManager is required for scatter-gather execution of system table: " + tableName);
+    }
+
+    HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
+    List<String> liveInstances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+    if (liveInstances == null || liveInstances.isEmpty()) {
+      throw new IllegalStateException("No live instances found for scatter-gather execution of system table: "
+          + tableName);
+    }
+
+    String localInstanceId = _brokerId;
+    List<BrokerTarget> remoteTargets = new ArrayList<>();
+    @Nullable ServerRoutingInstance localRoutingInstance = null;
+    for (String instanceId : liveInstances) {
+      if (!InstanceTypeUtils.isBroker(instanceId)) {
+        continue;
+      }
+      InstanceConfig instanceConfig = dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceId));
+      if (instanceConfig == null) {
+        continue;
+      }
+      URI baseUri = URI.create(InstanceUtils.getInstanceBaseUri(instanceConfig));
+      ServerRoutingInstance routingInstance = new ServerRoutingInstance(baseUri.getHost(), baseUri.getPort(),
+          TableType.OFFLINE);
+      if (instanceId.equals(localInstanceId)) {
+        localRoutingInstance = routingInstance;
+      } else {
+        remoteTargets.add(new BrokerTarget(routingInstance, baseUri.toString() + SYSTEM_TABLE_DATATABLE_API_PATH));
+      }
+    }
+
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(remoteTargets.size() + 1);
+    ServerRoutingInstance routingInstance = localRoutingInstance != null ? localRoutingInstance
+        : new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT, TableType.OFFLINE);
+    try {
+      dataTableMap.put(routingInstance, executeLocalSystemTableQuery(pinotQuery, provider));
+    } catch (Exception e) {
+      dataTableMap.put(routingInstance,
+          exceptionDataTable(QueryErrorCode.QUERY_EXECUTION, "Failed to execute system table query locally: "
+              + e.getMessage()));
+    }
+
+    if (remoteTargets.isEmpty()) {
+      return dataTableMap;
+    }
+
+    String requestBody = request.toString();
+    @Nullable Map<String, String> requestHeaders = toSingleValueRequestHeaders(httpHeaders);
+    if (requestHeaders == null) {
+      requestHeaders = new HashMap<>();
+    }
+    requestHeaders.putIfAbsent("Content-Type", MediaType.APPLICATION_JSON);
+    Map<String, String> requestHeadersFinal = requestHeaders;
+    int timeoutMs = (int) Math.min(Integer.MAX_VALUE, _brokerTimeoutMs);
+
+    List<Pair<BrokerTarget, Future<DataTable>>> futures = new ArrayList<>(remoteTargets.size());
+    for (BrokerTarget target : remoteTargets) {
+      Future<DataTable> future =
+          _scatterGatherExecutorService.submit(() -> fetchDataTableFromBroker(target._dataTableUrl, requestBody,
+              requestHeadersFinal, timeoutMs));
+      futures.add(Pair.of(target, future));
+    }
+
+    for (Pair<BrokerTarget, Future<DataTable>> pair : futures) {
+      BrokerTarget target = pair.getLeft();
+      try {
+        dataTableMap.put(target._routingInstance, pair.getRight().get());
+      } catch (Exception e) {
+        // Unexpected errors should be surfaced in the merged response.
+        dataTableMap.put(target._routingInstance, exceptionDataTable(QueryErrorCode.BROKER_REQUEST_SEND,
+            "Failed to gather system table response from " + target._dataTableUrl + ": " + e.getMessage()));
+      }
+    }
+    return dataTableMap;
+  }
+
+  private DataTable fetchDataTableFromBroker(String url, String requestBody, Map<String, String> requestHeaders,
+      int timeoutMs) {
+    HttpPost httpPost = new HttpPost(url);
+    Timeout timeout = Timeout.of(timeoutMs, TimeUnit.MILLISECONDS);
+    httpPost.setConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build());
+    httpPost.setEntity(new StringEntity(requestBody));
+    requestHeaders.forEach(httpPost::setHeader);
+    try (CloseableHttpResponse response = _scatterGatherHttpClient.execute(httpPost)) {
+      int status = response.getCode();
+      if (status != 200) {
+        String error = EntityUtils.toString(response.getEntity());
+        return exceptionDataTable(QueryErrorCode.BROKER_REQUEST_SEND,
+            String.format("Scatter-gather system table request failed for %s (status=%d): %s", url, status, error));
+      }
+      byte[] bytes = EntityUtils.toByteArray(response.getEntity());
+      if (bytes.length == 0) {
+        return exceptionDataTable(QueryErrorCode.BROKER_REQUEST_SEND,
+            "Scatter-gather system table request returned an empty response for: " + url);
+      }
+      return DataTableFactory.getDataTable(bytes);
+    } catch (Exception e) {
+      return exceptionDataTable(QueryErrorCode.BROKER_REQUEST_SEND,
+          String.format("Scatter-gather system table request failed for %s: %s", url, e.getMessage()));
+    }
+  }
+
+  private static @Nullable Map<String, String> toSingleValueRequestHeaders(@Nullable HttpHeaders httpHeaders) {
+    if (httpHeaders == null) {
+      return null;
+    }
+    Map<String, String> requestHeaders = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : httpHeaders.getRequestHeaders().entrySet()) {
+      String headerName = entry.getKey();
+      // Do not forward hop-by-hop headers or content-specific headers that may be invalid for the new request body.
+      // See https://www.rfc-editor.org/rfc/rfc7230#section-6.1
+      String headerNameLower = headerName.toLowerCase(Locale.ROOT);
+      if ("connection".equals(headerNameLower) || "content-length".equals(headerNameLower)
+          || "transfer-encoding".equals(headerNameLower) || "host".equals(headerNameLower)) {
+        continue;

Review Comment:
   The list of hop-by-hop headers is hardcoded in the method. Extract this into a static final Set<String> constant for better maintainability and clarity.
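   For example, a minimal sketch of the suggested constant (the name `NON_FORWARDABLE_HEADERS` is illustrative, not taken from the PR):
   ```java
   // Hop-by-hop and content-specific headers that must not be forwarded on the
   // rewritten broker-to-broker request (see RFC 7230, section 6.1).
   private static final Set<String> NON_FORWARDABLE_HEADERS =
       Set.of("connection", "content-length", "transfer-encoding", "host");
   ```
   The loop body then reduces to a single membership check, `if (NON_FORWARDABLE_HEADERS.contains(headerNameLower)) { continue; }`, and future exclusions only require touching the constant.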



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java:
##########
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpPost;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.util.Timeout;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.querylog.QueryLogger;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV4;
+import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableProvider;
+import org.apache.pinot.common.systemtable.SystemTableRegistry;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.auth.AuthorizationResult;
+import org.apache.pinot.spi.auth.broker.RequesterIdentity;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broker request handler for system tables (handled entirely on the broker).
+ */
+public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class);
+  private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost";
+  private static final int SYSTEM_TABLE_PSEUDO_PORT = 0;
+  private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable";
+
+  private final BrokerReduceService _brokerReduceService;
+  private final PlanMaker _planMaker;
+  private final ExecutorService _executorService;
+  private final ExecutorService _scatterGatherExecutorService;
+  private final PoolingHttpClientConnectionManager _scatterGatherConnMgr;
+  private final CloseableHttpClient _scatterGatherHttpClient;
+  @Nullable
+  private final HelixManager _helixManager;
+
+  public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId,
+      BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      ThreadAccountant threadAccountant, @Nullable HelixManager helixManager) {
+    super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
+        threadAccountant);
+    _brokerReduceService = new BrokerReduceService(_config);
+    _planMaker = new InstancePlanMakerImplV2();
+    _planMaker.init(_config);
+    _helixManager = helixManager;
+    int executorPoolSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_SYSTEM_TABLE_EXECUTOR_POOL_SIZE,
+        CommonConstants.Broker.DEFAULT_SYSTEM_TABLE_EXECUTOR_POOL_SIZE);
+    executorPoolSize = Math.max(1, executorPoolSize);
+    _executorService = QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize,
+        new NamedThreadFactory("system-table-query-executor")));
+    _scatterGatherExecutorService =
+        QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize,
+            new NamedThreadFactory("system-table-scatter-gather-executor")));
+    _scatterGatherConnMgr = PoolingHttpClientConnectionManagerHelper.createWithSocketFactory();
+    Timeout timeout = Timeout.of(_brokerTimeoutMs, TimeUnit.MILLISECONDS);
+    RequestConfig defaultRequestConfig =
+        RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build();
+    _scatterGatherHttpClient =
+        HttpClients.custom().setConnectionManager(_scatterGatherConnMgr).setDefaultRequestConfig(defaultRequestConfig)
+            .build();
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutDown() {
+    _executorService.shutdownNow();
+    _scatterGatherExecutorService.shutdownNow();
+    try {
+      _scatterGatherHttpClient.close();
+    } catch (Exception e) {
+      LOGGER.debug("Failed to close system table scatter-gather http client: 
{}", e.toString());
+    }
+    try {
+      _scatterGatherConnMgr.close();
+    } catch (Exception e) {
+      LOGGER.debug("Failed to close system table scatter-gather connection 
manager: {}", e.toString());
+    }
+    _brokerReduceService.shutDown();
+  }
+
+  public boolean canHandle(String tableName) {
+    return isSystemTable(tableName) && 
SystemTableRegistry.isRegistered(tableName);
+  }
+
+  @Override
+  protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
+      throws Exception {
+    long startTimeMs = requestContext.getRequestArrivalTimeMillis();
+    long deadlineMs = startTimeMs + _brokerTimeoutMs;
+    QueryExecutionContext executionContext =
+        new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId),
+            QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs,
+            _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH);
+    try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) {
+      PinotQuery pinotQuery;
+      try {
+        pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+      } catch (Exception e) {
+        requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+        return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage());
+      }
+
+      Set<String> tableNames = RequestUtils.getTableNames(pinotQuery);
+      if (tableNames == null || tableNames.isEmpty()) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name");
+      }
+      if (tableNames.size() != 1) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins");
+      }
+      String tableName = tableNames.iterator().next();
+      if (!isSystemTable(tableName)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not a system table query");
+      }
+      AuthorizationResult authorizationResult =
+          hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders);
+      if (!authorizationResult.hasAccess()) {
+        requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+        return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage());
+      }
+
+      return handleSystemTableQuery(request, pinotQuery, tableName, requestContext, requesterIdentity, query,
+          httpHeaders);
+    }
+  }
+
+  @Override
+  protected boolean handleCancel(long queryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) {
+    return false;
+  }
+
+  @Override
+  public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
+      throws Exception {
+    return false;
+  }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public OptionalLong getRequestIdByClientId(String clientQueryId) {
+    return OptionalLong.empty();
+  }
+
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  /**
+   * Executes a system table query against the local broker and returns the raw {@link DataTable} results.
+   * <p>
+   * This method is used by the internal broker-to-broker scatter-gather endpoint and must never perform fanout.
+   */
+  public DataTable handleSystemTableDataTableRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
+      RequestContext requestContext, @Nullable HttpHeaders httpHeaders) {
+    long startTimeMs = requestContext.getRequestArrivalTimeMillis();
+    if (startTimeMs <= 0) {
+      startTimeMs = System.currentTimeMillis();
+      requestContext.setRequestArrivalTimeMillis(startTimeMs);
+    }
+    long requestId = _requestIdGenerator.get();
+    long deadlineMs = startTimeMs + _brokerTimeoutMs;
+
+    JsonNode sql = request.get(CommonConstants.Broker.Request.SQL);
+    if (sql == null || !sql.isTextual()) {
+      return exceptionDataTable(QueryErrorCode.JSON_PARSING, "Failed to find 'sql' in the request: " + request);
+    }
+    String query = sql.textValue();
+    requestContext.setQuery(query);
+
+    SqlNodeAndOptions sqlNodeAndOptions;
+    try {
+      sqlNodeAndOptions = RequestUtils.parseQuery(query, request);
+    } catch (Exception e) {
+      requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+      return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage());
+    }
+
+    QueryExecutionContext executionContext =
+        new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId),
+            QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs,
+            _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH);
+    try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) {
+      AccessControl accessControl = _accessControlFactory.create();
+      AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity);
+      if (!authorizationResult.hasAccess()) {
+        requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+        return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage());
+      }
+
+      PinotQuery pinotQuery;
+      try {
+        pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+      } catch (Exception e) {
+        requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
+        return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage());
+      }
+
+      Set<String> tableNames = RequestUtils.getTableNames(pinotQuery);
+      if (tableNames == null || tableNames.isEmpty()) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name");
+      }
+      if (tableNames.size() != 1) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins");
+      }
+      String tableName = tableNames.iterator().next();
+      if (!isSystemTable(tableName)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Not a system table query");
+      }
+
+      AuthorizationResult tableAuthorizationResult =
+          hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders);
+      if (!tableAuthorizationResult.hasAccess()) {
+        requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+        return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, tableAuthorizationResult.getFailureMessage());
+      }
+
+      SystemTableProvider provider = SystemTableRegistry.get(tableName);
+      if (provider == null) {
+        requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+        return exceptionDataTable(QueryErrorCode.TABLE_DOES_NOT_EXIST, "System table does not exist: " + tableName);
+      }
+
+      try {
+        return executeLocalSystemTableQuery(pinotQuery, provider);
+      } catch (BadQueryRequestException e) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+        return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while handling system table datatable query {}: {}", tableName, e.getMessage(),
+            e);
+        requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+        return exceptionDataTable(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
+      }
+    }
+  }
+
+  private BrokerResponse handleSystemTableQuery(JsonNode request, PinotQuery pinotQuery, String tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query,
+      @Nullable HttpHeaders httpHeaders) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = SystemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      Map<ServerRoutingInstance, DataTable> dataTableMap;
+      if (provider.getExecutionMode() == SystemTableProvider.ExecutionMode.BROKER_SCATTER_GATHER) {
+        dataTableMap = scatterGatherSystemTableDataTables(provider, pinotQuery, tableName, request, httpHeaders);
+      } else {
+        dataTableMap = new HashMap<>(1);
+        // Use a synthetic routing instance for broker-local execution of system table queries.
+        dataTableMap.put(new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT,
+            TableType.OFFLINE), executeLocalSystemTableQuery(pinotQuery, provider));
+      }
+
+      BrokerResponseNative brokerResponse;
+      BrokerRequest brokerRequest = new BrokerRequest();
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(tableName);
+      brokerRequest.setQuerySource(querySource);
+      brokerRequest.setPinotQuery(pinotQuery);
+      brokerResponse = _brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap,
+          _brokerTimeoutMs, _brokerMetrics);
+      brokerResponse.setTablesQueried(Set.of(TableNameBuilder.extractRawTableName(tableName)));
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
+    }
+  }
+
+  private DataTable executeLocalSystemTableQuery(PinotQuery pinotQuery, SystemTableProvider provider)
+      throws Exception {
+    IndexSegment dataSource = provider.getDataSource();
+    try {
+      QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
+      queryContext.setSchema(provider.getSchema());
+      queryContext.setEndTimeMs(System.currentTimeMillis() + _brokerTimeoutMs);
+
+      // Pass null for serverMetrics because system table queries run broker-local against an in-memory IndexSegment.
+      Plan plan = _planMaker.makeInstancePlan(List.of(new SegmentContext(dataSource)), queryContext, _executorService,
+          null);
+      InstanceResponseBlock instanceResponse = plan.execute();
+      return instanceResponse.toDataTable();
+    } finally {
+      dataSource.destroy();
+    }
+  }
+
+  static final class BrokerTarget {
+    final ServerRoutingInstance _routingInstance;
+    final String _dataTableUrl;
+
+    BrokerTarget(ServerRoutingInstance routingInstance, String dataTableUrl) {
+      _routingInstance = routingInstance;
+      _dataTableUrl = dataTableUrl;
+    }
+  }
+
+  @com.google.common.annotations.VisibleForTesting
+  protected Map<ServerRoutingInstance, DataTable> scatterGatherSystemTableDataTables(SystemTableProvider provider,
+      PinotQuery pinotQuery, String tableName, JsonNode request, @Nullable HttpHeaders httpHeaders) {
+    if (_helixManager == null) {
+      throw new IllegalStateException(
+          "HelixManager is required for scatter-gather execution of system table: " + tableName);
+    }
+
+    HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
+    List<String> liveInstances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+    if (liveInstances == null || liveInstances.isEmpty()) {
+      throw new IllegalStateException("No live instances found for scatter-gather execution of system table: "
+          + tableName);
+    }
+
+    String localInstanceId = _brokerId;
+    List<BrokerTarget> remoteTargets = new ArrayList<>();
+    @Nullable ServerRoutingInstance localRoutingInstance = null;
+    for (String instanceId : liveInstances) {
+      if (!InstanceTypeUtils.isBroker(instanceId)) {
+        continue;
+      }
+      InstanceConfig instanceConfig = dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceId));
+      if (instanceConfig == null) {
+        continue;
+      }
+      URI baseUri = URI.create(InstanceUtils.getInstanceBaseUri(instanceConfig));
+      ServerRoutingInstance routingInstance = new ServerRoutingInstance(baseUri.getHost(), baseUri.getPort(),
+          TableType.OFFLINE);
+      if (instanceId.equals(localInstanceId)) {
+        localRoutingInstance = routingInstance;
+      } else {
+        remoteTargets.add(new BrokerTarget(routingInstance, baseUri.toString() + SYSTEM_TABLE_DATATABLE_API_PATH));
+      }
+    }
+
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(remoteTargets.size() + 1);
+    ServerRoutingInstance routingInstance = localRoutingInstance != null ? localRoutingInstance
+        : new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT, TableType.OFFLINE);
+    try {
+      dataTableMap.put(routingInstance, executeLocalSystemTableQuery(pinotQuery, provider));
+    } catch (Exception e) {

Review Comment:
   When local system table query execution fails, the exception is caught and wrapped in line 456, but the error message only carries `e.getMessage()`, dropping the exception class and stack trace. Consider logging the full exception for better debugging.
   ```suggestion
       } catch (Exception e) {
         LOGGER.error("Failed to execute system table query locally for query: {}", pinotQuery, e);
   ```
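   A related tweak, sketched under the assumption that the surrounding `exceptionDataTable` call stays as in the diff: building the returned error message from `e` rather than `e.getMessage()` preserves the exception class name even when the message is null.
   ```java
   dataTableMap.put(routingInstance,
       exceptionDataTable(QueryErrorCode.QUERY_EXECUTION,
           // Concatenating e invokes e.toString(), which yields "ClassName: message",
           // so exceptions with a null message still produce a meaningful error string.
           "Failed to execute system table query locally: " + e));
   ```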



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

