This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad5bb408a4 Fix LOOKUP join (#15223)
ad5bb408a4 is described below

commit ad5bb408a406972336a8ffcc8d7c153fc1a07e75
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Mar 18 06:56:09 2025 -0600

    Fix LOOKUP join (#15223)
---
 .../java/org/apache/pinot/query/routing/WorkerManager.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index f2b296ea3b..6963b6f907 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -105,8 +105,18 @@ public class WorkerManager {
      // find a single local exchange child in the leaf stage, assign workers based on the local exchange child.
       if (children.size() == 1 && isLocalExchange(children.get(0), context)) {
         DispatchablePlanMetadata childMetadata = metadataMap.get(children.get(0).getFragmentId());
-        metadata.setWorkerIdToServerInstanceMap(assignWorkersForLocalExchange(childMetadata));
+        Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = assignWorkersForLocalExchange(childMetadata);
+        metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
         metadata.setPartitionFunction(childMetadata.getPartitionFunction());
+        // Fake a segments map so that the worker can be correctly identified as leaf stage
+        // TODO: Add a query test for LOOKUP join
+        Map<String, List<String>> segmentsMap = Map.of(TableType.OFFLINE.name(), List.of());
+        Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
+            Maps.newHashMapWithExpectedSize(workerIdToServerInstanceMap.size());
+        for (Integer workerId : workerIdToServerInstanceMap.keySet()) {
+          workerIdToSegmentsMap.put(workerId, segmentsMap);
+        }
+        metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
       } else {
         assignWorkersToLeafFragment(fragment, context);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to