This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f618cf3cc4 Improve database handling in multi-stage engine (#14040)
f618cf3cc4 is described below

commit f618cf3cc412e91df19d95c09faf2fcd58c1e84e
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Sep 23 08:51:09 2024 -0700

    Improve database handling in multi-stage engine (#14040)
---
 .../MultiStageBrokerRequestHandler.java            |  3 --
 .../apache/pinot/common/utils/DatabaseUtils.java   | 14 +++++---
 .../rel/rules/PinotRelDistributionTraitRule.java   | 23 +++++++------
 .../org/apache/pinot/query/QueryEnvironment.java   |  2 +-
 .../apache/pinot/query/catalog/PinotCatalog.java   |  7 +---
 .../planner/logical/RelToPlanNodeConverter.java    | 38 ++++++++++------------
 .../query/runtime/queries/QueryRunnerTest.java     |  3 +-
 7 files changed, 44 insertions(+), 46 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d5791d1661..b683bbef0a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -54,7 +54,6 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
@@ -83,7 +82,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
   private final WorkerManager _workerManager;
   private final QueryDispatcher _queryDispatcher;
-  private final PinotCatalog _catalog;
 
   public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
@@ -92,7 +90,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
     _workerManager = new WorkerManager(hostname, port, _routingManager);
     _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config));
-    _catalog = new PinotCatalog(tableCache);
     LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
             + "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs,
         _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
index 3691e0063c..e61bbbe4d2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
@@ -32,6 +32,13 @@ public class DatabaseUtils {
   private DatabaseUtils() {
   }
 
+  /**
+   * Returns the fully qualified table name. Do not prefix the database name if it is the default database.
+   */
+  public static String constructFullyQualifiedTableName(String databaseName, String tableName) {
+    return databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE) ? tableName : databaseName + "." + tableName;
+  }
+
   /**
    * Construct the fully qualified table name i.e. {databaseName}.{tableName} from given table name and database name
    * @param tableName table/schema name
@@ -48,11 +55,8 @@ public class DatabaseUtils {
     String[] tableSplit = StringUtils.split(tableName, '.');
     switch (tableSplit.length) {
       case 1:
-        // do not concat the database name prefix if it's a 'default' database
-        if (StringUtils.isNotEmpty(databaseName) && !databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) {
-          return databaseName + "." + tableName;
-        }
-        return tableName;
+        return StringUtils.isEmpty(databaseName) ? tableName
+            : constructFullyQualifiedTableName(databaseName, tableName);
       case 2:
         Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName);
         String databasePrefix = tableSplit[0];
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
index 8fbd8da202..f2aa72da84 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.calcite.rel.rules;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,10 +31,12 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.mapping.IntPair;
 import org.apache.calcite.util.mapping.Mapping;
@@ -43,6 +46,7 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
+import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +65,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotRelDistributionTraitRule.class);
 
   public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
-    super(operand(RelNode.class, any()));
+    super(operand(RelNode.class, any()), factory, null);
   }
 
   @Override
@@ -74,8 +78,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
     RelNode current = call.rel(0);
     List<RelNode> inputs = current.getInputs();
     RelDistribution relDistribution;
-
-    if (inputs == null || inputs.size() == 0) {
+    if (inputs == null || inputs.isEmpty()) {
       relDistribution = computeCurrentDistribution(current);
     } else {
       // if there's input to the current node, attempt to derive the RelDistribution.
@@ -167,15 +170,17 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
   private static RelDistribution computeCurrentDistribution(RelNode node) {
     if (node instanceof Exchange) {
       return ((Exchange) node).getDistribution();
-    } else if (node instanceof LogicalTableScan) {
-      LogicalTableScan tableScan = (LogicalTableScan) node;
+    } else if (node instanceof TableScan) {
+      TableScan tableScan = (TableScan) node;
       // convert table scan hints into rel trait
       String partitionKey =
           PinotHintStrategyTable.getHintOption(tableScan.getHints(), PinotHintOptions.TABLE_HINT_OPTIONS,
               PinotHintOptions.TableHintOptions.PARTITION_KEY);
       if (partitionKey != null) {
-        int partitionIndex = tableScan.getRowType().getField(partitionKey, true, true).getIndex();
-        return RelDistributions.hash(ImmutableList.of(partitionIndex));
+        RelDataTypeField field = tableScan.getRowType().getField(partitionKey, true, true);
+        Preconditions.checkState(field != null, "Failed to find partition key: %s in table: %s", partitionKey,
+            RelToPlanNodeConverter.getTableNameFromTableScan(tableScan));
+        return RelDistributions.hash(List.of(field.getIndex()));
       } else {
         return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
       }
@@ -183,9 +188,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
       PinotLogicalAggregate agg = (PinotLogicalAggregate) node;
       AggregateNode.AggType aggType = agg.getAggType();
       if (aggType == AggregateNode.AggType.FINAL || aggType == AggregateNode.AggType.DIRECT) {
-        List<Integer> groupSetIndices = new ArrayList<>();
-        agg.getGroupSet().forEach(groupSetIndices::add);
-        return RelDistributions.hash(groupSetIndices);
+        return RelDistributions.hash(agg.getGroupSet().asList());
       } else {
         return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
       }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index cdcfc2f173..50daaa74e7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -101,7 +101,7 @@ public class QueryEnvironment {
   private final WorkerManager _workerManager;
 
   public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
-    PinotCatalog catalog = new PinotCatalog(database, tableCache);
+    PinotCatalog catalog = new PinotCatalog(tableCache, database);
     CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, database, catalog);
     _config = Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
         .defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
index 7a364b56af..47db3df30a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
@@ -53,12 +53,7 @@ public class PinotCatalog implements Schema {
    * PinotCatalog needs have access to the actual {@link TableCache} object because TableCache hosts the actual
    * table available for query and processes table/segment metadata updates when cluster status changes.
    */
