This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f618cf3cc4 Improve database handling in multi-stage engine (#14040) f618cf3cc4 is described below commit f618cf3cc412e91df19d95c09faf2fcd58c1e84e Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Sep 23 08:51:09 2024 -0700 Improve database handling in multi-stage engine (#14040) --- .../MultiStageBrokerRequestHandler.java | 3 -- .../apache/pinot/common/utils/DatabaseUtils.java | 14 +++++--- .../rel/rules/PinotRelDistributionTraitRule.java | 23 +++++++------ .../org/apache/pinot/query/QueryEnvironment.java | 2 +- .../apache/pinot/query/catalog/PinotCatalog.java | 7 +--- .../planner/logical/RelToPlanNodeConverter.java | 38 ++++++++++------------ .../query/runtime/queries/QueryRunnerTest.java | 3 +- 7 files changed, 44 insertions(+), 46 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index d5791d1661..b683bbef0a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -54,7 +54,6 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.query.QueryEnvironment; -import org.apache.pinot.query.catalog.PinotCatalog; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; @@ -83,7 +82,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final WorkerManager _workerManager; private final QueryDispatcher _queryDispatcher; - private final PinotCatalog _catalog; public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) { @@ -92,7 +90,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT)); _workerManager = new WorkerManager(hostname, port, _routingManager); _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config)); - _catalog = new PinotCatalog(tableCache); LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, " + "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit()); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java index 3691e0063c..e61bbbe4d2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java @@ -32,6 +32,13 @@ public class DatabaseUtils { private DatabaseUtils() { } + /** + * Returns the fully qualified table name. Do not prefix the database name if it is the default database. + */ + public static String constructFullyQualifiedTableName(String databaseName, String tableName) { + return databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE) ? tableName : databaseName + "." + tableName; + } + /** * Construct the fully qualified table name i.e. {databaseName}.{tableName} from given table name and database name * @param tableName table/schema name @@ -48,11 +55,8 @@ public class DatabaseUtils { String[] tableSplit = StringUtils.split(tableName, '.'); switch (tableSplit.length) { case 1: - // do not concat the database name prefix if it's a 'default' database - if (StringUtils.isNotEmpty(databaseName) && !databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) { - return databaseName + "." + tableName; - } - return tableName; + return StringUtils.isEmpty(databaseName) ? tableName + : constructFullyQualifiedTableName(databaseName, tableName); case 2: Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName); String databasePrefix = tableSplit[0]; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java index 8fbd8da202..f2aa72da84 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.calcite.rel.rules; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; @@ -30,10 +31,12 @@ import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Exchange; import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.calcite.util.mapping.IntPair; import org.apache.calcite.util.mapping.Mapping; @@ -43,6 +46,7 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable; import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; +import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +65,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule { private static final Logger LOGGER = LoggerFactory.getLogger(PinotRelDistributionTraitRule.class); public PinotRelDistributionTraitRule(RelBuilderFactory factory) { - super(operand(RelNode.class, any())); + super(operand(RelNode.class, any()), factory, null); } @Override @@ -74,8 +78,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule { RelNode current = call.rel(0); List<RelNode> inputs = current.getInputs(); RelDistribution relDistribution; - - if (inputs == null || inputs.size() == 0) { + if (inputs == null || inputs.isEmpty()) { relDistribution = computeCurrentDistribution(current); } else { // if there's input to the current node, attempt to derive the RelDistribution. @@ -167,15 +170,17 @@ public class PinotRelDistributionTraitRule extends RelOptRule { private static RelDistribution computeCurrentDistribution(RelNode node) { if (node instanceof Exchange) { return ((Exchange) node).getDistribution(); - } else if (node instanceof LogicalTableScan) { - LogicalTableScan tableScan = (LogicalTableScan) node; + } else if (node instanceof TableScan) { + TableScan tableScan = (TableScan) node; // convert table scan hints into rel trait String partitionKey = PinotHintStrategyTable.getHintOption(tableScan.getHints(), PinotHintOptions.TABLE_HINT_OPTIONS, PinotHintOptions.TableHintOptions.PARTITION_KEY); if (partitionKey != null) { - int partitionIndex = tableScan.getRowType().getField(partitionKey, true, true).getIndex(); - return RelDistributions.hash(ImmutableList.of(partitionIndex)); + RelDataTypeField field = tableScan.getRowType().getField(partitionKey, true, true); + Preconditions.checkState(field != null, "Failed to find partition key: %s in table: %s", partitionKey, + RelToPlanNodeConverter.getTableNameFromTableScan(tableScan)); + return RelDistributions.hash(List.of(field.getIndex())); } else { return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY); } @@ -183,9 +188,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule { PinotLogicalAggregate agg = (PinotLogicalAggregate) node; AggregateNode.AggType aggType = agg.getAggType(); if (aggType == AggregateNode.AggType.FINAL || aggType == AggregateNode.AggType.DIRECT) { - List<Integer> groupSetIndices = new ArrayList<>(); - agg.getGroupSet().forEach(groupSetIndices::add); - return RelDistributions.hash(groupSetIndices); + return RelDistributions.hash(agg.getGroupSet().asList()); } else { return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index cdcfc2f173..50daaa74e7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -101,7 +101,7 @@ public class QueryEnvironment { private final WorkerManager _workerManager; public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) { - PinotCatalog catalog = new PinotCatalog(database, tableCache); + PinotCatalog catalog = new PinotCatalog(tableCache, database); CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, database, catalog); _config = Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance()) .defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java index 7a364b56af..47db3df30a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java @@ -53,12 +53,7 @@ public class PinotCatalog implements Schema { * PinotCatalog needs have access to the actual {@link TableCache} object because TableCache hosts the actual * table available for query and processes table/segment metadata updates when cluster status changes. */ - public PinotCatalog(TableCache tableCache) { - _tableCache = tableCache; - _databaseName = null; - } - - public PinotCatalog(String databaseName, TableCache tableCache) { + public PinotCatalog(TableCache tableCache, String databaseName) { _tableCache = tableCache; _databaseName = databaseName; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index 5bd2ed3705..f161b3a253 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -19,10 +19,11 @@ package org.apache.pinot.query.planner.logical; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelFieldCollation; @@ -31,6 +32,7 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Exchange; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Window; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; @@ -239,13 +241,7 @@ public final class RelToPlanNodeConverter { } private TableScanNode convertLogicalTableScan(LogicalTableScan node) { - String tableName; - List<String> qualifiedName = node.getTable().getQualifiedName(); - if (qualifiedName.size() == 1) { - tableName = qualifiedName.get(0); - } else { - tableName = DatabaseUtils.translateTableName(qualifiedName.get(1), qualifiedName.get(0)); - } + String tableName = getTableNameFromTableScan(node); List<RelDataTypeField> fields = node.getRowType().getFieldList(); List<String> columns = new ArrayList<>(fields.size()); for (RelDataTypeField field : fields) { @@ -369,20 +365,22 @@ public final class RelToPlanNodeConverter { } } + public static String getTableNameFromTableScan(TableScan tableScan) { + return getTableNameFromRelTable(tableScan.getTable()); + } + public static Set<String> getTableNamesFromRelRoot(RelNode relRoot) { - Set<String> tableNames = new HashSet<>(); - List<String> qualifiedTableNames = RelOptUtil.findAllTableQualifiedNames(relRoot); - for (String qualifiedTableName : qualifiedTableNames) { - // Calcite encloses table and schema names in square brackets to properly quote and delimit them in SQL - // statements, particularly to handle cases when they contain special characters or reserved keywords. - String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1"); - String[] split = tableName.split(", "); - if (split.length == 1) { - tableNames.add(tableName); - } else { - tableNames.add(DatabaseUtils.translateTableName(split[1], split[0])); - } + List<RelOptTable> tables = RelOptUtil.findAllTables(relRoot); + Set<String> tableNames = Sets.newHashSetWithExpectedSize(tables.size()); + for (RelOptTable table : tables) { + tableNames.add(getTableNameFromRelTable(table)); } return tableNames; } + + public static String getTableNameFromRelTable(RelOptTable table) { + List<String> qualifiedName = table.getQualifiedName(); + return qualifiedName.size() == 1 ? qualifiedName.get(0) + : DatabaseUtils.constructFullyQualifiedTableName(qualifiedName.get(0), qualifiedName.get(1)); + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java index 271c5c6297..76802c7b77 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java @@ -282,7 +282,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase { + "GROUP BY a.col2, b.col2", 1 }, - new Object[]{"SELECT * FROM \"default.tbl-escape-naming\"", 5} + new Object[]{"SELECT * FROM default.\"tbl-escape-naming\"", 5}, + new Object[]{"SELECT * FROM \"default\".\"tbl-escape-naming\"", 5} }; //@formatter:on } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org