This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6ad08a7016 Fix colocated join when no mapping project is used (#13666) 6ad08a7016 is described below commit 6ad08a7016a19c34e2cce2c0834ac38b451798e4 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Tue Jul 30 11:32:48 2024 +0200 Fix colocated join when no mapping project is used (#13666) --- .../rel/rules/PinotRelDistributionTraitRule.java | 21 ++++++++- .../resources/queries/ExplainPhysicalPlans.json | 52 ++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java index 821d1cf5ad..8fbd8da202 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java @@ -29,17 +29,23 @@ import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.mapping.IntPair; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable; import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate; import 
org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -52,6 +58,7 @@ import org.apache.pinot.query.planner.plannode.AggregateNode; public class PinotRelDistributionTraitRule extends RelOptRule { public static final PinotRelDistributionTraitRule INSTANCE = new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY); + private static final Logger LOGGER = LoggerFactory.getLogger(PinotRelDistributionTraitRule.class); public PinotRelDistributionTraitRule(RelBuilderFactory factory) { super(operand(RelNode.class, any())); @@ -110,10 +117,22 @@ public class PinotRelDistributionTraitRule extends RelOptRule { LogicalProject project = (LogicalProject) node; try { if (inputRelDistribution != null) { - return inputRelDistribution.apply(project.getMapping()); + Mappings.TargetMapping mapping = + Project.getPartialMapping(input.getRowType().getFieldCount(), project.getProjects()); + // Note(gonzalo): In Calcite 1.37 mapping.getTargetOpt will fail in what looks like a Calcite bug. + // Therefore here we apply a workaround and create a new map where the same elements (extracted with + // iterator, which actually works) are added to the new mapping. + // See https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv + Mapping actualMapping = Mappings.create(MappingType.PARTIAL_FUNCTION, input.getRowType().getFieldCount(), + project.getRowType().getFieldCount()); + for (IntPair intPair : mapping) { + actualMapping.set(intPair.source, intPair.target); + } + return inputRelDistribution.apply(actualMapping); } } catch (Exception e) { // ... 
skip; + LOGGER.warn("Failed to derive distribution from input for node: {}", node, e); } } else if (node instanceof LogicalFilter) { assert inputs.size() == 1; diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json index f72f8c1852..31db5ee99b 100644 --- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json +++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json @@ -449,6 +449,58 @@ " └── [5]@localhost:1|[0] TABLE SCAN (b) null\n", "" ] + }, + { + "description": "explain plan with colocated join and a projection that is not mapping", + "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:1|[1] PROJECT\n", + " └── [1]@localhost:1|[1] JOIN\n", + " ├── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n", + " │ └── 
[2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n", + " │ └── [2]@localhost:1|[1] PROJECT\n", + " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", + " └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n", + " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n", + " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n", + " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n", + " └── [3]@localhost:1|[1] PROJECT\n", + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" + ] + }, + { + "description": "explain plan with colocated join and a projection that doesn't keep the key column", + "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col3 as col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:2|[0] PROJECT\n", + " └── [1]@localhost:2|[0] JOIN\n", + " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n", + " │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " │ └── [2]@localhost:1|[1] PROJECT\n", + " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", + " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " └── [3]@localhost:1|[1] PROJECT\n", + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" + ] } ] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org