-  public PinotCatalog(TableCache tableCache) {
-    _tableCache = tableCache;
-    _databaseName = null;
-  }
-
-  public PinotCatalog(String databaseName, TableCache tableCache) {
+  public PinotCatalog(TableCache tableCache, String databaseName) {
     _tableCache = tableCache;
     _databaseName = databaseName;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index 5bd2ed3705..f161b3a253 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -19,10 +19,11 @@
 package org.apache.pinot.query.planner.logical;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -239,13 +241,7 @@ public final class RelToPlanNodeConverter {
   }
 
   private TableScanNode convertLogicalTableScan(LogicalTableScan node) {
-    String tableName;
-    List<String> qualifiedName = node.getTable().getQualifiedName();
-    if (qualifiedName.size() == 1) {
-      tableName = qualifiedName.get(0);
-    } else {
-      tableName = DatabaseUtils.translateTableName(qualifiedName.get(1), qualifiedName.get(0));
-    }
+    String tableName = getTableNameFromTableScan(node);
     List<RelDataTypeField> fields = node.getRowType().getFieldList();
     List<String> columns = new ArrayList<>(fields.size());
     for (RelDataTypeField field : fields) {
@@ -369,20 +365,22 @@ public final class RelToPlanNodeConverter {
     }
   }
 
+  public static String getTableNameFromTableScan(TableScan tableScan) {
+    return getTableNameFromRelTable(tableScan.getTable());
+  }
+
   public static Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
-    Set<String> tableNames = new HashSet<>();
-    List<String> qualifiedTableNames = RelOptUtil.findAllTableQualifiedNames(relRoot);
-    for (String qualifiedTableName : qualifiedTableNames) {
-      // Calcite encloses table and schema names in square brackets to properly quote and delimit them in SQL
-      // statements, particularly to handle cases when they contain special characters or reserved keywords.
-      String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
-      String[] split = tableName.split(", ");
-      if (split.length == 1) {
-        tableNames.add(tableName);
-      } else {
-        tableNames.add(DatabaseUtils.translateTableName(split[1], split[0]));
-      }
+    List<RelOptTable> tables = RelOptUtil.findAllTables(relRoot);
+    Set<String> tableNames = Sets.newHashSetWithExpectedSize(tables.size());
+    for (RelOptTable table : tables) {
+      tableNames.add(getTableNameFromRelTable(table));
     }
     return tableNames;
   }
+
+  public static String getTableNameFromRelTable(RelOptTable table) {
+    List<String> qualifiedName = table.getQualifiedName();
+    return qualifiedName.size() == 1 ? qualifiedName.get(0)
+        : DatabaseUtils.constructFullyQualifiedTableName(qualifiedName.get(0), qualifiedName.get(1));
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 271c5c6297..76802c7b77 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -282,7 +282,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
                 + "GROUP BY a.col2, b.col2",
             1
         },
-        new Object[]{"SELECT * FROM \"default.tbl-escape-naming\"", 5}
+        new Object[]{"SELECT * FROM default.\"tbl-escape-naming\"", 5},
+        new Object[]{"SELECT * FROM \"default\".\"tbl-escape-naming\"", 5}
     };
     //@formatter:on
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to