This is an automated email from the ASF dual-hosted git repository. chrispeck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7c3d24ee57 [multistage] Fix Bug in MailboxInfo Ordering (#16224) 7c3d24ee57 is described below commit 7c3d24ee57dceb5b086e2495f80ce284276047b2 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Mon Jun 30 11:59:06 2025 -0500 [multistage] Fix Bug in MailboxInfo Ordering (#16224) --- .../planner/physical/v2/PlanFragmentAndMailboxAssignment.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java index abaa363daf..bfc200981d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java @@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.physical.v2; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -253,6 +254,14 @@ public class PlanFragmentAndMailboxAssignment { for (var entry : workersByUniqueHostPort.entrySet()) { result.add(new MailboxInfo(entry.getKey().getHostname(), entry.getKey().getQueryMailboxPort(), entry.getValue())); } + // IMP: Return mailbox info sorted by workerIds. This is because SendingMailbox are created in this order, and + // record assignment for hash exchange follows modulo arithmetic. e.g. if we have sending mailbox in order: + // [worker-1, worker-0], then records with modulo 0 hash would end up in worker-1. + // Note that the workerIds list will be >1 in length only when there's a parallelism change. It's important to + // also know that MailboxSendOperator will iterate over this List<MailboxInfo> in order, and within each iteration + // iterate over all the workerIds of that MailboxInfo. The result List<SendingMailbox> is used for modulo + // arithmetic for any partitioning exchange strategy. + result.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0))); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org