This is an automated email from the ASF dual-hosted git repository. kangkaisen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1035e86 [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133) 1035e86 is described below commit 1035e86e0bc568b9c54e823919aa247baf40faae Author: xinghuayu007 <1450306...@qq.com> AuthorDate: Wed Jan 6 10:20:16 2021 +0800 [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133) --- .../main/java/org/apache/doris/qe/Coordinator.java | 14 ++++- .../java/org/apache/doris/qe/CoordinatorTest.java | 65 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ff05413..f5dd310 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1560,14 +1560,22 @@ public class Coordinator { break; } } - - buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1); Reference<Long> backendIdRef = new Reference<Long>(); TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef); if (execHostPort == null) { throw new UserException("there is no scanNode Backend"); } - + //the backend with buckendId is not alive, chose another new backend + if (backendIdRef.getRef() != buckendId) { + //buckendIdToBucketCountMap does not contain the new backend, insert into it + if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) { + buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1); + } else { //buckendIdToBucketCountMap contains the new backend, update it + buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1); + } + } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly + buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1); + } addressToBackendID.put(execHostPort, backendIdRef.getRef()); this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 35efae2..b56fc72 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -49,6 +49,7 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; @@ -530,5 +531,69 @@ public class CoordinatorTest extends Coordinator { } Assert.assertTrue(hosts.size() == 3); } + + @Test + public void testBucketShuffleWithUnaliveBackend() { + Coordinator coordinator = new Coordinator(context, analyzer, planner); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2} + TScanRangeLocations tScanRangeLocations = new TScanRangeLocations(); + TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation(); + tScanRangeLocation0.backend_id = 0; + tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050); + TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation(); + tScanRangeLocation1.backend_id = 1; + tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050); + TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation(); + tScanRangeLocation2.backend_id = 2; + tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050); + tScanRangeLocations.locations = new ArrayList<>(); + tScanRangeLocations.locations.add(tScanRangeLocation0); + tScanRangeLocations.locations.add(tScanRangeLocation1); + tScanRangeLocations.locations.add(tScanRangeLocation2); + + // init all backend + Backend backend0 = new Backend(0, "0.0.0.0", 9060); + backend0.setAlive(false); + backend0.setBePort(9050); + Backend backend1 = new Backend(1, "0.0.0.1", 9060); + backend1.setAlive(true); + backend1.setBePort(9050); + Backend backend2 = new Backend(2, "0.0.0.2", 9060); + backend2.setAlive(true); + backend2.setBePort(9050); + Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap(); + addressToBackendID.put(tScanRangeLocation0.server, tScanRangeLocation0.backend_id); + addressToBackendID.put(tScanRangeLocation1.server, tScanRangeLocation1.backend_id); + addressToBackendID.put(tScanRangeLocation2.server, tScanRangeLocation2.backend_id); + + ImmutableMap<Long, Backend> idToBackend = + new ImmutableMap.Builder<Long, Backend>(). + put(0l, backend0). + put(1l, backend1). + put(2l, backend2).build(); + Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap(); + Map<Long, Integer> backendIdBucketCountMap = new HashMap<Long, Integer>(); + fragmentIdToBuckendIdBucketCountMap.put(planFragmentId, backendIdBucketCountMap); + Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>(); + BucketShuffleJoinController controller = new BucketShuffleJoinController(fragmentIdToScanNodeIds); + Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap(); + fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, TNetworkAddress>()); + Deencapsulation.setField(controller, "fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap); + Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap); + Deencapsulation.invoke(controller, "getExecHostPortForFragmentIDAndBucketSeq", + tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID); + Assert.assertTrue(backendIdBucketCountMap.size() == 2); + List<Long> backendIds = new ArrayList<Long>(); + List<Integer> counts = new ArrayList<Integer>(); + for (Map.Entry<Long, Integer> item:backendIdBucketCountMap.entrySet()) { + backendIds.add(item.getKey()); + counts.add(item.getValue()); + } + Assert.assertTrue(backendIds.get(0) == 0); + Assert.assertTrue(counts.get(0) == 0); + Assert.assertTrue(backendIds.get(1) == 1); + Assert.assertTrue(counts.get(1) == 1); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org