Copilot commented on code in PR #17291: URL: https://github.com/apache/pinot/pull/17291#discussion_r2617586921
########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; 
+import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables") + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; Review Comment: The class name check uses reflection to verify PinotAdminClient availability, but this is fragile and could break if the class is relocated or renamed. 
Consider using a more robust mechanism such as checking for the presence of a specific method or using a feature flag/configuration property instead of relying on ClassNotFoundException. ```suggestion String adminClientEnabled = System.getProperty("pinot.admin.client.enabled"); if (adminClientEnabled == null) { LOGGER.warn("System property 'pinot.admin.client.enabled' is not set. Defaulting PinotAdminClient availability to false."); IS_ADMIN_CLIENT_AVAILABLE = false; } else { IS_ADMIN_CLIENT_AVAILABLE = Boolean.parseBoolean(adminClientEnabled); } ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; +import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables") + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, 
null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return "system.tables"; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public SystemTableResponse getRows(SystemTableRequest request) { + if (_tableCache == null) { + return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, request.getOffset()); + int limit = request.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : 0; Review Comment: The initial capacity is set to 0 when there's no limit, which will cause the ArrayList to resize multiple times as elements are added. This is inefficient for large table lists. Set initialCapacity to sortedTableNames.size() when hasLimit is false to avoid unnecessary resizing. 
```suggestion int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : sortedTableNames.size(); ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; +import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables") + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, 
null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return "system.tables"; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public SystemTableResponse getRows(SystemTableRequest request) { + if (_tableCache == null) { + return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, request.getOffset()); + int limit = request.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? 
Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(request.getFilter(), stats, rawTableName)) { + continue; + } + totalRows++; + if (offset > 0) { + offset--; + continue; + } + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return new SystemTableResponse(rows, System.currentTimeMillis(), totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && 
tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" : "UNKNOWN"); + String tableConfigJson = ""; + if (tableConfig != null) { + try { + tableConfigJson = JsonUtils.objectToString(tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString()); + tableConfigJson = tableConfig.toString(); + } + } + return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue, + tableConfigJson, brokerTenant, serverTenant, replicas); + } + + private boolean matchesFilter(SystemTableFilter filter, TableStats stats, String rawTableName) { + if (filter == null) { + return true; + } + if (filter.getChildren().isEmpty()) { + return matchesLeafFilter(filter, stats, rawTableName); + } + switch (filter.getOperator()) { + case AND: + for (SystemTableFilter child : filter.getChildren()) { + if (!matchesFilter(child, stats, rawTableName)) { + return false; + } + } + return true; + case OR: + for (SystemTableFilter child : filter.getChildren()) { + if (matchesFilter(child, stats, rawTableName)) { + return true; + } + } + return false; + case NOT: + return filter.getChildren().isEmpty() || !matchesFilter(filter.getChildren().get(0), stats, rawTableName); + default: + return true; + } + } + + private boolean 
matchesLeafFilter(SystemTableFilter filter, TableStats stats, String rawTableName) { + String column = filter.getColumn(); + if (column == null) { + return true; + } + List<String> values = filter.getValues(); + if (values == null || values.isEmpty()) { + return true; + } + switch (column.toLowerCase()) { + case "tablename": + return matchesString(values, rawTableName, filter.getOperator()); + case "type": + return matchesString(values, stats._type, filter.getOperator()); + case "status": + return matchesString(values, stats._status, filter.getOperator()); + case "segments": + return matchesNumber(values, stats._segments, filter.getOperator()); + case "reportedsize": + return matchesNumber(values, stats._reportedSizeInBytes, filter.getOperator()); + case "estimatedsize": + return matchesNumber(values, stats._estimatedSizeInBytes, filter.getOperator()); + default: + return true; + } + } + + private boolean matchesString(List<String> candidates, String actual, SystemTableFilter.Operator operator) { + switch (operator) { + case EQ: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case IN: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case NEQ: + return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual)); + default: + return true; + } + } + + private boolean matchesNumber(List<String> candidates, long actual, SystemTableFilter.Operator operator) { + try { + switch (operator) { + case EQ: + return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual); + case NEQ: + return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual); + case GT: + return candidates.stream().anyMatch(v -> actual > Long.parseLong(v)); + case GTE: + return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v)); + case LT: + return candidates.stream().anyMatch(v -> actual < Long.parseLong(v)); + case LTE: + return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v)); + case IN: + for (String 
candidate : candidates) { + if (actual == Long.parseLong(candidate)) { + return true; + } + } + return false; + default: + return true; + } + } catch (NumberFormatException e) { + LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString()); + return true; + } + } + + private @Nullable TableSize fetchTableSize(String tableNameWithType) { + Function<String, TableSize> fetcher = getSizeFetcher(); + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(rawTableName); + if (size == null) { + size = fetchTableSizeForName(tableNameWithType); + if (size == null) { + LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), falling back to zeros", + tableNameWithType, rawTableName); + } + } + return size; + } + + private @Nullable TableSize fetchTableSizeForName(String tableName) { + for (String baseUrl : getControllerBaseUrls()) { + try { + String url = baseUrl + "/tables/" + tableName + "/size?verbose=true&includeReplacedSegments=false"; + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .timeout(Duration.ofSeconds(5)) + .GET() + .header("Accept", "application/json") + .build(); + HttpResponse<String> response = _httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 200 && response.statusCode() < 300) { + TableSize parsed = JsonUtils.stringToObject(response.body(), TableSize.class); + LOGGER.debug("system.tables: controller size response for {} via {} -> segments offline={}, realtime={}, " + + "reportedSize={}, estimatedSize={}", tableName, baseUrl, + parsed._offlineSegments != null && parsed._offlineSegments._segments != null + ? 
parsed._offlineSegments._segments.size() : 0, + parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null + ? parsed._realtimeSegments._segments.size() : 0, + parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes); + return parsed; + } else { + LOGGER.warn("system.tables: failed to fetch table size for {} via {}: status {}, body={}", tableName, + baseUrl, response.statusCode(), response.body()); + } + } catch (Exception e) { + LOGGER.warn("system.tables: error fetching table size for {} via {}", tableName, baseUrl, e); + } + } + return null; + } + + private List<String> getControllerBaseUrls() { + Set<String> urls = new LinkedHashSet<>(); + if (_helixAdmin != null) { + for (String controller : discoverControllersFromHelix()) { + String normalized = normalizeControllerUrl(controller); + if (normalized != null) { + urls.add(normalized); + } + } + } + for (String url : _staticControllerUrls) { + String normalized = normalizeControllerUrl(url); + if (normalized != null) { + urls.add(normalized); + } + } + return new ArrayList<>(urls); + } + + private int getSegmentCount(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.size(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + return sizeFromController._realtimeSegments._segments.size(); + } + return 0; + } + + private long getTotalDocs(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + if (tableType == 
TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + return sizeFromController._realtimeSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + return 0; + } + + private @Nullable Function<String, TableSize> getSizeFetcher() { + if (_tableSizeFetcherOverride != null) { + return _tableSizeFetcherOverride; + } + List<String> controllers = getControllerBaseUrls(); + if (controllers.isEmpty() || !isAdminClientAvailable()) { + return null; + } + return tableNameWithType -> fetchWithAdminClient(controllers, tableNameWithType); + } + + private List<String> discoverControllersFromHelix() { + List<String> urls = new ArrayList<>(); + try { + if (_clusterName == null) { + LOGGER.warn("Cannot discover controllers without cluster name"); + return List.of(); + } + for (String controllerId : _helixAdmin.getInstancesInCluster(_clusterName)) { + if (!InstanceTypeUtils.isController(controllerId)) { + continue; + } + int idx = controllerId.lastIndexOf('_'); + if (idx > 0 && idx < controllerId.length() - 1) { + String host = controllerId.substring(controllerId.indexOf('_') + 1, idx); + String port = controllerId.substring(idx + 1); + urls.add(host + ":" + port); + } else { + LOGGER.warn("Unable to parse controller address from instance id: {}", controllerId); + } + } Review Comment: The parsing logic uses `indexOf('_')` for the start of the host but `lastIndexOf('_')` for validation and port extraction, and it never checks that the two indices differ. IDs with several underscores are handled correctly (e.g., "Controller_host_name_8080" yields host "host_name" and port "8080"). However, if the controller ID contains only one underscore (e.g., "Controller_8080"), both calls return the same index. The guard still passes, and `substring(controllerId.indexOf('_') + 1, idx)` is called with a begin index greater than its end index. That throws `StringIndexOutOfBoundsException` instead of reaching the warning branch. Validate that the first and last underscores are distinct before extracting the host, or validate the controller ID format more carefully.
```suggestion int firstUnderscoreIdx = controllerId.indexOf('_'); int lastUnderscoreIdx = controllerId.lastIndexOf('_'); // Ensure the controllerId starts with "Controller_", and has at least two underscores if (firstUnderscoreIdx > 0 && lastUnderscoreIdx > firstUnderscoreIdx && lastUnderscoreIdx < controllerId.length() - 1) { String host = controllerId.substring(firstUnderscoreIdx + 1, lastUnderscoreIdx); String port = controllerId.substring(lastUnderscoreIdx + 1); urls.add(host + ":" + port); } else { LOGGER.warn("Unable to parse controller address from instance id: {}", controllerId); } ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java: ########## @@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, String query, RequestContext throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN); } + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName, + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableProvider provider = _systemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return BrokerResponseNative.TABLE_DOES_NOT_EXIST; + } + try { + if (!isSupportedSystemTableQuery(pinotQuery)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, + "System tables only support simple projection/filter/limit queries"); + } + Schema systemSchema = provider.getSchema(); + List<String> projectionColumns = extractProjectionColumns(pinotQuery, systemSchema); + int offset 
= Math.max(0, pinotQuery.getOffset()); + int limit = Math.max(0, pinotQuery.getLimit()); Review Comment: When offset or limit is negative in the PinotQuery, it is clamped to 0. However, a limit of 0 has special semantics (no results returned) which may not be the intended behavior when the original limit was negative (typically meaning "no limit"). Consider using -1 or Integer.MAX_VALUE to represent "no limit" rather than 0. ```suggestion int limit = pinotQuery.getLimit() < 0 ? -1 : pinotQuery.getLimit(); ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java: ########## @@ -354,6 +357,12 @@ public void start() boolean caseInsensitive = _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE); TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive); + if (!SystemTableRegistry.INSTANCE.isRegistered("system.tables")) { + SystemTableRegistry.INSTANCE.register(new TablesSystemTableProvider(tableCache, _helixAdmin, _clusterName)); + } + if (!SystemTableRegistry.INSTANCE.isRegistered("system.instances")) { + SystemTableRegistry.INSTANCE.register(new InstancesSystemTableProvider()); Review Comment: Using hardcoded string literals "system.tables" and "system.instances" for registration checks creates a risk of typos and makes the code harder to maintain. Define constants for these table names (e.g., in the provider classes or a shared constants file) and reference them here. ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java: ########## @@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, String query, RequestContext throw new WebApplicationException("Permission denied."
+ failureMessage, Response.Status.FORBIDDEN); } + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName, + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableProvider provider = _systemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return BrokerResponseNative.TABLE_DOES_NOT_EXIST; + } + try { + if (!isSupportedSystemTableQuery(pinotQuery)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, + "System tables only support simple projection/filter/limit queries"); + } + Schema systemSchema = provider.getSchema(); + List<String> projectionColumns = extractProjectionColumns(pinotQuery, systemSchema); + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = Math.max(0, pinotQuery.getLimit()); + SystemTableRequest systemTableRequest = + new SystemTableRequest(projectionColumns, toSystemTableFilter(pinotQuery.getFilterExpression()), offset, + limit); + SystemTableResponse systemTableResponse = provider.getRows(systemTableRequest); + BrokerResponseNative brokerResponse = + buildSystemTableBrokerResponse(tableName, systemSchema, projectionColumns, systemTableResponse, + requestContext); + brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); + _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, + QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null)); + return brokerResponse; + } catch (BadQueryRequestException e) { + 
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e); + requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + + private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) { + return (pinotQuery.getGroupByList() == null || pinotQuery.getGroupByList().isEmpty()) + && pinotQuery.getHavingExpression() == null + && (pinotQuery.getOrderByList() == null || pinotQuery.getOrderByList().isEmpty()); + } + + private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema schema) + throws BadQueryRequestException { + List<String> projections = new ArrayList<>(); + boolean hasStar = false; + List<Expression> selectList = pinotQuery.getSelectList(); + if (CollectionUtils.isEmpty(selectList)) { + throw new BadQueryRequestException("System tables require a projection list"); + } + for (Expression expression : selectList) { + Identifier identifier = expression.getIdentifier(); + if (identifier != null) { + if ("*".equals(identifier.getName())) { + hasStar = true; + } else { + projections.add(identifier.getName()); + } + continue; + } + Function function = expression.getFunctionCall(); + if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && !function.getOperands().isEmpty()) { + Identifier aliased = function.getOperands().get(0).getIdentifier(); + if (aliased != null) { + projections.add(aliased.getName()); + continue; + } + } + throw new BadQueryRequestException("System tables only support column projections or '*'"); Review Comment: The error message doesn't explain what was actually encountered that caused the failure. 
Include the problematic expression in the error message (e.g., "System tables only support column projections or '*', but found: " + expression) to help users debug their queries. ```suggestion throw new BadQueryRequestException("System tables only support column projections or '*', but found: " + expression); ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; +import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables") + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, 
null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return "system.tables"; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public SystemTableResponse getRows(SystemTableRequest request) { + if (_tableCache == null) { + return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, request.getOffset()); + int limit = request.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? 
Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(request.getFilter(), stats, rawTableName)) { + continue; + } + totalRows++; + if (offset > 0) { + offset--; + continue; + } + if (limit == 0) { + continue; Review Comment: When limit is 0, the loop continues to process remaining tables even though no more rows should be added. This is inefficient and wastes CPU cycles. Add a break statement here instead of continue to exit the loop early once the limit is reached. ```suggestion break; ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java: ########## @@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, String query, RequestContext throw new WebApplicationException("Permission denied." 
+ failureMessage, Response.Status.FORBIDDEN); } + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName, + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableProvider provider = _systemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return BrokerResponseNative.TABLE_DOES_NOT_EXIST; + } + try { + if (!isSupportedSystemTableQuery(pinotQuery)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, + "System tables only support simple projection/filter/limit queries"); + } + Schema systemSchema = provider.getSchema(); + List<String> projectionColumns = extractProjectionColumns(pinotQuery, systemSchema); + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = Math.max(0, pinotQuery.getLimit()); + SystemTableRequest systemTableRequest = + new SystemTableRequest(projectionColumns, toSystemTableFilter(pinotQuery.getFilterExpression()), offset, + limit); + SystemTableResponse systemTableResponse = provider.getRows(systemTableRequest); + BrokerResponseNative brokerResponse = + buildSystemTableBrokerResponse(tableName, systemSchema, projectionColumns, systemTableResponse, + requestContext); + brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); + _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, + QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null)); + return brokerResponse; + } catch (BadQueryRequestException e) { + 
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e); + requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + + private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) { + return (pinotQuery.getGroupByList() == null || pinotQuery.getGroupByList().isEmpty()) + && pinotQuery.getHavingExpression() == null + && (pinotQuery.getOrderByList() == null || pinotQuery.getOrderByList().isEmpty()); + } + + private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema schema) + throws BadQueryRequestException { + List<String> projections = new ArrayList<>(); + boolean hasStar = false; + List<Expression> selectList = pinotQuery.getSelectList(); + if (CollectionUtils.isEmpty(selectList)) { + throw new BadQueryRequestException("System tables require a projection list"); Review Comment: The error message "System tables require a projection list" is unclear because empty select lists are already handled by the Pinot query parser and would fail earlier. If this check is defensive, clarify the message to indicate this is an internal error (e.g., "Internal error: system table query missing projection list"). ```suggestion throw new BadQueryRequestException("Internal error: system table query missing projection list"); ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; +import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables") + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public 
TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return "system.tables"; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public SystemTableResponse getRows(SystemTableRequest request) { + if (_tableCache == null) { + return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, request.getOffset()); + int limit = request.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? 
Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(request.getFilter(), stats, rawTableName)) { + continue; + } + totalRows++; + if (offset > 0) { + offset--; + continue; + } + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return new SystemTableResponse(rows, System.currentTimeMillis(), totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && 
tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" : "UNKNOWN"); Review Comment: The status determination logic is duplicated (both branches return "ONLINE" when their condition is true). Simplify this to `String status = (tableConfig != null || segments > 0) ? "ONLINE" : "UNKNOWN";` for better readability and maintainability. ```suggestion String status = (tableConfig != null || segments > 0) ? "ONLINE" : "UNKNOWN"; ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTableFilter; +import org.apache.pinot.spi.systemtable.SystemTableProvider; +import org.apache.pinot.spi.systemtable.SystemTableRequest; +import org.apache.pinot.spi.systemtable.SystemTableResponse; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables") + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, 
null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return "system.tables"; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public SystemTableResponse getRows(SystemTableRequest request) { + if (_tableCache == null) { + return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, request.getOffset()); + int limit = request.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? 
Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(request.getFilter(), stats, rawTableName)) { + continue; + } + totalRows++; + if (offset > 0) { + offset--; + continue; + } + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return new SystemTableResponse(rows, System.currentTimeMillis(), totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && 
tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" : "UNKNOWN"); + String tableConfigJson = ""; + if (tableConfig != null) { + try { + tableConfigJson = JsonUtils.objectToString(tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString()); + tableConfigJson = tableConfig.toString(); + } + } + return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue, + tableConfigJson, brokerTenant, serverTenant, replicas); + } + + private boolean matchesFilter(SystemTableFilter filter, TableStats stats, String rawTableName) { + if (filter == null) { + return true; + } + if (filter.getChildren().isEmpty()) { + return matchesLeafFilter(filter, stats, rawTableName); + } + switch (filter.getOperator()) { + case AND: + for (SystemTableFilter child : filter.getChildren()) { + if (!matchesFilter(child, stats, rawTableName)) { + return false; + } + } + return true; + case OR: + for (SystemTableFilter child : filter.getChildren()) { + if (matchesFilter(child, stats, rawTableName)) { + return true; + } + } + return false; + case NOT: + return filter.getChildren().isEmpty() || !matchesFilter(filter.getChildren().get(0), stats, rawTableName); + default: + return true; + } + } + + private boolean 
matchesLeafFilter(SystemTableFilter filter, TableStats stats, String rawTableName) { + String column = filter.getColumn(); + if (column == null) { + return true; + } + List<String> values = filter.getValues(); + if (values == null || values.isEmpty()) { + return true; + } + switch (column.toLowerCase()) { + case "tablename": + return matchesString(values, rawTableName, filter.getOperator()); + case "type": + return matchesString(values, stats._type, filter.getOperator()); + case "status": + return matchesString(values, stats._status, filter.getOperator()); + case "segments": + return matchesNumber(values, stats._segments, filter.getOperator()); + case "reportedsize": + return matchesNumber(values, stats._reportedSizeInBytes, filter.getOperator()); + case "estimatedsize": + return matchesNumber(values, stats._estimatedSizeInBytes, filter.getOperator()); + default: + return true; + } + } + + private boolean matchesString(List<String> candidates, String actual, SystemTableFilter.Operator operator) { + switch (operator) { + case EQ: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case IN: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case NEQ: + return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual)); + default: + return true; + } + } + + private boolean matchesNumber(List<String> candidates, long actual, SystemTableFilter.Operator operator) { + try { + switch (operator) { + case EQ: + return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual); + case NEQ: + return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual); + case GT: + return candidates.stream().anyMatch(v -> actual > Long.parseLong(v)); + case GTE: + return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v)); + case LT: + return candidates.stream().anyMatch(v -> actual < Long.parseLong(v)); + case LTE: + return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v)); + case IN: + for (String 
candidate : candidates) { + if (actual == Long.parseLong(candidate)) { + return true; + } + } + return false; + default: + return true; + } + } catch (NumberFormatException e) { + LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString()); + return true; + } + } + + private @Nullable TableSize fetchTableSize(String tableNameWithType) { + Function<String, TableSize> fetcher = getSizeFetcher(); + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(rawTableName); + if (size == null) { + size = fetchTableSizeForName(tableNameWithType); + if (size == null) { + LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), falling back to zeros", Review Comment: The warning message says "falling back to zeros" but the code actually returns null when size fetching fails, not zero values. Update the message to accurately reflect the behavior (e.g., "failed to fetch size, returning null") or change the code to return a TableSize object with zero values as the message suggests. ```suggestion LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), returning null", ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
