This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5a4f7c6e7ca [fix](Nereids) fix new coordinator compute a wrong scanRangeNum (#43850) 5a4f7c6e7ca is described below commit 5a4f7c6e7ca460cce36d6b85f3eb56b30eedc788 Author: 924060929 <lanhuaj...@selectdb.com> AuthorDate: Wed Nov 13 18:49:21 2024 +0800 [fix](Nereids) fix new coordinator compute a wrong scanRangeNum (#43850) Fix the new coordinator computing a wrong scanRangeNum, a regression introduced by #41730. This bug shows wrong progress in S3 load: ``` Progress: 0.00%(73/2147483647) ``` --- .../org/apache/doris/qe/CoordinatorContext.java | 62 +++++++++++++++++- .../org/apache/doris/nereids/util/PlanChecker.java | 11 ++++ .../apache/doris/qe/NereidsCoordinatorTest.java | 73 ++++++++++++++++++++++ 3 files changed, 144 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java index 2ab94808f27..aed0fd9c98c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java @@ -30,6 +30,10 @@ import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan; import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource; import org.apache.doris.nereids.trees.plans.physical.TopnFilter; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.PlanFragment; @@ -40,12 +44,18 @@ 
import org.apache.doris.qe.runtime.QueryProcessor; import org.apache.doris.resource.workloadgroup.QueryQueue; import org.apache.doris.resource.workloadgroup.QueueToken; import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TDescriptorTable; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueryGlobals; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Suppliers; @@ -418,9 +428,57 @@ public class CoordinatorContext { private int getScanRangeNum() { int scanRangeNum = 0; - for (ScanNode scanNode : scanNodes) { - scanRangeNum += scanNode.getScanRangeNum(); + for (PipelineDistributedPlan distributedPlan : distributedPlans) { + for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) { + ScanSource scanSource = instanceJob.getScanSource(); + if (scanSource instanceof BucketScanSource) { + BucketScanSource bucketScanSource = (BucketScanSource) scanSource; + for (Map<ScanNode, ScanRanges> kv : bucketScanSource.bucketIndexToScanNodeToTablets.values()) { + for (ScanRanges scanRanges : kv.values()) { + for (TScanRangeParams param : scanRanges.params) { + scanRangeNum += computeScanRangeNumByScanRange(param); + } + } + } + } else { + DefaultScanSource defaultScanSource = (DefaultScanSource) scanSource; + for (ScanRanges scanRanges : defaultScanSource.scanNodeToScanRanges.values()) { + for (TScanRangeParams param : scanRanges.params) { + scanRangeNum += computeScanRangeNumByScanRange(param); + } + } + } + } + } + return scanRangeNum; + } + + private int 
computeScanRangeNumByScanRange(TScanRangeParams param) { + int scanRangeNum = 0; + TScanRange scanRange = param.getScanRange(); + if (scanRange == null) { + return scanRangeNum; + } + TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange(); + if (brokerScanRange != null) { + scanRangeNum += brokerScanRange.getRanges().size(); + } + TExternalScanRange externalScanRange = scanRange.getExtScanRange(); + if (externalScanRange != null) { + TFileScanRange fileScanRange = externalScanRange.getFileScanRange(); + if (fileScanRange != null) { + if (fileScanRange.isSetRanges()) { + scanRangeNum += fileScanRange.getRanges().size(); + } else if (fileScanRange.isSetSplitSource()) { + scanRangeNum += fileScanRange.getSplitSource().getNumSplits(); + } + } + } + TPaloScanRange paloScanRange = scanRange.getPaloScanRange(); + if (paloScanRange != null) { + scanRangeNum += 1; } + // TODO: more ranges? return scanRangeNum; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java index b95027a1385..f0a45d1e7bc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java @@ -270,6 +270,17 @@ public class PlanChecker { return this; } + public NereidsPlanner plan(String sql) { + StatementContext statementContext = new StatementContext(connectContext, new OriginStatement(sql, 0)); + connectContext.setStatementContext(statementContext); + NereidsPlanner planner = new NereidsPlanner(statementContext); + LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql); + LogicalPlanAdapter parsedPlanAdaptor = new LogicalPlanAdapter(parsedPlan, statementContext); + statementContext.setParsedStatement(parsedPlanAdaptor); + planner.planWithLock(parsedPlanAdaptor); + return planner; + } + public PlanChecker dpHypOptimize() { double now = System.currentTimeMillis(); 
cascadesContext.getStatementContext().setDpHyp(true); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java new file mode 100644 index 00000000000..23326d94310 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.catalog.EnvFactory; +import org.apache.doris.common.FeConstants; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.UUID; + +public class NereidsCoordinatorTest extends TestWithFeService { + @BeforeAll + public void init() throws Exception { + FeConstants.runningUnitTest = true; + + createDatabase("test"); + useDatabase("test"); + + createTable("create table tbl(id int) distributed by hash(id) buckets 10 properties('replication_num' = '1');"); + } + + @Test + public void testNereidsCoordinatorScanRangeNum() throws IOException { + NereidsPlanner planner = plan("select * from test.tbl"); + NereidsCoordinator coordinator = (NereidsCoordinator) EnvFactory.getInstance() + .createCoordinator(connectContext, null, planner, null); + int scanRangeNum = coordinator.getScanRangeNum(); + Assertions.assertEquals(10, scanRangeNum); + } + + @Test + public void testNereidsCoordinatorScanRangeNum2() throws IOException { + NereidsPlanner planner = plan("select * from information_schema.columns"); + NereidsCoordinator coordinator = (NereidsCoordinator) EnvFactory.getInstance() + .createCoordinator(connectContext, null, planner, null); + int scanRangeNum = coordinator.getScanRangeNum(); + Assertions.assertEquals(0, scanRangeNum); + } + + private NereidsPlanner plan(String sql) throws IOException { + ConnectContext connectContext = createDefaultCtx(); + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE"); + connectContext.setThreadLocalInfo(); + + UUID uuid = UUID.randomUUID(); + connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits())); + NereidsPlanner planner = PlanChecker.from(connectContext).plan(sql); + return planner; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org