This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a5c35eb [Bug] Fix the bug of null pointer exception of colocate join (#5961) a5c35eb is described below commit a5c35eb20f3a10834f2920bf45515b2f4e411edf Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Jun 3 21:19:58 2021 -0500 [Bug] Fix the bug of null pointer exception of colocate join (#5961) --- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 7 +++++-- .../src/test/java/org/apache/doris/qe/CoordinatorTest.java | 9 +++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 4b0018f..cb93a09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1109,7 +1109,6 @@ public class Coordinator { } } - // One fragment could only have one HashJoinNode private boolean isColocateJoin(PlanNode node) { // TODO(cmy): some internal process, such as broker load task, do not have ConnectContext. // Any configurations needed by the Coordinator should be passed in Coordinator initialization. @@ -1177,6 +1176,7 @@ public class Coordinator { private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(fragmentId); Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); // 1. count each node in one fragment should scan how many tablet, gather them in one list @@ -1282,8 +1282,11 @@ public class Coordinator { final OlapScanNode scanNode) throws Exception { if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap()); + fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); } Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId()); + HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap(); for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList @@ -1708,7 +1711,7 @@ public class Coordinator { } } - private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); + private Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap(); private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap(); // cache the fragment id to its scan node ids. Used for colocate join. private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 41e1755..97143fa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -108,6 +108,7 @@ public class CoordinatorTest extends Coordinator { Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); // 2. set bucketSeqToScanRange in coordinator + Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>(); BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>(); List<TScanRangeParams> scanRangeParamsList = new ArrayList<>(); @@ -117,7 +118,8 @@ public class CoordinatorTest extends Coordinator { for (int i = 0; i < 3; i++) { bucketSeqToScanRange.put(i, ScanRangeMap); } - Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange); + fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); + Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); @@ -293,13 +295,16 @@ public class CoordinatorTest extends Coordinator { Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); // 2. set bucketSeqToScanRange in coordinator + Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap = new HashMap<>(); BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); Map<Integer, List<TScanRangeParams>> ScanRangeMap = new HashMap<>(); ScanRangeMap.put(scanNodeId, new ArrayList<>()); for (int i = 0; i < 3; i++) { bucketSeqToScanRange.put(i, ScanRangeMap); } - Deencapsulation.setField(coordinator, "bucketSeqToScanRange", bucketSeqToScanRange); + fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); + Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); + TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org