This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aa90a3bce5 [pipelineX](local shuffle) Fix bucket hash shuffle (#28202)
5aa90a3bce5 is described below

commit 5aa90a3bce5ac55ef480c940a01667774064d10c
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sun Dec 10 00:35:00 2023 +0800

    [pipelineX](local shuffle) Fix bucket hash shuffle (#28202)
---
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a2877e7538c..cd498ccf9bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3596,6 +3596,7 @@ public class Coordinator implements CoordInterface {
             }
 
             Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
+            Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
                 if (!res.containsKey(instanceExecParam.host)) {
@@ -3625,10 +3626,16 @@ public class Coordinator implements CoordInterface {
                     params.setNumBuckets(fragment.getBucketNum());
                     res.put(instanceExecParam.host, params);
                     res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
+                    instanceIdx.put(instanceExecParam.host, 0);
                 }
+                // Set each bucket belongs to which instance on this BE.
+                // This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
+                int instanceId = instanceIdx.get(instanceExecParam.host);
                 for (int bucket : instanceExecParam.bucketSeqSet) {
-                    res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i);
+                    res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
+
                 }
+                instanceIdx.replace(instanceExecParam.host, ++instanceId);
                 TPipelineFragmentParams params = res.get(instanceExecParam.host);
                 TPipelineInstanceParams localParams = new TPipelineInstanceParams();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to