walterddr commented on code in PR #11234:
URL: https://github.com/apache/pinot/pull/11234#discussion_r1281239345


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -96,19 +97,66 @@ private static boolean isLeafPlan(DispatchablePlanMetadata 
metadata) {
   }
 
   private void assignWorkersToLeafFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
-    DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
-    // table scan stage, need to attach server as well as segment info for 
each physical table type.
-    List<String> scannedTables = metadata.getScannedTables();
-    String logicalTableName = scannedTables.get(0);
-    Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName, context.getRequestId());
-    if (routingTableMap.size() == 0) {
-      throw new IllegalArgumentException("Unable to find routing entries for 
table: " + logicalTableName);
+    // NOTE: For pipeline breaker, leaf fragment can also have children
+    for (PlanFragment child : fragment.getChildren()) {
+      assignWorkersToNonRootFragment(child, context);
+    }
+
+    TableScanNode tableScanNode = 
findTableScanNode(fragment.getFragmentRoot());
+    Preconditions.checkState(tableScanNode != null, "Failed to find table scan 
node under leaf fragment");
+    String tableName = tableScanNode.getTableName();
+
+    // Extract partitionKey and numPartitions from hint if provided
+    Map<String, String> tableHintOptions =
+        
tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS);
+    String partitionKey = null;
+    int numPartitions = 0;
+    if (tableHintOptions != null) {
+      partitionKey = 
tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
+      String partitionSize = 
tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+      if (partitionSize != null) {
+        numPartitions = Integer.parseInt(partitionSize);
+      }
+    }
+
+    if (partitionKey == null) {
+      assignWorkersToNonPartitionedLeafFragment(fragment, context, tableName);
+    } else {
+      Preconditions.checkState(numPartitions > 0, "'%s' must be provided for 
partition key: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+      assignWorkersToPartitionedLeafFragment(fragment, context, tableName, 
partitionKey, numPartitions);
     }
+  }
+
+  @Nullable
+  private TableScanNode findTableScanNode(PlanNode planNode) {

Review Comment:
   nit: this can be stored in the fragment metadata during the fragment visitor pass.



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java:
##########
@@ -98,51 +101,54 @@ public void setUp()
         boolean allowEmptySegment = 
!BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps, 
"noEmptySegment"));
         String tableName = testCaseName + "_" + tableEntry.getKey();
         // Testing only OFFLINE table b/c Hybrid table test is a special case 
to test separately.
-        String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
-        org.apache.pinot.spi.data.Schema pinotSchema = 
constructSchema(tableName, tableEntry.getValue()._schema);
+        String offlineTableName = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
+        Schema pinotSchema = constructSchema(tableName, 
tableEntry.getValue()._schema);
         schemaMap.put(tableName, pinotSchema);
-        factory1.registerTable(pinotSchema, tableNameWithType);
-        factory2.registerTable(pinotSchema, tableNameWithType);
+        factory1.registerTable(pinotSchema, offlineTableName);
+        factory2.registerTable(pinotSchema, offlineTableName);
         List<QueryTestCase.ColumnAndType> columnAndTypes = 
tableEntry.getValue()._schema;
         List<GenericRow> genericRows = toRow(columnAndTypes, 
tableEntry.getValue()._inputs);
 
         // generate segments and dump into server1 and server2
         List<String> partitionColumns = 
tableEntry.getValue()._partitionColumns;
+        String partitionColumn = null;
+        List<List<String>> partitionIdToSegmentsMap = null;
+        if (partitionColumns != null && partitionColumns.size() == 1) {
+          partitionColumn = partitionColumns.get(0);
+          partitionIdToSegmentsMap = new ArrayList<>();
+          for (int i = 0; i < 4; i++) {

Review Comment:
   Make the `4` a `private static final` constant?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java:
##########
@@ -59,9 +59,38 @@ public Void process(PlanNode node, DispatchablePlanContext 
context) {
             receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(senderFragmentId, mailboxMetadata);
           }
         });
+      } else if (senderMetadata.isPartitionedTableScan()) {
+        // For partitioned table scan, send the data to the worker with the 
same worker id (not necessary the same
+        // instance)

Review Comment:
   Maybe we can add a TODO: instead of sending to the same worker id, we could do 
a 1:2 or 1:4 fan-out 
   (possibly based on a parallelism setting such as `set 
stageParallelism=xxx`).



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -96,19 +97,66 @@ private static boolean isLeafPlan(DispatchablePlanMetadata 
metadata) {
   }
 
   private void assignWorkersToLeafFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
-    DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
-    // table scan stage, need to attach server as well as segment info for 
each physical table type.
-    List<String> scannedTables = metadata.getScannedTables();
-    String logicalTableName = scannedTables.get(0);
-    Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName, context.getRequestId());
-    if (routingTableMap.size() == 0) {
-      throw new IllegalArgumentException("Unable to find routing entries for 
table: " + logicalTableName);
+    // NOTE: For pipeline breaker, leaf fragment can also have children
+    for (PlanFragment child : fragment.getChildren()) {
+      assignWorkersToNonRootFragment(child, context);
+    }
+
+    TableScanNode tableScanNode = 
findTableScanNode(fragment.getFragmentRoot());
+    Preconditions.checkState(tableScanNode != null, "Failed to find table scan 
node under leaf fragment");
+    String tableName = tableScanNode.getTableName();
+
+    // Extract partitionKey and numPartitions from hint if provided
+    Map<String, String> tableHintOptions =
+        
tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS);

Review Comment:
   nit: can we simplify all of this by putting the hint into the dispatchable plan 
metadata map during the visitor pass?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -354,14 +349,12 @@ private ColocatedTableInfo getColocatedTableInfo(String 
tableName) {
         TimeBoundaryInfo timeBoundaryInfo = 
_routingManager.getTimeBoundaryInfo(offlineTableName);
         // Ignore OFFLINE side when time boundary info is unavailable
         if (timeBoundaryInfo == null) {
-          return getRealtimeColocatedTableInfo(realtimeTableName);
+          return getRealtimeColocatedTableInfo(realtimeTableName, 
partitionKey, numPartitions);

Review Comment:
   Should we add an integration test for the hybrid-table colocated join? I am not 
sure how we can mock that in a unit test, but an integration test would be very 
helpful to cover this.



##########
pinot-query-runtime/src/test/resources/queries/QueryHints.json:
##########
@@ -51,37 +51,41 @@
       }
     },
     "queries": [
+      {
+        "description": "Group by partition column",
+        "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+      },
       {
         "description": "Colocated JOIN with partition column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ 
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON 
{tbl1}.num = {tbl2}.num"
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM 
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} 
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num"

Review Comment:
   Can we add a test showing what happens when the partition size or partition key is invalid (e.g. the key 
is not `num` or the size is not `4`)? Should it throw, or should it fall back?



##########
pinot-query-runtime/src/test/resources/queries/QueryHints.json:
##########
@@ -51,37 +51,41 @@
       }
     },
     "queries": [
+      {
+        "description": "Group by partition column",
+        "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+      },
       {
         "description": "Colocated JOIN with partition column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ 
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON 
{tbl1}.num = {tbl2}.num"
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM 
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} 
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num"
       },
       {
         "description": "Colocated JOIN with partition column and group by 
partition column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, 
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY 
{tbl1}.num, {tbl1}.name"
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') 
*/ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
       },
       {
         "description": "Colocated JOIN with partition column and group by 
non-partitioned column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name, 
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY 
{tbl1}.name"
+        "sql": "SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num GROUP BY {tbl1}.name"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE 
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 
'yyy'))"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy'))"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column and group by partition column",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, 
COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM 
{tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, 
COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column and group by non-partitioned column",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE 
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 
'yyy')) GROUP BY {tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy')) GROUP BY {tbl1}.name"
       },
       {
         "description": "Dynamic broadcast SEMI-JOIN with empty right table 
result",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM 
{tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 
'non-exist') GROUP BY {tbl1}.name"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially 
empty right table result for some servers",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE 
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY 
{tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 
'z') GROUP BY {tbl1}.name"

Review Comment:
   Can we add a `set stageParallelism = 2;` variant of each hinted 
test here? We have never tested options and hints together, so it is good to add. (This 
can be a follow-up; I just tried it and they all passed.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to