This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ad08a7016 Fix colocated join when no mapping project is used (#13666)
6ad08a7016 is described below

commit 6ad08a7016a19c34e2cce2c0834ac38b451798e4
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Tue Jul 30 11:32:48 2024 +0200

    Fix colocated join when no mapping project is used (#13666)
---
 .../rel/rules/PinotRelDistributionTraitRule.java   | 21 ++++++++-
 .../resources/queries/ExplainPhysicalPlans.json    | 52 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
index 821d1cf5ad..8fbd8da202 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -29,17 +29,23 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.mapping.IntPair;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -52,6 +58,7 @@ import org.apache.pinot.query.planner.plannode.AggregateNode;
 public class PinotRelDistributionTraitRule extends RelOptRule {
   public static final PinotRelDistributionTraitRule INSTANCE =
       new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotRelDistributionTraitRule.class);
 
   public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
     super(operand(RelNode.class, any()));
@@ -110,10 +117,22 @@ public class PinotRelDistributionTraitRule extends 
RelOptRule {
       LogicalProject project = (LogicalProject) node;
       try {
         if (inputRelDistribution != null) {
-          return inputRelDistribution.apply(project.getMapping());
+          Mappings.TargetMapping mapping =
+              Project.getPartialMapping(input.getRowType().getFieldCount(), 
project.getProjects());
+          // Note(gonzalo): In Calcite 1.37 mapping.getTargetOpt fails in 
what looks like a Calcite bug.
+          //  Therefore we apply a workaround here and create a new mapping to 
which the same elements (extracted with
+          //  the iterator, which actually works) are added.
+          //  See 
https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv
+          Mapping actualMapping = 
Mappings.create(MappingType.PARTIAL_FUNCTION, 
input.getRowType().getFieldCount(),
+              project.getRowType().getFieldCount());
+          for (IntPair intPair : mapping) {
+            actualMapping.set(intPair.source, intPair.target);
+          }
+          return inputRelDistribution.apply(actualMapping);
         }
       } catch (Exception e) {
         // ... skip;
+        LOGGER.warn("Failed to derive distribution from input for node: {}", 
node, e);
       }
     } else if (node instanceof LogicalFilter) {
       assert inputs.size() == 1;
diff --git 
a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json 
b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
index f72f8c1852..31db5ee99b 100644
--- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -449,6 +449,58 @@
           "                                    └── [5]@localhost:1|[0] TABLE 
SCAN (b) null\n",
           ""
         ]
+      },
+      {
+        "description": "explain plan with colocated join and a projection that 
is not mapping",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM 
(select col2, col3 + col2 as mySum from a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */) as a JOIN b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ ON a.col2 = b.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:2|[2] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:2|[3] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:1|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[1] PROJECT\n",
+          "        └── [1]@localhost:1|[1] JOIN\n",
+          "            ├── [1]@localhost:1|[1] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+          "            │       └── [2]@localhost:1|[1] PROJECT\n",
+          "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) 
null\n",
+          "            └── [1]@localhost:1|[1] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree 
Omitted)\n",
+          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+          "                    └── [3]@localhost:1|[1] PROJECT\n",
+          "                        └── [3]@localhost:1|[1] TABLE SCAN (b) 
null\n"
+        ]
+      },
+      {
+        "description": "explain plan with colocated join and a projection that 
doesn't keep the key column",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM 
(select col3 as col2, col3 + col2 as mySum from a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */) as a JOIN b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ ON a.col2 = b.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] PROJECT\n",
+          "        └── [1]@localhost:2|[0] JOIN\n",
+          "            ├── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            │       └── [2]@localhost:1|[1] PROJECT\n",
+          "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) 
null\n",
+          "            └── [1]@localhost:2|[0] 
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:2|[3] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
+          "                ├── [3]@localhost:1|[0] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree 
Omitted)\n",
+          "                └── [3]@localhost:1|[1] 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                    └── [3]@localhost:1|[1] PROJECT\n",
+          "                        └── [3]@localhost:1|[1] TABLE SCAN (b) 
null\n"
+        ]
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to