This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 891e5438de Fix TableDoesNotExistError for hybrid tables in MSE queries in controller API (#16102) 891e5438de is described below commit 891e5438de18a53ca59e64aac58cd52de9763460 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Sat Jun 14 02:37:49 2025 +0530 Fix TableDoesNotExistError for hybrid tables in MSE queries in controller API (#16102) --- .../api/resources/PinotQueryResource.java | 90 +++++++--------------- .../tests/HybridClusterIntegrationTest.java | 53 +++++++++++++ 2 files changed, 81 insertions(+), 62 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index ceaaa8bf07..af3e2eefb0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -68,7 +68,6 @@ import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.parser.utils.ParserUtils; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.exception.DatabaseConflictException; import org.apache.pinot.spi.exception.QueryErrorCode; @@ -240,31 +239,8 @@ public class PinotQueryResource { private List<String> getInstanceIds(String query, List<String> tableNames, String database) { List<String> instanceIds; if (!tableNames.isEmpty()) { - List<TableConfig> tableConfigList = getListTableConfigs(tableNames, database); - List<LogicalTableConfig> logicalTableConfigList = null; - // First check for table configs, if not found, check for logical table configs. - if (tableConfigList.size() != tableNames.size()) { - logicalTableConfigList = getListLogicalTableConfigs(tableNames, database); - // If config is not found for all tables, then find the tables that are not found. - if ((tableConfigList.size() + logicalTableConfigList.size()) != tableNames.size()) { - Set<String> tableNamesFoundSet = new HashSet<>(); - for (TableConfig tableConfig : tableConfigList) { - tableNamesFoundSet.add(tableConfig.getTableName()); - } - for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) { - tableNamesFoundSet.add(logicalTableConfig.getTableName()); - } - - List<String> tablesNotFound = tableNames.stream().filter(name -> !tableNamesFoundSet.contains(name)) - .collect(Collectors.toList()); - - throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException( - "Unable to find table in cluster, table does not exist for tables: " + tablesNotFound); - } - } - // find the unions of all the broker tenant tags of the queried tables. - Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList, logicalTableConfigList); + Set<String> brokerTenantsUnion = getBrokerTenants(tableNames, database); if (brokerTenantsUnion.isEmpty()) { throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find broker tenant for tables: " + tableNames); } @@ -332,38 +308,44 @@ public class PinotQueryResource { } // given a list of tables, returns the list of tableConfigs - private List<TableConfig> getListTableConfigs(List<String> tableNames, String database) { - List<TableConfig> allTableConfigList = new ArrayList<>(); + private Set<String> getBrokerTenants(List<String> tableNames, String database) { + Set<String> brokerTenants = new HashSet<>(tableNames.size()); + List<String> tablesNotFound = new ArrayList<>(tableNames.size()); for (String tableName : tableNames) { + boolean found = false; String actualTableName = _pinotHelixResourceManager.getActualTableName(tableName, database); - List<TableConfig> tableConfigList = new ArrayList<>(); if (_pinotHelixResourceManager.hasRealtimeTable(actualTableName)) { - tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(actualTableName))); + brokerTenants.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(actualTableName)) + .getTenantConfig().getBroker()); + found = true; } if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) { - tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName))); + brokerTenants.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName)) + .getTenantConfig().getBroker()); + found = true; } - // If no table configs found for the table, skip it. - if (!tableConfigList.isEmpty()) { - allTableConfigList.addAll(tableConfigList); + + if (!found) { + actualTableName = _pinotHelixResourceManager.getActualLogicalTableName(tableName, database); + LogicalTableConfig logicalTableConfig = + _pinotHelixResourceManager.getLogicalTableConfig(actualTableName); + if (logicalTableConfig != null) { + brokerTenants.add(logicalTableConfig.getBrokerTenant()); + found = true; + } } - } - return allTableConfigList; - } - private List<LogicalTableConfig> getListLogicalTableConfigs(List<String> tableNames, String database) { - List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>(); - for (String tableName : tableNames) { - String actualTableName = _pinotHelixResourceManager.getActualLogicalTableName(tableName, database); - LogicalTableConfig logicalTableConfig = - _pinotHelixResourceManager.getLogicalTableConfig(actualTableName); - if (logicalTableConfig != null) { - allLogicalTableConfigList.add(logicalTableConfig); + if (!found) { + tablesNotFound.add(tableName); } } - return allLogicalTableConfigList; - } + if (!tablesNotFound.isEmpty()) { + throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException( + "Unable to find table in cluster, table does not exist for tables: " + tablesNotFound); + } + return brokerTenants; + } private String selectRandomInstanceId(List<String> instanceIds) { if (instanceIds.isEmpty()) { @@ -388,22 +370,6 @@ public class PinotQueryResource { return brokerInstanceConfigs.map(InstanceConfig::getInstanceName).collect(Collectors.toList()); } - // return the union of brokerTenants from the tables list. - private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList, - @Nullable List<LogicalTableConfig> logicalTableConfigList) { - Set<String> tableBrokerTenants = new HashSet<>(); - for (TableConfig tableConfig : tableConfigList) { - tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker()); - } - - if (logicalTableConfigList != null) { - for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) { - tableBrokerTenants.add(logicalTableConfig.getBrokerTenant()); - } - } - return tableBrokerTenants; - } - private StreamingOutput sendRequestToBroker(String query, String instanceId, String traceEnabled, String queryOptions, HttpHeaders httpHeaders) { InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index e72a054ec5..d60ce7b428 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -26,10 +26,12 @@ import org.apache.helix.model.IdealState; import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; +import org.intellij.lang.annotations.Language; import org.testng.Assert; import org.testng.annotations.Test; @@ -308,4 +310,55 @@ public class HybridClusterIntegrationTest extends BaseHybridClusterIntegrationTe throws Exception { super.testVirtualColumnQueries(); } + + @Test(dataProvider = "useBothQueryEngines") + void testControllerQuerySubmit(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + // Hybrid Table + @Language("sql") + String query = "SELECT count(*) FROM " + getTableName(); + JsonNode response = postQueryToController(query); + assertNoError(response); + + // Offline table + String tableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + query = "SELECT count(*) FROM " + tableName; + response = postQueryToController(query); + assertNoError(response); + + tableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + query = "SELECT count(*) FROM " + tableName; + response = postQueryToController(query); + assertNoError(response); + + query = "SELECT count(*) FROM unknown"; + response = postQueryToController(query); + if (useMultiStageQueryEngine) { + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST) + .containsMessage("TableDoesNotExistError"); + } else { + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.BROKER_RESOURCE_MISSING) + .containsMessage("BrokerResourceMissingError"); + } + } + + @Test + void testControllerJoinQuerySubmit() + throws Exception { + setUseMultiStageQueryEngine(true); + // Hybrid Table + @Language("sql") + String query = "SELECT count(*) FROM unknown JOIN " + getTableName() + + " ON unknown.FlightNum = " + getTableName() + ".FlightNum"; + JsonNode response = postQueryToController(query); + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST) + .containsMessage("TableDoesNotExistError"); + + query = "SELECT count(*) FROM unknown_1 JOIN unknown_2 ON " + + "unknown_1.FlightNum = unknown_2.FlightNum"; + response = postQueryToController(query); + QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST) + .containsMessage("TableDoesNotExistError"); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org