This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 3354645 [BugFix][ColocateJoin] Fix bug of issue 4305 (#4306) 3354645 is described below commit 3354645c77fd86d9bf0f562e2a5692039629cc20 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Aug 12 12:11:47 2020 +0800 [BugFix][ColocateJoin] Fix bug of issue 4305 (#4306) This PR uses fragmentIdToSeqToAddressMap to replace bucketSeqToAddress, because the bucket-sequence-to-address mapping should be bound to a fragment --- .../src/main/java/org/apache/doris/qe/Coordinator.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 01da5c6..09e3873 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.commons.collections.map.HashedMap; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.catalog.Catalog; @@ -977,7 +978,9 @@ public class Coordinator { } //for ColocateJoin fragment - if (bucketSeqToAddress.size() > 0 && isColocateJoin(fragment.getPlanRoot())) { + if (isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) + && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) { + Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()); for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, bucketSeqToAddress.get(scanRanges.getKey()), 0, params); @@ -1127,11 +1130,16 @@ public class Coordinator { final OlapScanNode scanNode, FragmentScanRangeAssignment assignment) throws Exception { + if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { + fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap()); + } + Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { - getExecHostPortForBucketSeq(locations.get(0), bucketSeq); + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq); } for(TScanRangeLocations location: locations) { @@ -1150,7 +1158,7 @@ public class Coordinator { } // randomly choose a backend from the TScanRangeLocations for a certain bucket sequence. - private void getExecHostPortForBucketSeq(TScanRangeLocations seqLocation, Integer bucketSeq) throws Exception { + private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception { int randomLocation = new Random().nextInt(seqLocation.locations.size()); Reference<Long> backendIdRef = new Reference<Long>(); TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef); @@ -1158,7 +1166,7 @@ public class Coordinator { throw new UserException("there is no scanNode Backend"); } this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); - this.bucketSeqToAddress.put(bucketSeq, execHostPort); + this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } private void computeScanRangeAssignmentByScheduler( @@ -1349,7 +1357,7 @@ public class Coordinator { } private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); - private Map<Integer, TNetworkAddress> bucketSeqToAddress = Maps.newHashMap(); + private 
Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap(); private Set<Integer> colocateFragmentIds = new HashSet<>(); // record backend execute state --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org