This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4f7c6e7ca [fix](Nereids) fix new coordinator compute a wrong 
scanRangeNum (#43850)
5a4f7c6e7ca is described below

commit 5a4f7c6e7ca460cce36d6b85f3eb56b30eedc788
Author: 924060929 <lanhuaj...@selectdb.com>
AuthorDate: Wed Nov 13 18:49:21 2024 +0800

    [fix](Nereids) fix new coordinator compute a wrong scanRangeNum (#43850)
    
    fix new coordinator compute a wrong scanRangeNum, introduced by #41730
    
    This bug will show a wrong progress in s3 load:
    ```
    Progress: 0.00%(73/2147483647)
    ```
---
 .../org/apache/doris/qe/CoordinatorContext.java    | 62 +++++++++++++++++-
 .../org/apache/doris/nereids/util/PlanChecker.java | 11 ++++
 .../apache/doris/qe/NereidsCoordinatorTest.java    | 73 ++++++++++++++++++++++
 3 files changed, 144 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index 2ab94808f27..aed0fd9c98c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -30,6 +30,10 @@ import 
org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
 import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.PlanFragment;
@@ -40,12 +44,18 @@ import org.apache.doris.qe.runtime.QueryProcessor;
 import org.apache.doris.resource.workloadgroup.QueryQueue;
 import org.apache.doris.resource.workloadgroup.QueueToken;
 import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TQueryGlobals;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TResourceLimit;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Suppliers;
@@ -418,9 +428,57 @@ public class CoordinatorContext {
 
     private int getScanRangeNum() {
         int scanRangeNum = 0;
-        for (ScanNode scanNode : scanNodes) {
-            scanRangeNum += scanNode.getScanRangeNum();
+        for (PipelineDistributedPlan distributedPlan : distributedPlans) {
+            for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) {
+                ScanSource scanSource = instanceJob.getScanSource();
+                if (scanSource instanceof BucketScanSource) {
+                    BucketScanSource bucketScanSource = (BucketScanSource) 
scanSource;
+                    for (Map<ScanNode, ScanRanges> kv : 
bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
+                        for (ScanRanges scanRanges : kv.values()) {
+                            for (TScanRangeParams param : scanRanges.params) {
+                                scanRangeNum += 
computeScanRangeNumByScanRange(param);
+                            }
+                        }
+                    }
+                } else {
+                    DefaultScanSource defaultScanSource = (DefaultScanSource) 
scanSource;
+                    for (ScanRanges scanRanges : 
defaultScanSource.scanNodeToScanRanges.values()) {
+                        for (TScanRangeParams param : scanRanges.params) {
+                            scanRangeNum += 
computeScanRangeNumByScanRange(param);
+                        }
+                    }
+                }
+            }
+        }
+        return scanRangeNum;
+    }
+
+    private int computeScanRangeNumByScanRange(TScanRangeParams param) {
+        int scanRangeNum = 0;
+        TScanRange scanRange = param.getScanRange();
+        if (scanRange == null) {
+            return scanRangeNum;
+        }
+        TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
+        if (brokerScanRange != null) {
+            scanRangeNum += brokerScanRange.getRanges().size();
+        }
+        TExternalScanRange externalScanRange = scanRange.getExtScanRange();
+        if (externalScanRange != null) {
+            TFileScanRange fileScanRange = 
externalScanRange.getFileScanRange();
+            if (fileScanRange != null) {
+                if (fileScanRange.isSetRanges()) {
+                    scanRangeNum += fileScanRange.getRanges().size();
+                } else if (fileScanRange.isSetSplitSource()) {
+                    scanRangeNum += 
fileScanRange.getSplitSource().getNumSplits();
+                }
+            }
+        }
+        TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
+        if (paloScanRange != null) {
+            scanRangeNum += 1;
         }
+        // TODO: more ranges?
         return scanRangeNum;
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index b95027a1385..f0a45d1e7bc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -270,6 +270,17 @@ public class PlanChecker {
         return this;
     }
 
+    public NereidsPlanner plan(String sql) {
+        StatementContext statementContext = new 
StatementContext(connectContext, new OriginStatement(sql, 0));
+        connectContext.setStatementContext(statementContext);
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
+        LogicalPlanAdapter parsedPlanAdaptor = new 
LogicalPlanAdapter(parsedPlan, statementContext);
+        statementContext.setParsedStatement(parsedPlanAdaptor);
+        planner.planWithLock(parsedPlanAdaptor);
+        return planner;
+    }
+
     public PlanChecker dpHypOptimize() {
         double now = System.currentTimeMillis();
         cascadesContext.getStatementContext().setDpHyp(true);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java
new file mode 100644
index 00000000000..23326d94310
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/NereidsCoordinatorTest.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class NereidsCoordinatorTest extends TestWithFeService {
+    @BeforeAll
+    public void init() throws Exception {
+        FeConstants.runningUnitTest = true;
+
+        createDatabase("test");
+        useDatabase("test");
+
+        createTable("create table tbl(id int) distributed by hash(id) buckets 
10 properties('replication_num' = '1');");
+    }
+
+    @Test
+    public void testNereidsCoordinatorScanRangeNum() throws IOException {
+        NereidsPlanner planner = plan("select * from test.tbl");
+        NereidsCoordinator coordinator = (NereidsCoordinator) 
EnvFactory.getInstance()
+                .createCoordinator(connectContext, null, planner, null);
+        int scanRangeNum = coordinator.getScanRangeNum();
+        Assertions.assertEquals(10, scanRangeNum);
+    }
+
+    @Test
+    public void testNereidsCoordinatorScanRangeNum2() throws IOException {
+        NereidsPlanner planner = plan("select * from 
information_schema.columns");
+        NereidsCoordinator coordinator = (NereidsCoordinator) 
EnvFactory.getInstance()
+                .createCoordinator(connectContext, null, planner, null);
+        int scanRangeNum = coordinator.getScanRangeNum();
+        Assertions.assertEquals(0, scanRangeNum);
+    }
+
+    private NereidsPlanner plan(String sql) throws IOException {
+        ConnectContext connectContext = createDefaultCtx();
+        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
+        connectContext.setThreadLocalInfo();
+
+        UUID uuid = UUID.randomUUID();
+        connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()));
+        NereidsPlanner planner = PlanChecker.from(connectContext).plan(sql);
+        return planner;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to