This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:

     new 5aa90a3bce5 [pipelineX](local shuffle) Fix bucket hash shuffle (#28202)

5aa90a3bce5 is described below

commit 5aa90a3bce5ac55ef480c940a01667774064d10c
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sun Dec 10 00:35:00 2023 +0800

    [pipelineX](local shuffle) Fix bucket hash shuffle (#28202)
---
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a2877e7538c..cd498ccf9bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3596,6 +3596,7 @@ public class Coordinator implements CoordInterface {
             }
             Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
+            Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
                 if (!res.containsKey(instanceExecParam.host)) {
@@ -3625,10 +3626,16 @@ public class Coordinator implements CoordInterface {
                     params.setNumBuckets(fragment.getBucketNum());
                     res.put(instanceExecParam.host, params);
                     res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
+                    instanceIdx.put(instanceExecParam.host, 0);
                 }
+                // Set each bucket belongs to which instance on this BE.
+                // This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
+                int instanceId = instanceIdx.get(instanceExecParam.host);
                 for (int bucket : instanceExecParam.bucketSeqSet) {
-                    res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i);
+                    res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
+                }
+                instanceIdx.replace(instanceExecParam.host, ++instanceId);
                 TPipelineFragmentParams params = res.get(instanceExecParam.host);
                 TPipelineInstanceParams localParams = new TPipelineInstanceParams();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org