This is an automated email from the ASF dual-hosted git repository.

chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3d24ee57 [multistage] Fix Bug in MailboxInfo Ordering (#16224)
7c3d24ee57 is described below

commit 7c3d24ee57dceb5b086e2495f80ce284276047b2
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Mon Jun 30 11:59:06 2025 -0500

    [multistage] Fix Bug in MailboxInfo Ordering (#16224)
---
 .../planner/physical/v2/PlanFragmentAndMailboxAssignment.java    | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index abaa363daf..bfc200981d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.physical.v2;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -253,6 +254,14 @@ public class PlanFragmentAndMailboxAssignment {
     for (var entry : workersByUniqueHostPort.entrySet()) {
      result.add(new MailboxInfo(entry.getKey().getHostname(), entry.getKey().getQueryMailboxPort(), entry.getValue()));
     }
+    // IMP: Return mailbox info sorted by workerIds. This is because SendingMailbox are created in this order, and
+    // record assignment for hash exchange follows modulo arithmetic. e.g. if we have sending mailbox in order:
+    // [worker-1, worker-0], then records with modulo 0 hash would end up in worker-1.
+    // Note that the workerIds list will be >1 in length only when there's a parallelism change. It's important to
+    // also know that MailboxSendOperator will iterate over this List<MailboxInfo> in order, and within each iteration
+    // iterate over all the workerIds of that MailboxInfo. The result List<SendingMailbox> is used for modulo
+    // arithmetic for any partitioning exchange strategy.
+    result.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0)));
     return result;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to