gortiz commented on code in PR #15764:
URL: https://github.com/apache/pinot/pull/15764#discussion_r2086044084


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -560,30 +560,86 @@ private void 
assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata met
     // calculates the partition table info using the routing manager
     PartitionTableInfo partitionTableInfo = 
calculatePartitionTableInfo(tableName);
     // verifies that the partition table obtained from routing manager is 
compatible with the hint options
-    checkPartitionInfoMap(partitionTableInfo, tableName, partitionKey, 
numPartitions, partitionFunction);
+    checkPartitionInfoMap(partitionTableInfo, tableName, partitionKey, 
partitionFunction, numWorkers);
 
-    // Pick one server per partition
-    // NOTE: Pick server based on the request id so that the same server is 
picked across different table scan when the
+    // NOTE: Pick worker based on the request id so that the same worker is 
picked across different table scan when the
     //       segments for the same partition is colocated
     long indexToPick = context.getRequestId();
     PartitionInfo[] partitionInfoMap = partitionTableInfo._partitionInfoMap;
+    int numPartitions = partitionInfoMap.length;
+    assert numPartitions % numWorkers == 0;
+    int numPartitionsPerWorker = numPartitions / numWorkers;
     int workerId = 0;
     Map<Integer, QueryServerInstance> workedIdToServerInstanceMap = new 
HashMap<>();
     Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new 
HashMap<>();
     Map<String, ServerInstance> enabledServerInstanceMap = 
_routingManager.getEnabledServerInstanceMap();
-    for (int i = 0; i < numPartitions; i++) {
-      PartitionInfo partitionInfo = partitionInfoMap[i];
-      // TODO: Currently we don't support the case when a partition doesn't 
contain any segment. The reason is that the
-      //       leaf stage won't be able to directly return empty response.
-      Preconditions.checkState(partitionInfo != null, "Failed to find any 
segment for table: %s, partition: %s",
-          tableName, i);
-      ServerInstance serverInstance =
-          pickEnabledServer(partitionInfo._fullyReplicatedServers, 
enabledServerInstanceMap, indexToPick++);
-      Preconditions.checkState(serverInstance != null,
-          "Failed to find enabled fully replicated server for table: %s, 
partition: %s", tableName, i);
-      workedIdToServerInstanceMap.put(workerId, new 
QueryServerInstance(serverInstance));
-      workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo));
-      workerId++;
+    if (numPartitionsPerWorker == 1) {
+      // Pick one worker per partition
+      for (int i = 0; i < numWorkers; i++) {
+        PartitionInfo partitionInfo = partitionInfoMap[i];
+        // TODO: Currently we don't support the case when a partition doesn't 
contain any segment. The reason is that
+        //       the leaf stage won't be able to directly return empty 
response.
+        Preconditions.checkState(partitionInfo != null, "Failed to find any 
segment for table: %s, partition: %s",
+            tableName, i);
+        ServerInstance serverInstance =
+            pickEnabledServer(partitionInfo._fullyReplicatedServers, 
enabledServerInstanceMap, indexToPick++);
+        Preconditions.checkState(serverInstance != null,
+            "Failed to find enabled fully replicated server for table: %s, 
partition: %s", tableName, i);
+        workedIdToServerInstanceMap.put(workerId, new 
QueryServerInstance(serverInstance));
+        workerIdToSegmentsMap.put(workerId,
+            getSegmentsMap(partitionInfo._offlineSegments, 
partitionInfo._realtimeSegments));
+        workerId++;
+      }
+    } else {
+      // Round-robin partitions to workers, where each worker gets 
numPartitionsPerWorker partitions. This setup works
+      // only if all segments for these partitions are assigned to the same 
group of servers. This is useful when user
+      // wants to colocate tables with different partition count, but same 
partition function.
+      // E.g. when there are 16 partitions for table A and 4 partitions for 
table B, we may assign 16 partitions for
+      // table A to 4 workers, where partition 0, 4, 8, 12 goes to worker 0, 
partition 1, 5, 9, 13 goes to worker 1,
+      // etc.
+      for (int i = 0; i < numWorkers; i++) {
+        Set<String> fullyReplicatedServers = null;
+        List<String> offlineSegments = null;
+        List<String> realtimeSegments = null;
+        for (int j = i; j < numPartitions; j += numWorkers) {
+          PartitionInfo partitionInfo = partitionInfoMap[j];
+          if (partitionInfo == null) {
+            continue;
+          }
+          if (fullyReplicatedServers == null) {
+            fullyReplicatedServers = new 
HashSet<>(partitionInfo._fullyReplicatedServers);
+          } else {
+            
fullyReplicatedServers.retainAll(partitionInfo._fullyReplicatedServers);
+          }
+          if (partitionInfo._offlineSegments != null) {
+            if (offlineSegments == null) {
+              offlineSegments = new 
ArrayList<>(partitionInfo._offlineSegments);
+            } else {
+              offlineSegments.addAll(partitionInfo._offlineSegments);
+            }

Review Comment:
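   nit: This seems over-engineered. We could allocate both lists (`offlineSegments` and 
`realtimeSegments`) up front instead of lazily null-checking them. Remember that a no-arg 
`new ArrayList<>()` doesn't allocate its backing `Object[]` until the first element is added, 
so creating them eagerly costs almost nothing and simplifies the loop.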



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to