This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 5faa792762 Support partition parallelism for partitioned table scan (#11266)
5faa792762 is described below

commit 5faa792762a2a3e7ed5839eefd80b578e0cd3d46
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 25 16:08:40 2023 -0700

    Support partition parallelism for partitioned table scan (#11266)
---
 .../apache/calcite/rel/hint/PinotHintOptions.java  |  1 +
 .../planner/physical/DispatchablePlanMetadata.java |  9 ++++
 .../planner/physical/MailboxAssignmentVisitor.java | 60 ++++++++++++++++------
 .../apache/pinot/query/routing/WorkerManager.java  | 55 +++++++++++++-------
 .../src/test/resources/queries/QueryHints.json     | 12 +++++
 5 files changed, 102 insertions(+), 35 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 7129d832a2..1625fa1e9c 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -83,5 +83,6 @@ public class PinotHintOptions {
   public static class TableHintOptions {
     public static final String PARTITION_KEY = "partition_key";
     public static final String PARTITION_SIZE = "partition_size";
+    public static final String PARTITION_PARALLELISM = "partition_parallelism";
   }
 }

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index abe4f64a46..48378d4e90 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -70,6 +70,7 @@ public class DispatchablePlanMetadata implements Serializable {
   // whether a stage is partitioned table scan
   private boolean _isPartitionedTableScan;
+  private int _partitionParallelism;

   public DispatchablePlanMetadata() {
     _scannedTables = new ArrayList<>();
@@ -143,6 +144,14 @@ public class DispatchablePlanMetadata implements Serializable {
     _isPartitionedTableScan = isPartitionedTableScan;
   }

+  public int getPartitionParallelism() {
+    return _partitionParallelism;
+  }
+
+  public void setPartitionParallelism(int partitionParallelism) {
+    _partitionParallelism = partitionParallelism;
+  }
+
   public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
     return _tableToUnavailableSegmentsMap;
   }
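[Editor's note] For context, the new partition_parallelism option is passed through the same tableOptions hint as the existing partition_key and partition_size options. A representative query (placeholder table and column names, mirroring the QueryHints.json cases added at the end of this diff) looks like:

    SELECT num, COUNT(*)
    FROM myTable /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */
    GROUP BY num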
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index a6d758335d..4d3855b2be 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -69,23 +69,49 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
         }
       } else if (senderMetadata.isPartitionedTableScan()) {
         // For partitioned table scan, send the data to the worker with the same worker id (not necessary the same
-        // instance)
-        // TODO: Support further split the single partition into multiple workers
-        Preconditions.checkState(numSenders == numReceivers,
-            "Got different number of workers for partitioned table scan, sender: %s, receiver: %s", numSenders,
-            numReceivers);
-        for (int workerId = 0; workerId < numSenders; workerId++) {
-          String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
-          MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-              Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
-              Collections.emptyMap());
-          MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
-              Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
-              Collections.emptyMap());
-          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-              .put(receiverFragmentId, serderMailboxMetadata);
-          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-              .put(senderFragmentId, receiverMailboxMetadata);
+        // instance). When partition parallelism is configured, send the data to the corresponding workers.
+        // NOTE: Do not use partitionParallelism from the metadata because it might be configured only in the first
+        // child. Re-compute it based on the number of receivers.
+        int partitionParallelism = numReceivers / numSenders;
+        if (partitionParallelism == 1) {
+          for (int workerId = 0; workerId < numSenders; workerId++) {
+            String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
+            MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+                Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
+                Collections.emptyMap());
+            MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
+                Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
+                Collections.emptyMap());
+            senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+                .put(receiverFragmentId, serderMailboxMetadata);
+            receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+                .put(senderFragmentId, receiverMailboxMetadata);
+          }
+        } else {
+          int receiverWorkerId = 0;
+          for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
+            VirtualServerAddress senderAddress =
+                new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
+            MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+            senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
+                .put(receiverFragmentId, senderMailboxMetadata);
+            for (int i = 0; i < partitionParallelism; i++) {
+              VirtualServerAddress receiverAddress =
+                  new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
+              String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
+                  receiverWorkerId);
+              senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
+              senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+
+              MailboxMetadata receiverMailboxMetadata =
+                  receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+                      .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
+              receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
+              receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+
+              receiverWorkerId++;
+            }
+          }
         }
       } else {
         // For other exchange types, send the data to all the instances in the receiver fragment
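[Editor's note] To make the fan-out concrete, here is a minimal standalone sketch (illustration only, not part of the commit; class and variable names are made up) of the sender-to-receiver mapping the visitor builds when the receiving stage has more workers than the partitioned leaf stage:

// Each sender keeps a contiguous block of partitionParallelism receiver workers.
import java.util.ArrayList;
import java.util.List;

public class PartitionFanOutSketch {
  public static void main(String[] args) {
    int numSenders = 2;    // workers in the partitioned table-scan stage
    int numReceivers = 4;  // workers in the receiving stage
    // Same derivation as the visitor: parallelism is re-computed from the worker counts.
    int partitionParallelism = numReceivers / numSenders;
    List<String> mailboxes = new ArrayList<>();
    int receiverWorkerId = 0;
    for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
      for (int i = 0; i < partitionParallelism; i++) {
        mailboxes.add("sender " + senderWorkerId + " -> receiver " + receiverWorkerId++);
      }
    }
    // Prints: sender 0 -> receiver 0, sender 0 -> receiver 1, sender 1 -> receiver 2, sender 1 -> receiver 3
    mailboxes.forEach(System.out::println);
  }
}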
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index da3821d07a..2befb33306 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -102,21 +102,22 @@ public class WorkerManager {
     DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
     Map<String, String> tableOptions = metadata.getTableOptions();
-    String partitionKey = null;
-    int numPartitions = 0;
-    if (tableOptions != null) {
-      partitionKey = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
-      String partitionSize = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
-      if (partitionSize != null) {
-        numPartitions = Integer.parseInt(partitionSize);
-      }
-    }
+    String partitionKey =
+        tableOptions != null ? tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY) : null;
     if (partitionKey == null) {
       assignWorkersToNonPartitionedLeafFragment(metadata, context);
     } else {
-      Preconditions.checkState(numPartitions > 0, "'%s' must be provided for partition key: %s",
+      String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+      Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
           PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
-      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions);
+      int numPartitions = Integer.parseInt(numPartitionsStr);
+      Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+      String partitionParallelismStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM);
+      int partitionParallelism = partitionParallelismStr != null ? Integer.parseInt(partitionParallelismStr) : 1;
+      Preconditions.checkState(partitionParallelism > 0, "'%s' must be positive: %s, got: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM, partitionParallelism);
+      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions, partitionParallelism);
     }
   }
@@ -207,7 +208,7 @@ public class WorkerManager {
   }

   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions) {
+      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
     String tableName = metadata.getScannedTables().get(0);
     ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName, partitionKey, numPartitions);
@@ -238,6 +239,7 @@ public class WorkerManager {
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
     metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
     metadata.setPartitionedTableScan(true);
+    metadata.setPartitionParallelism(partitionParallelism);
   }

   private void assignWorkersToIntermediateFragment(PlanFragment fragment, DispatchablePlanContext context) {
@@ -249,12 +251,29 @@ public class WorkerManager {
     Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
     DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId());

-    // If the first child is partitioned table scan, use the same worker assignment to avoid shuffling data
-    // TODO: Introduce a hint to control this
-    if (children.size() > 0) {
+    // If the first child is partitioned table scan, use the same worker assignment to avoid shuffling data. When
+    // partition parallelism is configured, create multiple intermediate stage workers on the same instance for each
+    // worker in the first child.
+    if (!children.isEmpty()) {
       DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
       if (firstChildMetadata.isPartitionedTableScan()) {
-        metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap());
+        int partitionParallelism = firstChildMetadata.getPartitionParallelism();
+        Map<Integer, QueryServerInstance> childWorkerIdToServerInstanceMap =
+            firstChildMetadata.getWorkerIdToServerInstanceMap();
+        if (partitionParallelism == 1) {
+          metadata.setWorkerIdToServerInstanceMap(childWorkerIdToServerInstanceMap);
+        } else {
+          int numChildWorkers = childWorkerIdToServerInstanceMap.size();
+          Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
+          int workerId = 0;
+          for (int i = 0; i < numChildWorkers; i++) {
+            QueryServerInstance serverInstance = childWorkerIdToServerInstanceMap.get(i);
+            for (int j = 0; j < partitionParallelism; j++) {
+              workerIdToServerInstanceMap.put(workerId++, serverInstance);
+            }
+          }
+          metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
+        }
         return;
       }
     }
@@ -308,13 +327,13 @@ public class WorkerManager {
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
     }
-    Map<String, String> options = context.getPlannerContext().getOptions();
-    int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
     if (metadata.isRequiresSingletonInstance()) {
       // require singleton should return a single global worker ID with 0;
       metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
           new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
     } else {
+      Map<String, String> options = context.getPlannerContext().getOptions();
+      int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
       Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
       int workerId = 0;
       for (ServerInstance serverInstance : serverInstances) {
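[Editor's note] A minimal sketch (illustration only, not part of the commit; plain strings stand in for QueryServerInstance and all names are made up) of how the intermediate stage worker map is expanded when the partitioned leaf stage declares partition parallelism:

import java.util.HashMap;
import java.util.Map;

public class WorkerExpansionSketch {
  public static void main(String[] args) {
    // Assume the leaf stage has 2 workers on servers S0 and S1, and partition_parallelism = 2.
    Map<Integer, String> childWorkerIdToServer = Map.of(0, "S0", 1, "S1");
    int partitionParallelism = 2;
    Map<Integer, String> workerIdToServer = new HashMap<>();
    int workerId = 0;
    for (int i = 0; i < childWorkerIdToServer.size(); i++) {
      String server = childWorkerIdToServer.get(i);
      // Create partitionParallelism intermediate workers on the same server as child worker i.
      for (int j = 0; j < partitionParallelism; j++) {
        workerIdToServer.put(workerId++, server);
      }
    }
    // Intermediate worker ids 0 and 1 land on S0, ids 2 and 3 on S1, matching the mailbox fan-out above.
    System.out.println(workerIdToServer);
  }
}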
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 940b318f1a..c69e2adb2a 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -65,10 +65,22 @@
       "description": "Group by partition column",
       "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
     },
+    {
+      "description": "Group by partition column with partition parallelism",
+      "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
+    },
     {
       "description": "Colocated JOIN with partition column",
       "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
     },
+    {
+      "description": "Colocated JOIN with partition column with partition parallelism",
+      "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ ON {tbl1}.num = {tbl2}.num"
+    },
+    {
+      "description": "Colocated JOIN with partition column with partition parallelism in first table",
+      "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
+    },
     {
       "description": "Colocated JOIN with partition column and group by partition column",
       "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org