This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c35eb  [Bug] Fix the bug of null pointer exception of colocate join (#5961)
a5c35eb is described below

commit a5c35eb20f3a10834f2920bf45515b2f4e411edf
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Jun 3 21:19:58 2021 -0500

    [Bug] Fix the bug of null pointer exception of colocate join (#5961)
---
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java    | 7 +++++--
 .../src/test/java/org/apache/doris/qe/CoordinatorTest.java       | 9 +++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 4b0018f..cb93a09 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1109,7 +1109,6 @@ public class Coordinator {
         }
     }
 
-    // One fragment could only have one HashJoinNode
     private boolean isColocateJoin(PlanNode node) {
         // TODO(cmy): some internal process, such as broker load task, do not 
have ConnectContext.
         // Any configurations needed by the Coordinator should be passed in 
Coordinator initialization.
@@ -1177,6 +1176,7 @@ public class Coordinator {
 
     private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, 
int parallelExecInstanceNum, FragmentExecParams params) {
         Map<Integer, TNetworkAddress> bucketSeqToAddress = 
fragmentIdToSeqToAddressMap.get(fragmentId);
+        BucketSeqToScanRange bucketSeqToScanRange = 
fragmentIdTobucketSeqToScanRangeMap.get(fragmentId);
         Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
 
         // 1. count each node in one fragment should scan how many tablet, 
gather them in one list
@@ -1282,8 +1282,11 @@ public class Coordinator {
             final OlapScanNode scanNode) throws Exception {
         if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
             fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new 
HashedMap());
+            fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), 
new BucketSeqToScanRange());
         }
         Map<Integer, TNetworkAddress> bucketSeqToAddress = 
fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+        BucketSeqToScanRange bucketSeqToScanRange = 
fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
+
         HashMap<TNetworkAddress, Long> assignedBytesPerHost = 
Maps.newHashMap();
         for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
             //fill scanRangeParamsList
@@ -1708,7 +1711,7 @@ public class Coordinator {
         }
     }
 
-    private BucketSeqToScanRange bucketSeqToScanRange = new 
BucketSeqToScanRange();
+    private Map<PlanFragmentId, BucketSeqToScanRange> 
fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
     private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> 
fragmentIdToSeqToAddressMap = Maps.newHashMap();
     // cache the fragment id to its scan node ids. Used for colocate join.
     private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = 
Maps.newHashMap();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 41e1755..97143fa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -108,6 +108,7 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", 
fragmentToBucketSeqToAddress);
 
         // 2. set bucketSeqToScanRange in coordinator
+        Map<PlanFragmentId, BucketSeqToScanRange> 
fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
         BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
         Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
         List<TScanRangeParams> scanRangeParamsList = new ArrayList<>();
@@ -117,7 +118,8 @@ public class CoordinatorTest extends Coordinator {
         for (int i = 0; i < 3; i++) {
             bucketSeqToScanRange.put(i, ScanRangeMap);
         }
-        Deencapsulation.setField(coordinator, "bucketSeqToScanRange", 
bucketSeqToScanRange);
+        fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, 
bucketSeqToScanRange);
+        Deencapsulation.setField(coordinator, 
"fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
 
         FragmentExecParams params = new FragmentExecParams(null);
         Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
@@ -293,13 +295,16 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", 
fragmentToBucketSeqToAddress);
 
         // 2. set bucketSeqToScanRange in coordinator
+        Map<PlanFragmentId, BucketSeqToScanRange> 
fragmentIdBucketSeqToScanRangeMap = new HashMap<>();
         BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
         Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>();
         ScanRangeMap.put(scanNodeId, new ArrayList<>());
         for (int i = 0; i < 3; i++) {
             bucketSeqToScanRange.put(i, ScanRangeMap);
         }
-        Deencapsulation.setField(coordinator, "bucketSeqToScanRange", 
bucketSeqToScanRange);
+        fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, 
bucketSeqToScanRange);
+        Deencapsulation.setField(coordinator, 
"fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
+
         TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
         OlapScanNode olapScanNode = new OlapScanNode(new 
PlanNodeId(scanNodeId), tupleDescriptor, "test");
         PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to