This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5faa792762 Support partition parallelism for partitioned table scan (#11266)
5faa792762 is described below

commit 5faa792762a2a3e7ed5839eefd80b578e0cd3d46
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 25 16:08:40 2023 -0700

    Support partition parallelism for partitioned table scan (#11266)
---
 .../apache/calcite/rel/hint/PinotHintOptions.java  |  1 +
 .../planner/physical/DispatchablePlanMetadata.java |  9 ++++
 .../planner/physical/MailboxAssignmentVisitor.java | 60 ++++++++++++++++------
 .../apache/pinot/query/routing/WorkerManager.java  | 55 +++++++++++++-------
 .../src/test/resources/queries/QueryHints.json     | 12 +++++
 5 files changed, 102 insertions(+), 35 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 7129d832a2..1625fa1e9c 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -83,5 +83,6 @@ public class PinotHintOptions {
   public static class TableHintOptions {
     public static final String PARTITION_KEY = "partition_key";
     public static final String PARTITION_SIZE = "partition_size";
+    public static final String PARTITION_PARALLELISM = "partition_parallelism";
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index abe4f64a46..48378d4e90 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -70,6 +70,7 @@ public class DispatchablePlanMetadata implements Serializable {
 
   // whether a stage is partitioned table scan
   private boolean _isPartitionedTableScan;
+  private int _partitionParallelism;
 
   public DispatchablePlanMetadata() {
     _scannedTables = new ArrayList<>();
@@ -143,6 +144,14 @@ public class DispatchablePlanMetadata implements Serializable {
     _isPartitionedTableScan = isPartitionedTableScan;
   }
 
+  public int getPartitionParallelism() {
+    return _partitionParallelism;
+  }
+
+  public void setPartitionParallelism(int partitionParallelism) {
+    _partitionParallelism = partitionParallelism;
+  }
+
   public Map<String, Set<String>> getTableToUnavailableSegmentsMap() {
     return _tableToUnavailableSegmentsMap;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index a6d758335d..4d3855b2be 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -69,23 +69,49 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
         }
       } else if (senderMetadata.isPartitionedTableScan()) {
         // For partitioned table scan, send the data to the worker with the 
same worker id (not necessary the same
-        // instance)
-        // TODO: Support further split the single partition into multiple 
workers
-        Preconditions.checkState(numSenders == numReceivers,
-            "Got different number of workers for partitioned table scan, 
sender: %s, receiver: %s", numSenders,
-            numReceivers);
-        for (int workerId = 0; workerId < numSenders; workerId++) {
-          String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, 
workerId, receiverFragmentId, workerId);
-          MailboxMetadata serderMailboxMetadata = new 
MailboxMetadata(Collections.singletonList(mailboxId),
-              Collections.singletonList(new 
VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
-              Collections.emptyMap());
-          MailboxMetadata receiverMailboxMetadata = new 
MailboxMetadata(Collections.singletonList(mailboxId),
-              Collections.singletonList(new 
VirtualServerAddress(senderServerMap.get(workerId), workerId)),
-              Collections.emptyMap());
-          senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-              .put(receiverFragmentId, serderMailboxMetadata);
-          receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
-              .put(senderFragmentId, receiverMailboxMetadata);
+        // instance). When partition parallelism is configured, send the data 
to the corresponding workers.
+        // NOTE: Do not use partitionParallelism from the metadata because it 
might be configured only in the first
+        //       child. Re-compute it based on the number of receivers.
+        int partitionParallelism = numReceivers / numSenders;
+        if (partitionParallelism == 1) {
+          for (int workerId = 0; workerId < numSenders; workerId++) {
+            String mailboxId = 
MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, 
workerId);
+            MailboxMetadata serderMailboxMetadata = new 
MailboxMetadata(Collections.singletonList(mailboxId),
+                Collections.singletonList(new 
VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
+                Collections.emptyMap());
+            MailboxMetadata receiverMailboxMetadata = new 
MailboxMetadata(Collections.singletonList(mailboxId),
+                Collections.singletonList(new 
VirtualServerAddress(senderServerMap.get(workerId), workerId)),
+                Collections.emptyMap());
+            senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+                .put(receiverFragmentId, serderMailboxMetadata);
+            receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>())
+                .put(senderFragmentId, receiverMailboxMetadata);
+          }
+        } else {
+          int receiverWorkerId = 0;
+          for (int senderWorkerId = 0; senderWorkerId < numSenders; 
senderWorkerId++) {
+            VirtualServerAddress senderAddress =
+                new VirtualServerAddress(senderServerMap.get(senderWorkerId), 
senderWorkerId);
+            MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+            senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new 
HashMap<>())
+                .put(receiverFragmentId, senderMailboxMetadata);
+            for (int i = 0; i < partitionParallelism; i++) {
+              VirtualServerAddress receiverAddress =
+                  new 
VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
+              String mailboxId = 
MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, 
receiverFragmentId,
+                  receiverWorkerId);
+              senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
+              
senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+
+              MailboxMetadata receiverMailboxMetadata =
+                  receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> 
new HashMap<>())
+                      .computeIfAbsent(senderFragmentId, k -> new 
MailboxMetadata());
+              receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
+              
receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+
+              receiverWorkerId++;
+            }
+          }
         }
       } else {
         // For other exchange types, send the data to all the instances in the 
receiver fragment
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index da3821d07a..2befb33306 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -102,21 +102,22 @@ public class WorkerManager {
 
     DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
     Map<String, String> tableOptions = metadata.getTableOptions();
-    String partitionKey = null;
-    int numPartitions = 0;
-    if (tableOptions != null) {
-      partitionKey = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
-      String partitionSize = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
-      if (partitionSize != null) {
-        numPartitions = Integer.parseInt(partitionSize);
-      }
-    }
+    String partitionKey =
+        tableOptions != null ? 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY) : null;
     if (partitionKey == null) {
       assignWorkersToNonPartitionedLeafFragment(metadata, context);
     } else {
-      Preconditions.checkState(numPartitions > 0, "'%s' must be provided for 
partition key: %s",
+      String numPartitionsStr = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+      Preconditions.checkState(numPartitionsStr != null, "'%s' must be 
provided for partition key: %s",
           PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
-      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, 
numPartitions);
+      int numPartitions = Integer.parseInt(numPartitionsStr);
+      Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: 
%s",
+          PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+      String partitionParallelismStr = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM);
+      int partitionParallelism = partitionParallelismStr != null ? 
Integer.parseInt(partitionParallelismStr) : 1;
+      Preconditions.checkState(partitionParallelism > 0, "'%s' must be 
positive: %s, got: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM, 
partitionParallelism);
+      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, 
numPartitions, partitionParallelism);
     }
   }
 
@@ -207,7 +208,7 @@ public class WorkerManager {
   }
 
   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata 
metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions) 
{
+      DispatchablePlanContext context, String partitionKey, int numPartitions, 
int partitionParallelism) {
     String tableName = metadata.getScannedTables().get(0);
     ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName, 
partitionKey, numPartitions);
 
@@ -238,6 +239,7 @@ public class WorkerManager {
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
     metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
     metadata.setPartitionedTableScan(true);
+    metadata.setPartitionParallelism(partitionParallelism);
   }
 
   private void assignWorkersToIntermediateFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
@@ -249,12 +251,29 @@ public class WorkerManager {
     Map<Integer, DispatchablePlanMetadata> metadataMap = 
context.getDispatchablePlanMetadataMap();
     DispatchablePlanMetadata metadata = 
metadataMap.get(fragment.getFragmentId());
 
-    // If the first child is partitioned table scan, use the same worker 
assignment to avoid shuffling data
-    // TODO: Introduce a hint to control this
-    if (children.size() > 0) {
+    // If the first child is partitioned table scan, use the same worker 
assignment to avoid shuffling data. When
+    // partition parallelism is configured, create multiple intermediate stage 
workers on the same instance for each
+    // worker in the first child.
+    if (!children.isEmpty()) {
       DispatchablePlanMetadata firstChildMetadata = 
metadataMap.get(children.get(0).getFragmentId());
       if (firstChildMetadata.isPartitionedTableScan()) {
-        
metadata.setWorkerIdToServerInstanceMap(firstChildMetadata.getWorkerIdToServerInstanceMap());
+        int partitionParallelism = 
firstChildMetadata.getPartitionParallelism();
+        Map<Integer, QueryServerInstance> childWorkerIdToServerInstanceMap =
+            firstChildMetadata.getWorkerIdToServerInstanceMap();
+        if (partitionParallelism == 1) {
+          
metadata.setWorkerIdToServerInstanceMap(childWorkerIdToServerInstanceMap);
+        } else {
+          int numChildWorkers = childWorkerIdToServerInstanceMap.size();
+          Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new 
HashMap<>();
+          int workerId = 0;
+          for (int i = 0; i < numChildWorkers; i++) {
+            QueryServerInstance serverInstance = 
childWorkerIdToServerInstanceMap.get(i);
+            for (int j = 0; j < partitionParallelism; j++) {
+              workerIdToServerInstanceMap.put(workerId++, serverInstance);
+            }
+          }
+          metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
+        }
         return;
       }
     }
@@ -308,13 +327,13 @@ public class WorkerManager {
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + 
Arrays.toString(tableNames.toArray()));
     }
-    Map<String, String> options = context.getPlannerContext().getOptions();
-    int stageParallelism = 
Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
     if (metadata.isRequiresSingletonInstance()) {
       // require singleton should return a single global worker ID with 0;
       metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
           new 
QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
     } else {
+      Map<String, String> options = context.getPlannerContext().getOptions();
+      int stageParallelism = 
Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
       Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new 
HashMap<>();
       int workerId = 0;
       for (ServerInstance serverInstance : serverInstances) {
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 940b318f1a..c69e2adb2a 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -65,10 +65,22 @@
         "description": "Group by partition column",
         "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
       },
+      {
+        "description": "Group by partition column with partition parallelism",
+        "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4', 
partition_parallelism='2') */ GROUP BY {tbl1}.num"
+      },
       {
         "description": "Colocated JOIN with partition column",
         "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM 
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} 
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num"
       },
+      {
+        "description": "Colocated JOIN with partition column with partition 
parallelism",
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM 
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4', 
partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', 
partition_size='4', partition_parallelism='2') */ ON {tbl1}.num = {tbl2}.num"
+      },
+      {
+        "description": "Colocated JOIN with partition column with partition 
parallelism in first table",
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM 
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4', 
partition_parallelism='2') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', 
partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
+      },
       {
         "description": "Colocated JOIN with partition column and group by 
partition column",
         "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') 
*/ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to