walterddr commented on code in PR #11234: URL: https://github.com/apache/pinot/pull/11234#discussion_r1281239345
########## pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java: ########## @@ -96,19 +97,66 @@ private static boolean isLeafPlan(DispatchablePlanMetadata metadata) { } private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlanContext context) { - DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId()); - // table scan stage, need to attach server as well as segment info for each physical table type. - List<String> scannedTables = metadata.getScannedTables(); - String logicalTableName = scannedTables.get(0); - Map<String, RoutingTable> routingTableMap = getRoutingTable(logicalTableName, context.getRequestId()); - if (routingTableMap.size() == 0) { - throw new IllegalArgumentException("Unable to find routing entries for table: " + logicalTableName); + // NOTE: For pipeline breaker, leaf fragment can also have children + for (PlanFragment child : fragment.getChildren()) { + assignWorkersToNonRootFragment(child, context); + } + + TableScanNode tableScanNode = findTableScanNode(fragment.getFragmentRoot()); + Preconditions.checkState(tableScanNode != null, "Failed to find table scan node under leaf fragment"); + String tableName = tableScanNode.getTableName(); + + // Extract partitionKey and numPartitions from hint if provided + Map<String, String> tableHintOptions = + tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS); + String partitionKey = null; + int numPartitions = 0; + if (tableHintOptions != null) { + partitionKey = tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY); + String partitionSize = tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE); + if (partitionSize != null) { + numPartitions = Integer.parseInt(partitionSize); + } + } + + if (partitionKey == null) { + assignWorkersToNonPartitionedLeafFragment(fragment, context, tableName); + } else { + Preconditions.checkState(numPartitions > 0, 
"'%s' must be provided for partition key: %s", + PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey); + assignWorkersToPartitionedLeafFragment(fragment, context, tableName, partitionKey, numPartitions); } + } + + @Nullable + private TableScanNode findTableScanNode(PlanNode planNode) { Review Comment: nit: this can be set into the fragment metadata during fragment visitor ########## pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java: ########## @@ -98,51 +101,54 @@ public void setUp() boolean allowEmptySegment = !BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps, "noEmptySegment")); String tableName = testCaseName + "_" + tableEntry.getKey(); // Testing only OFFLINE table b/c Hybrid table test is a special case to test separately. - String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName); - org.apache.pinot.spi.data.Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema); + String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName); + Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema); schemaMap.put(tableName, pinotSchema); - factory1.registerTable(pinotSchema, tableNameWithType); - factory2.registerTable(pinotSchema, tableNameWithType); + factory1.registerTable(pinotSchema, offlineTableName); + factory2.registerTable(pinotSchema, offlineTableName); List<QueryTestCase.ColumnAndType> columnAndTypes = tableEntry.getValue()._schema; List<GenericRow> genericRows = toRow(columnAndTypes, tableEntry.getValue()._inputs); // generate segments and dump into server1 and server2 List<String> partitionColumns = tableEntry.getValue()._partitionColumns; + String partitionColumn = null; + List<List<String>> partitionIdToSegmentsMap = null; + if (partitionColumns != null && partitionColumns.size() == 1) { + partitionColumn = partitionColumns.get(0); + partitionIdToSegmentsMap 
= new ArrayList<>(); + for (int i = 0; i < 4; i++) { Review Comment: Can we make the `4` a `private static final` constant? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java: ########## @@ -59,9 +59,38 @@ public Void process(PlanNode node, DispatchablePlanContext context) { receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata); } }); + } else if (senderMetadata.isPartitionedTableScan()) { + // For partitioned table scan, send the data to the worker with the same worker id (not necessary the same + // instance) Review Comment: Maybe we can add a TODO here: instead of always sending to the worker with the same worker id, we could do a 1:2 or 1:4 fan-out (possibly based on a parallelism setting such as `set stageParallelism=xxx`). ########## pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java: ########## @@ -96,19 +97,66 @@ private static boolean isLeafPlan(DispatchablePlanMetadata metadata) { } private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlanContext context) { - DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId()); - // table scan stage, need to attach server as well as segment info for each physical table type.
- List<String> scannedTables = metadata.getScannedTables(); - String logicalTableName = scannedTables.get(0); - Map<String, RoutingTable> routingTableMap = getRoutingTable(logicalTableName, context.getRequestId()); - if (routingTableMap.size() == 0) { - throw new IllegalArgumentException("Unable to find routing entries for table: " + logicalTableName); + // NOTE: For pipeline breaker, leaf fragment can also have children + for (PlanFragment child : fragment.getChildren()) { + assignWorkersToNonRootFragment(child, context); + } + + TableScanNode tableScanNode = findTableScanNode(fragment.getFragmentRoot()); + Preconditions.checkState(tableScanNode != null, "Failed to find table scan node under leaf fragment"); + String tableName = tableScanNode.getTableName(); + + // Extract partitionKey and numPartitions from hint if provided + Map<String, String> tableHintOptions = + tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS); Review Comment: nit: Can we simplify all of this by recording the hint in the dispatchable plan metadata map during the visitor pass? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java: ########## @@ -354,14 +349,12 @@ private ColocatedTableInfo getColocatedTableInfo(String tableName) { TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableName); // Ignore OFFLINE side when time boundary info is unavailable if (timeBoundaryInfo == null) { - return getRealtimeColocatedTableInfo(realtimeTableName); + return getRealtimeColocatedTableInfo(realtimeTableName, partitionKey, numPartitions); Review Comment: Should we add an integration test for a colocated join on a hybrid table? I am not sure how we could mock that in a unit test, but an integration test would be very helpful to cover this case.
########## pinot-query-runtime/src/test/resources/queries/QueryHints.json: ########## @@ -51,37 +51,41 @@ } }, "queries": [ + { + "description": "Group by partition column", + "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num" + }, { "description": "Colocated JOIN with partition column", - "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num" + "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num" Review Comment: Can we add a test showing what happens when the partition key or partition size is invalid (e.g. a key other than `num`, or a size other than `4`)? Should it throw, or should it fall back? ########## pinot-query-runtime/src/test/resources/queries/QueryHints.json: ########## @@ -51,37 +51,41 @@ } }, "queries": [ + { + "description": "Group by partition column", + "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num" + }, { "description": "Colocated JOIN with partition column", - "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num" + "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num" }, { "description": "Colocated JOIN with partition column and group by partition column", - "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON
{tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name" }, { "description": "Colocated JOIN with partition column and group by non-partitioned column", - "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name" + "sql": "SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name" }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy'))" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy'))" }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, 
{tbl1}.name" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name" }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.name" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.name" }, { "description": "Dynamic broadcast SEMI-JOIN with empty right table result", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name" }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially empty right table result for some servers", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM 
{tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name" Review Comment: Can we add variants of the hinted tests here that also use `set stageParallelism = 2;`? We have never tested query options and hints together, so it would be good to add. (This can be done in a follow-up; I just tried it and they all passed.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org