This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 891e5438de Fix TableDoesNotExistError for hybrid tables in MSE queries in controller API (#16102)
891e5438de is described below

commit 891e5438de18a53ca59e64aac58cd52de9763460
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Sat Jun 14 02:37:49 2025 +0530

    Fix TableDoesNotExistError for hybrid tables in MSE queries in controller API (#16102)
---
 .../api/resources/PinotQueryResource.java          | 90 +++++++---------------
 .../tests/HybridClusterIntegrationTest.java        | 53 +++++++++++++
 2 files changed, 81 insertions(+), 62 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index ceaaa8bf07..af3e2eefb0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -68,7 +68,6 @@ import org.apache.pinot.core.auth.ManualAuthorization;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.parser.utils.ParserUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.exception.DatabaseConflictException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -240,31 +239,8 @@ public class PinotQueryResource {
   private List<String> getInstanceIds(String query, List<String> tableNames, 
String database) {
     List<String> instanceIds;
     if (!tableNames.isEmpty()) {
-      List<TableConfig> tableConfigList = getListTableConfigs(tableNames, 
database);
-      List<LogicalTableConfig> logicalTableConfigList = null;
-      // First check for table configs, if not found, check for logical table 
configs.
-      if (tableConfigList.size() != tableNames.size()) {
-        logicalTableConfigList = getListLogicalTableConfigs(tableNames, 
database);
-        // If config is not found for all tables, then find the tables that 
are not found.
-        if ((tableConfigList.size() + logicalTableConfigList.size()) != 
tableNames.size()) {
-          Set<String> tableNamesFoundSet = new HashSet<>();
-          for (TableConfig tableConfig : tableConfigList) {
-            tableNamesFoundSet.add(tableConfig.getTableName());
-          }
-          for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) 
{
-            tableNamesFoundSet.add(logicalTableConfig.getTableName());
-          }
-
-          List<String> tablesNotFound = tableNames.stream().filter(name -> 
!tableNamesFoundSet.contains(name))
-              .collect(Collectors.toList());
-
-          throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
-              "Unable to find table in cluster, table does not exist for 
tables: " + tablesNotFound);
-        }
-      }
-
       // find the unions of all the broker tenant tags of the queried tables.
-      Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList, 
logicalTableConfigList);
+      Set<String> brokerTenantsUnion = getBrokerTenants(tableNames, database);
       if (brokerTenantsUnion.isEmpty()) {
         throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find 
broker tenant for tables: " + tableNames);
       }
@@ -332,38 +308,44 @@ public class PinotQueryResource {
   }
 
   // given a list of tables, returns the list of tableConfigs
-  private List<TableConfig> getListTableConfigs(List<String> tableNames, 
String database) {
-    List<TableConfig> allTableConfigList = new ArrayList<>();
+  private Set<String> getBrokerTenants(List<String> tableNames, String 
database) {
+    Set<String> brokerTenants = new HashSet<>(tableNames.size());
+    List<String> tablesNotFound = new ArrayList<>(tableNames.size());
     for (String tableName : tableNames) {
+      boolean found = false;
       String actualTableName = 
_pinotHelixResourceManager.getActualTableName(tableName, database);
-      List<TableConfig> tableConfigList = new ArrayList<>();
       if (_pinotHelixResourceManager.hasRealtimeTable(actualTableName)) {
-        
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(actualTableName)));
+        
brokerTenants.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(actualTableName))
+            .getTenantConfig().getBroker());
+        found = true;
       }
       if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) {
-        
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName)));
+        
brokerTenants.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName))
+            .getTenantConfig().getBroker());
+        found = true;
       }
-      // If no table configs found for the table, skip it.
-      if (!tableConfigList.isEmpty()) {
-        allTableConfigList.addAll(tableConfigList);
+
+      if (!found) {
+        actualTableName = 
_pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
+        LogicalTableConfig logicalTableConfig =
+            _pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
+        if (logicalTableConfig != null) {
+          brokerTenants.add(logicalTableConfig.getBrokerTenant());
+          found = true;
+        }
       }
-    }
-    return allTableConfigList;
-  }
 
-  private List<LogicalTableConfig> getListLogicalTableConfigs(List<String> 
tableNames, String database) {
-    List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>();
-    for (String tableName : tableNames) {
-      String actualTableName = 
_pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
-      LogicalTableConfig logicalTableConfig =
-          _pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
-      if (logicalTableConfig != null) {
-        allLogicalTableConfigList.add(logicalTableConfig);
+      if (!found) {
+        tablesNotFound.add(tableName);
       }
     }
-    return allLogicalTableConfigList;
-  }
 
+    if (!tablesNotFound.isEmpty()) {
+      throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
+          "Unable to find table in cluster, table does not exist for tables: " 
+ tablesNotFound);
+    }
+    return brokerTenants;
+  }
 
   private String selectRandomInstanceId(List<String> instanceIds) {
     if (instanceIds.isEmpty()) {
@@ -388,22 +370,6 @@ public class PinotQueryResource {
     return 
brokerInstanceConfigs.map(InstanceConfig::getInstanceName).collect(Collectors.toList());
   }
 
-  // return the union of brokerTenants from the tables list.
-  private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList,
-      @Nullable List<LogicalTableConfig> logicalTableConfigList) {
-    Set<String> tableBrokerTenants = new HashSet<>();
-    for (TableConfig tableConfig : tableConfigList) {
-      tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
-    }
-
-    if (logicalTableConfigList != null) {
-      for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) {
-        tableBrokerTenants.add(logicalTableConfig.getBrokerTenant());
-      }
-    }
-    return tableBrokerTenants;
-  }
-
   private StreamingOutput sendRequestToBroker(String query, String instanceId, 
String traceEnabled, String queryOptions,
       HttpHeaders httpHeaders) {
     InstanceConfig instanceConfig = 
_pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index e72a054ec5..d60ce7b428 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -26,10 +26,12 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -308,4 +310,55 @@ public class HybridClusterIntegrationTest extends BaseHybridClusterIntegrationTe
       throws Exception {
     super.testVirtualColumnQueries();
   }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  void testControllerQuerySubmit(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // Hybrid Table
+    @Language("sql")
+    String query = "SELECT count(*) FROM " + getTableName();
+    JsonNode response = postQueryToController(query);
+    assertNoError(response);
+
+    // Offline table
+    String tableName = 
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    query = "SELECT count(*) FROM " + tableName;
+    response = postQueryToController(query);
+    assertNoError(response);
+
+    tableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    query = "SELECT count(*) FROM " + tableName;
+    response = postQueryToController(query);
+    assertNoError(response);
+
+    query = "SELECT count(*) FROM unknown";
+    response = postQueryToController(query);
+    if (useMultiStageQueryEngine) {
+      
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+          .containsMessage("TableDoesNotExistError");
+    } else {
+      
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.BROKER_RESOURCE_MISSING)
+          .containsMessage("BrokerResourceMissingError");
+    }
+  }
+
+  @Test
+  void testControllerJoinQuerySubmit()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    // Hybrid Table
+    @Language("sql")
+    String query = "SELECT count(*) FROM unknown JOIN " + getTableName()
+        + " ON unknown.FlightNum = " + getTableName() + ".FlightNum";
+    JsonNode response = postQueryToController(query);
+    
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+        .containsMessage("TableDoesNotExistError");
+
+    query = "SELECT count(*) FROM unknown_1 JOIN unknown_2  ON "
+        + "unknown_1.FlightNum = unknown_2.FlightNum";
+    response = postQueryToController(query);
+    
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+        .containsMessage("TableDoesNotExistError");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to