gortiz commented on code in PR #15764: URL: https://github.com/apache/pinot/pull/15764#discussion_r2086044084
########## pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java: ########## @@ -560,30 +560,86 @@ private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata met // calculates the partition table info using the routing manager PartitionTableInfo partitionTableInfo = calculatePartitionTableInfo(tableName); // verifies that the partition table obtained from routing manager is compatible with the hint options - checkPartitionInfoMap(partitionTableInfo, tableName, partitionKey, numPartitions, partitionFunction); + checkPartitionInfoMap(partitionTableInfo, tableName, partitionKey, partitionFunction, numWorkers); - // Pick one server per partition - // NOTE: Pick server based on the request id so that the same server is picked across different table scan when the + // NOTE: Pick worker based on the request id so that the same worker is picked across different table scan when the // segments for the same partition is colocated long indexToPick = context.getRequestId(); PartitionInfo[] partitionInfoMap = partitionTableInfo._partitionInfoMap; + int numPartitions = partitionInfoMap.length; + assert numPartitions % numWorkers == 0; + int numPartitionsPerWorker = numPartitions / numWorkers; int workerId = 0; Map<Integer, QueryServerInstance> workedIdToServerInstanceMap = new HashMap<>(); Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>(); Map<String, ServerInstance> enabledServerInstanceMap = _routingManager.getEnabledServerInstanceMap(); - for (int i = 0; i < numPartitions; i++) { - PartitionInfo partitionInfo = partitionInfoMap[i]; - // TODO: Currently we don't support the case when a partition doesn't contain any segment. The reason is that the - // leaf stage won't be able to directly return empty response. 
- Preconditions.checkState(partitionInfo != null, "Failed to find any segment for table: %s, partition: %s", - tableName, i); - ServerInstance serverInstance = - pickEnabledServer(partitionInfo._fullyReplicatedServers, enabledServerInstanceMap, indexToPick++); - Preconditions.checkState(serverInstance != null, - "Failed to find enabled fully replicated server for table: %s, partition: %s", tableName, i); - workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance)); - workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo)); - workerId++; + if (numPartitionsPerWorker == 1) { + // Pick one worker per partition + for (int i = 0; i < numWorkers; i++) { + PartitionInfo partitionInfo = partitionInfoMap[i]; + // TODO: Currently we don't support the case when a partition doesn't contain any segment. The reason is that + // the leaf stage won't be able to directly return empty response. + Preconditions.checkState(partitionInfo != null, "Failed to find any segment for table: %s, partition: %s", + tableName, i); + ServerInstance serverInstance = + pickEnabledServer(partitionInfo._fullyReplicatedServers, enabledServerInstanceMap, indexToPick++); + Preconditions.checkState(serverInstance != null, + "Failed to find enabled fully replicated server for table: %s, partition: %s", tableName, i); + workedIdToServerInstanceMap.put(workerId, new QueryServerInstance(serverInstance)); + workerIdToSegmentsMap.put(workerId, + getSegmentsMap(partitionInfo._offlineSegments, partitionInfo._realtimeSegments)); + workerId++; + } + } else { + // Round-robin partitions to workers, where each worker gets numPartitionsPerWorker partitions. This setup works + // only if all segments for these partitions are assigned to the same group of servers. This is useful when user + // wants to colocate tables with different partition count, but same partition function. + // E.g. 
when there are 16 partitions for table A and 4 partitions for table B, we may assign 16 partitions for + // table A to 4 workers, where partition 0, 4, 8, 12 goes to worker 0, partition 1, 5, 9, 13 goes to worker 1, + // etc. + for (int i = 0; i < numWorkers; i++) { + Set<String> fullyReplicatedServers = null; + List<String> offlineSegments = null; + List<String> realtimeSegments = null; + for (int j = i; j < numPartitions; j += numWorkers) { + PartitionInfo partitionInfo = partitionInfoMap[j]; + if (partitionInfo == null) { + continue; + } + if (fullyReplicatedServers == null) { + fullyReplicatedServers = new HashSet<>(partitionInfo._fullyReplicatedServers); + } else { + fullyReplicatedServers.retainAll(partitionInfo._fullyReplicatedServers); + } + if (partitionInfo._offlineSegments != null) { + if (offlineSegments == null) { + offlineSegments = new ArrayList<>(partitionInfo._offlineSegments); + } else { + offlineSegments.addAll(partitionInfo._offlineSegments); + } Review Comment: nit: This seems like over-engineering. We could allocate the list at the beginning instead of lazily creating it behind a null check on first use. Remember that a `new ArrayList<>()` doesn't actually allocate any `Object[]` until the first element is added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org