This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 27ee6ed1bd6 branch-3.0: [fix](coordinator) Fix wrong bucket assginment 
in old-version coordin… #44539 (#44571)
27ee6ed1bd6 is described below

commit 27ee6ed1bd62fcfe93682c1d9433b24f77d544e9
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 26 10:51:36 2024 +0800

    branch-3.0: [fix](coordinator) Fix wrong bucket assginment in old-version 
coordin… #44539 (#44571)
    
    Cherry-picked from #44539
    
    Co-authored-by: Gabriel <liwenqi...@selectdb.com>
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 95 +++++++++++-----------
 1 file changed, 47 insertions(+), 48 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 139996ce9ab..8be50772094 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1437,31 +1437,8 @@ public class Coordinator implements CoordInterface {
                     destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
                 }
                 // process bucket shuffle join on fragment without scan node
-                TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 
0);
                 while (bucketSeq < bucketNum) {
-                    TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
-
-                    dest.fragment_instance_id = new TUniqueId(-1, -1);
-                    dest.server = dummyServer;
-                    dest.setBrpcServer(dummyServer);
-
-                    Set<TNetworkAddress> hostSet = new HashSet<>();
-                    for (int insIdx = 0; insIdx < 
destParams.instanceExecParams.size(); insIdx++) {
-                        FInstanceExecParam instanceExecParams = 
destParams.instanceExecParams.get(insIdx);
-                        if (destParams.ignoreDataDistribution
-                                && hostSet.contains(instanceExecParams.host)) {
-                            continue;
-                        }
-                        hostSet.add(instanceExecParams.host);
-                        if 
(instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
-                            dest.fragment_instance_id = 
instanceExecParams.instanceId;
-                            dest.server = toRpcHost(instanceExecParams.host);
-                            
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
-                            instanceExecParams.recvrId = 
params.destinations.size();
-                            break;
-                        }
-                    }
-
+                    TPlanFragmentDestination dest = setDestination(destParams, 
params.destinations.size(), bucketSeq);
                     bucketSeq++;
                     params.destinations.add(dest);
                 }
@@ -1508,6 +1485,50 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    private TPlanFragmentDestination setDestination(FragmentExecParams 
destParams, int recvrId, int bucketSeq)
+            throws Exception {
+        TPlanFragmentDestination dest = new TPlanFragmentDestination();
+        TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
+        dest.fragment_instance_id = new TUniqueId(-1, -1);
+        dest.server = dummyServer;
+        dest.setBrpcServer(dummyServer);
+
+        if (destParams.ignoreDataDistribution) {
+            Map<TNetworkAddress, Pair<TUniqueId, Set<Integer>>> 
hostToInstanceIdAndBucketSeq
+                    = new HashMap<>();
+            for (int insIdx = 0; insIdx < 
destParams.instanceExecParams.size(); insIdx++) {
+                FInstanceExecParam instanceExecParams = 
destParams.instanceExecParams.get(insIdx);
+                
hostToInstanceIdAndBucketSeq.putIfAbsent(instanceExecParams.host,
+                        Pair.of(instanceExecParams.instanceId, new 
HashSet<>()));
+                
hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.addAll(
+                        instanceExecParams.bucketSeqSet);
+            }
+            for (int insIdx = 0; insIdx < 
destParams.instanceExecParams.size(); insIdx++) {
+                FInstanceExecParam instanceExecParams = 
destParams.instanceExecParams.get(insIdx);
+                if 
(hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.contains(bucketSeq))
 {
+                    dest.fragment_instance_id = 
hostToInstanceIdAndBucketSeq.get(instanceExecParams.host)
+                            .first;
+                    dest.server = toRpcHost(instanceExecParams.host);
+                    dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                    instanceExecParams.recvrId = recvrId;
+                    break;
+                }
+            }
+        } else {
+            for (int insIdx = 0; insIdx < 
destParams.instanceExecParams.size(); insIdx++) {
+                FInstanceExecParam instanceExecParams = 
destParams.instanceExecParams.get(insIdx);
+                if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
+                    dest.fragment_instance_id = instanceExecParams.instanceId;
+                    dest.server = toRpcHost(instanceExecParams.host);
+                    dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                    instanceExecParams.recvrId = recvrId;
+                    break;
+                }
+            }
+        }
+        return dest;
+    }
+
     private void computeMultiCastFragmentParams() throws Exception {
         for (FragmentExecParams params : fragmentExecParamsMap.values()) {
             if (!(params.fragment instanceof MultiCastPlanFragment)) {
@@ -1560,31 +1581,9 @@ public class Coordinator implements CoordInterface {
                         
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
                     }
                     // process bucket shuffle join on fragment without scan 
node
-                    TNetworkAddress dummyServer = new 
TNetworkAddress("0.0.0.0", 0);
                     while (bucketSeq < bucketNum) {
-                        TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
-
-                        dest.fragment_instance_id = new TUniqueId(-1, -1);
-                        dest.server = dummyServer;
-                        dest.setBrpcServer(dummyServer);
-
-                        Set<TNetworkAddress> hostSet = new HashSet<>();
-                        for (int insIdx = 0; insIdx < 
destParams.instanceExecParams.size(); insIdx++) {
-                            FInstanceExecParam instanceExecParams = 
destParams.instanceExecParams.get(insIdx);
-                            if (destParams.ignoreDataDistribution
-                                    && 
hostSet.contains(instanceExecParams.host)) {
-                                continue;
-                            }
-                            hostSet.add(instanceExecParams.host);
-                            if 
(instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
-                                dest.fragment_instance_id = 
instanceExecParams.instanceId;
-                                dest.server = 
toRpcHost(instanceExecParams.host);
-                                
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
-                                instanceExecParams.recvrId = 
params.destinations.size();
-                                break;
-                            }
-                        }
-
+                        TPlanFragmentDestination dest = 
setDestination(destParams, params.destinations.size(),
+                                bucketSeq);
                         bucketSeq++;
                         destinations.add(dest);
                     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to