This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3354645  [BugFix][ColocateJoin] Fix bug of issue 4305 (#4306)
3354645 is described below

commit 3354645c77fd86d9bf0f562e2a5692039629cc20
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed Aug 12 12:11:47 2020 +0800

    [BugFix][ColocateJoin] Fix bug of issue 4305 (#4306)
    
    This PR uses fragmentIdToSeqToAddressMap to replace bucketSeqToAddress,
    because the bucket-sequence-to-address mapping should be bound to each fragment.
---
 .../src/main/java/org/apache/doris/qe/Coordinator.java | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 01da5c6..09e3873 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.catalog.Catalog;
@@ -977,7 +978,9 @@ public class Coordinator {
             }
 
             //for ColocateJoin fragment
-            if (bucketSeqToAddress.size() > 0 && 
isColocateJoin(fragment.getPlanRoot())) {
+            if (isColocateJoin(fragment.getPlanRoot()) && 
fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
+                    && 
fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) {
+                Map<Integer, TNetworkAddress> bucketSeqToAddress = 
fragmentIdToSeqToAddressMap.get(fragment.getFragmentId());
                 for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> 
scanRanges : bucketSeqToScanRange.entrySet()) {
                     FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, bucketSeqToAddress.get(scanRanges.getKey()), 0, 
params);
 
@@ -1127,11 +1130,16 @@ public class Coordinator {
             final OlapScanNode scanNode,
             FragmentScanRangeAssignment assignment) throws Exception {
 
+        if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
+            fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new 
HashedMap());
+        }
+        Map<Integer, TNetworkAddress> bucketSeqToAddress = 
fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+
         for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
             //fill scanRangeParamsList
             List<TScanRangeLocations> locations = 
scanNode.bucketSeq2locations.get(bucketSeq);
             if (!bucketSeqToAddress.containsKey(bucketSeq)) {
-                getExecHostPortForBucketSeq(locations.get(0), bucketSeq);
+                getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), 
scanNode.getFragmentId(), bucketSeq);
             }
 
             for(TScanRangeLocations location: locations) {
@@ -1150,7 +1158,7 @@ public class Coordinator {
     }
 
     // randomly choose a backend from the TScanRangeLocations for a certain 
bucket sequence.
-    private void getExecHostPortForBucketSeq(TScanRangeLocations seqLocation, 
Integer bucketSeq) throws Exception {
+    private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations 
seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception {
         int randomLocation = new 
Random().nextInt(seqLocation.locations.size());
         Reference<Long> backendIdRef = new Reference<Long>();
         TNetworkAddress execHostPort = 
SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, 
seqLocation.locations, this.idToBackend, backendIdRef);
@@ -1158,7 +1166,7 @@ public class Coordinator {
             throw new UserException("there is no scanNode Backend");
         }
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
-        this.bucketSeqToAddress.put(bucketSeq, execHostPort);
+        this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, 
execHostPort);
     }
 
     private void computeScanRangeAssignmentByScheduler(
@@ -1349,7 +1357,7 @@ public class Coordinator {
     }
 
     private BucketSeqToScanRange bucketSeqToScanRange = new 
BucketSeqToScanRange();
-    private Map<Integer, TNetworkAddress> bucketSeqToAddress = 
Maps.newHashMap();
+    private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> 
fragmentIdToSeqToAddressMap = Maps.newHashMap();
     private Set<Integer> colocateFragmentIds = new HashSet<>();
 
     // record backend execute state


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to