walterddr commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1067530814


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -102,6 +111,15 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     }
   }
 
+  // TODO: Switch to Worker SPI to avoid multiple-places where workers are assigned.
+  private void runPhysicalOptimizers(QueryPlan queryPlan) {
+    StageNode globalStageRoot = queryPlan.getQueryStageMap().get(0);
+    if (_plannerContext.getOptions().getOrDefault("useColocatedJoin", "false").equals("true")) {
+      GreedyShuffleRewriteVisitor.optimizeShuffles(queryPlan, _tableCache);
+    } else {
+      ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);

Review Comment:
   The default `ShuffleRewriteVisitor` is no longer used; please remove this branch.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -79,11 +80,14 @@ public class QueryEnvironment {
   // Pinot extensions
   private final Collection<RelOptRule> _logicalRuleSet;
   private final WorkerManager _workerManager;
+  private final TableCache _tableCache;

Review Comment:
   Passing in `_tableCache` seems a bit odd to me, but I don't have a better abstraction here.
   Can we add a TODO that states exactly what this `TableCache` is intended to be used for, so that it is not abused?
   
   All the info the planner needs is already encapsulated inside the `CalciteSchema`.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java:
##########
@@ -37,4 +37,9 @@
   OUT getKey(IN input);
 
   int computeHash(IN input);
+
+  /**
+   * @return the hash-algorithm used for distributing rows
+   */
+  String hashAlgorithm();

Review Comment:
   Are we using this at all at the moment? If not, I would suggest adding this API in the future instead.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+public abstract class DefaultPostOrderTraversalVisitor<T, C> implements StageNodeVisitor<T, C> {

Review Comment:
   Missing Javadoc.
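For reference, a post-order visitor processes every input of a node before the node itself, so each node can combine results already computed for its children. Assuming this class follows the usual post-order contract its name suggests, the sketch below illustrates what its Javadoc could describe. `Node`, `PostOrderVisitor`, `process`, and `namesInPostOrder` are hypothetical, simplified stand-ins, not Pinot's `StageNode`/`StageNodeVisitor` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT Pinot's StageNode / StageNodeVisitor types.
public class PostOrderSketch {

  /** Minimal plan node with inputs, standing in for a StageNode. */
  static final class Node {
    final String _name;
    final List<Node> _inputs = new ArrayList<>();

    Node(String name, Node... inputs) {
      _name = name;
      Collections.addAll(_inputs, inputs);
    }
  }

  /**
   * Visits all inputs of a node before the node itself (post-order). Subclasses
   * implement {@link #process} to derive a node's result from its inputs' results.
   *
   * @param <T> result type produced per node
   * @param <C> context type threaded through the traversal
   */
  abstract static class PostOrderVisitor<T, C> {
    T visit(Node node, C context) {
      List<T> inputResults = new ArrayList<>();
      for (Node input : node._inputs) {
        inputResults.add(visit(input, context)); // children first
      }
      return process(node, inputResults, context); // then the node itself
    }

    abstract T process(Node node, List<T> inputResults, C context);
  }

  /** Records node names in the order the visitor processes them. */
  static List<String> namesInPostOrder(Node root) {
    List<String> order = new ArrayList<>();
    new PostOrderVisitor<Void, List<String>>() {
      @Override
      Void process(Node node, List<Void> inputResults, List<String> context) {
        context.add(node._name);
        return null;
      }
    }.visit(root, order);
    return order;
  }

  public static void main(String[] args) {
    Node plan = new Node("join", new Node("scanA"), new Node("scanB"));
    System.out.println(namesInPostOrder(plan)); // prints [scanA, scanB, join]
  }
}
```

Threading a context object `C` through the traversal mirrors the `<T, C>` type parameters of `StageNodeVisitor` in the diff; how the real class uses them is not shown in this excerpt.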



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -104,6 +112,16 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     }
   }
 
+  // Could run CBO here later
+  private void runPhysicalOptimizers(QueryPlan queryPlan) {

Review Comment:
   For now this is fine. Let's consider an SPI for worker assignment; we can refactor later.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -79,11 +80,14 @@ public class QueryEnvironment {
   // Pinot extensions
   private final Collection<RelOptRule> _logicalRuleSet;
   private final WorkerManager _workerManager;
+  private final TableCache _tableCache;

Review Comment:
   Actually, IMO this info should be encapsulated in `org.apache.pinot.query.catalog.PinotTable`, but we don't have to fix this in this PR.
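One way to read this suggestion, sketched under assumptions: the catalog table object carries its own partitioning metadata, so the colocated-join optimizer asks the table rather than holding a whole `TableCache`. `TableSketch`, `PartitionInfo`, and `canColocate` are hypothetical names, not Pinot's `PinotTable` API. The fields mirror what the `PartitionKey` Javadoc in this PR lists (partition column, number of partitions, hash algorithm), and the colocation rule used here (same partition count and same hash algorithm) is a simplifying assumption.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: TableSketch and PartitionInfo are illustrative, not Pinot's PinotTable API.
public class CatalogPartitionSketch {

  /** Partitioning metadata a colocated-join optimizer would need (assumed fields). */
  static final class PartitionInfo {
    final String _column;
    final int _numPartitions;
    final String _hashAlgorithm;

    PartitionInfo(String column, int numPartitions, String hashAlgorithm) {
      _column = column;
      _numPartitions = numPartitions;
      _hashAlgorithm = hashAlgorithm;
    }
  }

  /** Stand-in for a catalog table that owns its partitioning metadata. */
  static final class TableSketch {
    final String _name;
    private final PartitionInfo _partitionInfo; // null if the table is not partitioned

    TableSketch(String name, PartitionInfo partitionInfo) {
      _name = name;
      _partitionInfo = partitionInfo;
    }

    Optional<PartitionInfo> getPartitionInfo() {
      return Optional.ofNullable(_partitionInfo);
    }
  }

  /** Simplified rule: both tables partitioned with the same partition count and hash algorithm. */
  static boolean canColocate(TableSketch left, TableSketch right) {
    Optional<PartitionInfo> l = left.getPartitionInfo();
    Optional<PartitionInfo> r = right.getPartitionInfo();
    if (!l.isPresent() || !r.isPresent()) {
      return false;
    }
    return l.get()._numPartitions == r.get()._numPartitions
        && Objects.equals(l.get()._hashAlgorithm, r.get()._hashAlgorithm);
  }

  public static void main(String[] args) {
    TableSketch users = new TableSketch("users", new PartitionInfo("user_uuid", 8, "murmur"));
    TableSketch events = new TableSketch("events", new PartitionInfo("user_uuid", 8, "murmur"));
    TableSketch logs = new TableSketch("logs", null);
    System.out.println(canColocate(users, events)); // prints true
    System.out.println(canColocate(users, logs)); // prints false
  }
}
```

A real check would also need to confirm that the join keys match the partition columns; that part is omitted here.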



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/PartitionKey.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.calcite.rex.RexInputRef;
+
+
+/**
+ * PartitionKey describes how data is distributed in a given stage. It consists of a list of columns which are stored
+ * as a list of {@link RexInputRef#getIndex()}, the number of partitions and the hash-algorithm used. A given stage may
+ * have more than 1 PartitionKey, in which case one may use a {@link java.util.Set<PartitionKey>} to represent this
+ * behavior.
+ *
+ * <p>
+ *  In other words, when a StageNode has the schema: (user_uuid, col1, col2, ...), and the PartitionKey is
+ *  ([0], 8, murmur), then that means that the data for the StageNode is partitioned using the user_uuid column, into
+ *  8 partitions where the partitionId is computed using murmur(user_uuid) % 8.
+ *
+ *  For a join stage the data is partitioned by the senders using their respective join-keys. In that case, we may
+ *  have more than 1 PartitionKey applicable for the JoinNode, and it can be represented by a set as:
+ *  {([0], 8, murmur), ([leftSchemaSize + 0], 8, murmur)}, assuming both senders partition using Murmur into 8
+ *  partitions. Note that a set of PartitionKey means that the partition keys are independent and they don't have any
+ *  ordering, i.e. the data is partitioned by both the join-key of the left child and the join-key of the right child.
+ * </p>
+ */
+public class PartitionKey {

Review Comment:
   Make this class package-private. I'd also suggest renaming it to `ColocationKey`.
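The Javadoc above computes the partition as murmur(user_uuid) % 8 and models a join stage as a set of independent keys. A minimal sketch of the suggested package-private `ColocationKey`, under assumptions: the field names, the `partitionId` helper, and passing the hash function in as a `ToIntFunction` are hypothetical. MurmurHash is not implemented; in the demo, `Object.hashCode()` stands in for it purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.ToIntFunction;

// Sketch only: a package-private ColocationKey whose fields mirror the PartitionKey Javadoc
// (column indexes, number of partitions, hash algorithm). Not the PR's actual code.
final class ColocationKey {
  private final List<Integer> _indices; // RexInputRef#getIndex() values of the key columns
  private final int _numPartitions;
  private final String _hashAlgorithm; // a label only; no hash is implemented here

  ColocationKey(List<Integer> indices, int numPartitions, String hashAlgorithm) {
    _indices = new ArrayList<>(indices);
    _numPartitions = numPartitions;
    _hashAlgorithm = hashAlgorithm;
  }

  /** partitionId = hash(keyValue) % numPartitions; floorMod keeps it non-negative. */
  int partitionId(Object keyValue, ToIntFunction<Object> hashFunction) {
    return Math.floorMod(hashFunction.applyAsInt(keyValue), _numPartitions);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ColocationKey)) {
      return false;
    }
    ColocationKey that = (ColocationKey) o;
    return _numPartitions == that._numPartitions
        && _indices.equals(that._indices)
        && Objects.equals(_hashAlgorithm, that._hashAlgorithm);
  }

  @Override
  public int hashCode() {
    return Objects.hash(_indices, _numPartitions, _hashAlgorithm);
  }

  public static void main(String[] args) {
    int leftSchemaSize = 3; // assumed width of the left child's schema
    // Join stage: both senders partition by their own join key into 8 partitions.
    Set<ColocationKey> joinKeys = Set.of(
        new ColocationKey(List.of(0), 8, "murmur"),
        new ColocationKey(List.of(leftSchemaSize + 0), 8, "murmur"));
    System.out.println(joinKeys.size()); // prints 2
    // Object::hashCode stands in for murmur here, purely for illustration.
    ToIntFunction<Object> hash = Object::hashCode;
    System.out.println(new ColocationKey(List.of(0), 8, "murmur").partitionId("user-42", hash));
  }
}
```

Value-based `equals`/`hashCode` is what lets a `Set<ColocationKey>` represent the join case: two keys with the same column indexes, partition count, and hash algorithm collapse into one set element.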



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

