This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 27ee6ed1bd6 branch-3.0: [fix](coordinator) Fix wrong bucket assginment in old-version coordin… #44539 (#44571) 27ee6ed1bd6 is described below commit 27ee6ed1bd62fcfe93682c1d9433b24f77d544e9 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Tue Nov 26 10:51:36 2024 +0800 branch-3.0: [fix](coordinator) Fix wrong bucket assginment in old-version coordin… #44539 (#44571) Cherry-picked from #44539 Co-authored-by: Gabriel <liwenqi...@selectdb.com> --- .../main/java/org/apache/doris/qe/Coordinator.java | 95 +++++++++++----------- 1 file changed, 47 insertions(+), 48 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 139996ce9ab..8be50772094 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1437,31 +1437,8 @@ public class Coordinator implements CoordInterface { destParams.instanceExecParams.get(0).bucketSeqSet.add(0); } // process bucket shuffle join on fragment without scan node - TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); while (bucketSeq < bucketNum) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - - dest.fragment_instance_id = new TUniqueId(-1, -1); - dest.server = dummyServer; - dest.setBrpcServer(dummyServer); - - Set<TNetworkAddress> hostSet = new HashSet<>(); - for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { - FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); - if (destParams.ignoreDataDistribution - && hostSet.contains(instanceExecParams.host)) { - continue; - } - hostSet.add(instanceExecParams.host); - if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { - dest.fragment_instance_id = instanceExecParams.instanceId; - dest.server = toRpcHost(instanceExecParams.host); - dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); - instanceExecParams.recvrId = params.destinations.size(); - break; - } - } - + TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(), bucketSeq); bucketSeq++; params.destinations.add(dest); } @@ -1508,6 +1485,50 @@ public class Coordinator implements CoordInterface { } } + private TPlanFragmentDestination setDestination(FragmentExecParams destParams, int recvrId, int bucketSeq) + throws Exception { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); + dest.fragment_instance_id = new TUniqueId(-1, -1); + dest.server = dummyServer; + dest.setBrpcServer(dummyServer); + + if (destParams.ignoreDataDistribution) { + Map<TNetworkAddress, Pair<TUniqueId, Set<Integer>>> hostToInstanceIdAndBucketSeq + = new HashMap<>(); + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + hostToInstanceIdAndBucketSeq.putIfAbsent(instanceExecParams.host, + Pair.of(instanceExecParams.instanceId, new HashSet<>())); + hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.addAll( + instanceExecParams.bucketSeqSet); + } + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (hostToInstanceIdAndBucketSeq.get(instanceExecParams.host).second.contains(bucketSeq)) { + dest.fragment_instance_id = hostToInstanceIdAndBucketSeq.get(instanceExecParams.host) + .first; + dest.server = toRpcHost(instanceExecParams.host); + dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); + instanceExecParams.recvrId = recvrId; + break; + } + } + } else { + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { + dest.fragment_instance_id = instanceExecParams.instanceId; + dest.server = toRpcHost(instanceExecParams.host); + dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); + instanceExecParams.recvrId = recvrId; + break; + } + } + } + return dest; + } + private void computeMultiCastFragmentParams() throws Exception { for (FragmentExecParams params : fragmentExecParamsMap.values()) { if (!(params.fragment instanceof MultiCastPlanFragment)) { @@ -1560,31 +1581,9 @@ public class Coordinator implements CoordInterface { destParams.instanceExecParams.get(0).bucketSeqSet.add(0); } // process bucket shuffle join on fragment without scan node - TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); while (bucketSeq < bucketNum) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - - dest.fragment_instance_id = new TUniqueId(-1, -1); - dest.server = dummyServer; - dest.setBrpcServer(dummyServer); - - Set<TNetworkAddress> hostSet = new HashSet<>(); - for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { - FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); - if (destParams.ignoreDataDistribution - && hostSet.contains(instanceExecParams.host)) { - continue; - } - hostSet.add(instanceExecParams.host); - if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { - dest.fragment_instance_id = instanceExecParams.instanceId; - dest.server = toRpcHost(instanceExecParams.host); - dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); - instanceExecParams.recvrId = params.destinations.size(); - break; - } - } - + TPlanFragmentDestination dest = setDestination(destParams, params.destinations.size(), + bucketSeq); bucketSeq++; destinations.add(dest); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org