This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 04008333d40 [opt](split) generate and get split batch concurrently (#36045)
04008333d40 is described below

commit 04008333d4081d3502330c8c78bd32634699aa6f
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Jun 19 16:15:47 2024 +0800

    [opt](split) generate and get split batch concurrently (#36045)
    
    ## Proposed changes
    
    Generate and fetch split batches concurrently. `SplitSource.getNextBatch`
    no longer synchronizes, so each scanner can fetch its own splits
    concurrently, while `SplitAssignment` generates splits asynchronously.
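    
    At a high level, the patch replaces the shared, lock-protected split
    multimap with a producer/consumer hand-off: split-generating tasks append
    per-backend batches of scan ranges to `LinkedBlockingQueue`s held by
    `SplitAssignment`, and each `SplitSource.getNextBatch` call drains only
    its own backend's queue. Below is a minimal, self-contained sketch of
    that pattern, not the actual Doris classes (names like `BatchAssignment`
    are illustrative; the 100 ms poll and 500 ms cap mirror
    `WAIT_TIME_OUT`/`MAX_WAIT_TIME_OUT` in `SplitSource.java`):
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    // Hypothetical sketch of the producer/consumer hand-off, not Doris code.
    class BatchAssignment<B, S> {
        // One queue per backend, so consumers never contend on a shared lock.
        private final ConcurrentHashMap<B, BlockingQueue<List<S>>> queues = new ConcurrentHashMap<>();
        private final AtomicBoolean finished = new AtomicBoolean(false);
    
        // Producer side: split generators push batches asynchronously.
        void offer(B backend, List<S> batch) {
            queues.computeIfAbsent(backend, b -> new LinkedBlockingQueue<>()).offer(batch);
        }
    
        void finish() {
            finished.set(true);
        }
    
        // Consumer side: each backend drains its own queue concurrently.
        // Poll with a short timeout; once ~500 ms have been spent waiting and
        // something has arrived, return the partial batch instead of stalling.
        List<S> nextBatch(B backend, int maxBatchSize) throws InterruptedException {
            BlockingQueue<List<S>> q = queues.computeIfAbsent(backend, b -> new LinkedBlockingQueue<>());
            List<S> out = new ArrayList<>(maxBatchSize);
            long waitedMs = 0;
            while (out.size() < maxBatchSize && !(finished.get() && q.isEmpty())) {
                List<S> batch = q.poll(100, TimeUnit.MILLISECONDS);
                if (batch == null) {
                    waitedMs += 100;
                    if (waitedMs >= 500 && !out.isEmpty()) {
                        break;
                    }
                } else {
                    out.addAll(batch);
                }
            }
            return out;
        }
    }
    ```
    
    Since a slow consumer now only blocks on its own queue, the BE-side fetch
    batch can also be much larger (`remote_split_source_batch_size` is raised
    from 1024 to 10240 in this change).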
---
 be/src/common/config.cpp                           |   2 +-
 be/src/vec/exec/scan/split_source_connector.cpp    |   1 +
 be/src/vec/exec/scan/split_source_connector.h      |   6 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |   2 +
 be/src/vec/exec/scan/vfile_scanner.h               |   1 +
 .../org/apache/doris/common/util/LocationPath.java |  26 ++--
 .../apache/doris/datasource/ExternalCatalog.java   |   6 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |  10 ++
 .../apache/doris/datasource/FileQueryScanNode.java |  25 +++-
 .../org/apache/doris/datasource/FileScanNode.java  |   6 +-
 .../apache/doris/datasource/SplitAssignment.java   | 158 ++++++++++++++++-----
 .../apache/doris/datasource/SplitGenerator.java    |  26 ++--
 .../org/apache/doris/datasource/SplitSource.java   |  64 +++++----
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  45 +++---
 .../doris/datasource/hive/source/HiveScanNode.java |  96 ++++++++-----
 .../doris/datasource/hudi/source/HudiScanNode.java | 144 +++++++++++--------
 .../java/org/apache/doris/planner/ScanNode.java    |   4 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |   6 +
 18 files changed, 402 insertions(+), 226 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b7fdac5f660..5eb0e8d26ba 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -248,7 +248,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
     }
     return true;
 });
-DEFINE_Int32(remote_split_source_batch_size, "1024");
+DEFINE_Int32(remote_split_source_batch_size, "10240");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
 DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp
index 2d35d3796bc..fae65543e53 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -45,6 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
     std::lock_guard<std::mutex> l(_range_lock);
     *has_next = false;
     if (_scan_index == _scan_ranges.size() && !_last_batch) {
+        SCOPED_RAW_TIMER(&_get_split_timer);
         Status coord_status;
         FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(),
                                         _state->get_query_ctx()->coord_addr, &coord_status);
diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h
index cf358846b30..bfda961df34 100644
--- a/be/src/vec/exec/scan/split_source_connector.h
+++ b/be/src/vec/exec/scan/split_source_connector.h
@@ -43,6 +43,8 @@ public:
     virtual int num_scan_ranges() = 0;
 
     virtual TFileScanRangeParams* get_params() = 0;
+
+    virtual int64_t get_split_time() { return 0; }
 };
 
 /**
@@ -95,6 +97,8 @@ private:
     int _scan_index = 0;
     int _range_index = 0;
 
+    int64_t _get_split_timer = 0;
+
 public:
     RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits)
             : _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {}
@@ -110,6 +114,8 @@ public:
     TFileScanRangeParams* get_params() override {
         LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map";
     }
+
+    int64_t get_split_time() override { return _get_split_timer; }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index f6f029b9de0..4932e164649 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -138,6 +138,7 @@ Status VFileScanner::prepare(
     _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
     _has_fully_rf_file_counter =
             ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
+    _get_split_timer = ADD_TIMER(_local_state->scanner_profile(), "GetSplitTime");
 
     _file_cache_statistics.reset(new io::FileCacheStatistics());
     _io_ctx.reset(new io::IOContext());
@@ -1162,6 +1163,7 @@ Status VFileScanner::close(RuntimeState* state) {
     if (_cur_reader) {
         RETURN_IF_ERROR(_cur_reader->close());
     }
+    COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time());
 
     RETURN_IF_ERROR(VScanner::close(state));
     return Status::OK();
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 332bdfe11e1..61d75d65683 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -180,6 +180,7 @@ private:
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
     RuntimeProfile::Counter* _file_counter = nullptr;
     RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
+    RuntimeProfile::Counter* _get_split_timer = nullptr;
 
     const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
     // single slot filter conjuncts
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 38b5250a157..dd1641126bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -41,7 +41,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.InvalidPathException;
 import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 
@@ -74,10 +74,14 @@ public class LocationPath {
     }
 
     private LocationPath(String location) {
-        this(location, new HashMap<>());
+        this(location, Collections.emptyMap(), true);
     }
 
     public LocationPath(String location, Map<String, String> props) {
+        this(location, props, true);
+    }
+
+    public LocationPath(String location, Map<String, String> props, boolean convertPath) {
         String scheme = parseScheme(location).toLowerCase();
         if (scheme.isEmpty()) {
             locationType = LocationType.NOSCHEME;
@@ -88,7 +92,7 @@ public class LocationPath {
                     locationType = LocationType.HDFS;
                     // Need add hdfs host to location
                     String host = props.get(HdfsResource.DSF_NAMESERVICES);
-                    this.location = normalizedHdfsPath(location, host);
+                    this.location = convertPath ? normalizedHdfsPath(location, host) : location;
                     break;
                 case FeConstants.FS_PREFIX_S3:
                     locationType = LocationType.S3;
@@ -96,22 +100,22 @@ public class LocationPath {
                     break;
                 case FeConstants.FS_PREFIX_S3A:
                     locationType = LocationType.S3A;
-                    this.location = convertToS3(location);
+                    this.location = convertPath ? convertToS3(location) : location;
                     break;
                 case FeConstants.FS_PREFIX_S3N:
                     // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
                     locationType = LocationType.S3N;
-                    this.location = convertToS3(location);
+                    this.location = convertPath ? convertToS3(location) : location;
                     break;
                 case FeConstants.FS_PREFIX_BOS:
                     locationType = LocationType.BOS;
                     // use s3 client to access
-                    this.location = convertToS3(location);
+                    this.location = convertPath ? convertToS3(location) : location;
                     break;
                 case FeConstants.FS_PREFIX_GCS:
                     locationType = LocationType.GCS;
                     // use s3 client to access
-                    this.location = convertToS3(location);
+                    this.location = convertPath ? convertToS3(location) : location;
                     break;
                 case FeConstants.FS_PREFIX_OSS:
                     if (isHdfsOnOssEndpoint(location)) {
@@ -119,7 +123,7 @@ public class LocationPath {
                         this.location = location;
                     } else {
                         if (useS3EndPoint(props)) {
-                            this.location = convertToS3(location);
+                            this.location = convertPath ? convertToS3(location) : location;
                         } else {
                             this.location = location;
                         }
@@ -128,7 +132,7 @@ public class LocationPath {
                     break;
                 case FeConstants.FS_PREFIX_COS:
                     if (useS3EndPoint(props)) {
-                        this.location = convertToS3(location);
+                        this.location = convertPath ? convertToS3(location) : location;
                     } else {
                         this.location = location;
                     }
@@ -136,7 +140,7 @@ public class LocationPath {
                     break;
                 case FeConstants.FS_PREFIX_OBS:
                     if (useS3EndPoint(props)) {
-                        this.location = convertToS3(location);
+                        this.location = convertPath ? convertToS3(location) : location;
                     } else {
                         this.location = location;
                     }
@@ -331,7 +335,7 @@ public class LocationPath {
         if (location == null || location.isEmpty()) {
             return null;
         }
-        LocationPath locationPath = new LocationPath(location);
+        LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
         return locationPath.getTFileTypeForBE();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a72bd709541..5bdbe594059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -792,11 +792,7 @@ public abstract class ExternalCatalog
     }
 
     public String bindBrokerName() {
-        Map<String, String> properties = catalogProperty.getProperties();
-        if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
-            return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
-        }
-        return null;
+        return catalogProperty.getProperties().get(HMSExternalCatalog.BIND_BROKER_NAME);
     }
 
     // ATTN: this method only return all cached databases.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 06da3c9d5e1..513fc951672 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -77,6 +77,7 @@ public class ExternalMetaCacheMgr {
     private ExecutorService rowCountRefreshExecutor;
     private ExecutorService commonRefreshExecutor;
     private ExecutorService fileListingExecutor;
+    private ExecutorService scheduleExecutor;
 
     // catalog id -> HiveMetaStoreCache
     private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
@@ -109,6 +110,11 @@ public class ExternalMetaCacheMgr {
                 Config.max_external_cache_loader_thread_pool_size * 1000,
                 "FileListingExecutor", 10, true);
 
+        scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                Config.max_external_cache_loader_thread_pool_size,
+                Config.max_external_cache_loader_thread_pool_size * 1000,
+                "scheduleExecutor", 10, true);
+
         fsCache = new FileSystemCache();
         rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
 
@@ -121,6 +127,10 @@ public class ExternalMetaCacheMgr {
         return fileListingExecutor;
     }
 
+    public ExecutorService getScheduleExecutor() {
+        return scheduleExecutor;
+    }
+
     public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
         HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
         if (cache == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index ce5e7991515..9039f1a8c58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -317,18 +317,19 @@ public abstract class FileQueryScanNode extends FileScanNode {
         if (isBatchMode()) {
             // File splits are generated lazily, and fetched by backends while scanning.
             // Only provide the unique ID of split source to backend.
-            SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this);
+            splitAssignment = new SplitAssignment(
+                    backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
             splitAssignment.init();
             if (ConnectContext.get().getExecutor() != null) {
                 ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
             }
-            if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+            if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
                 return;
             }
-            inputSplitsNum = splitAssignment.numApproximateSplits();
+            inputSplitsNum = numApproximateSplits();
 
             TFileType locationType;
-            FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next();
+            FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
             if (fileSplit instanceof IcebergSplit
                     && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
                 locationType = TFileType.FILE_BROKER;
@@ -337,10 +338,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
             }
             totalFileSize = fileSplit.getLength() * inputSplitsNum;
             // Not accurate, only used to estimate concurrency.
-            int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends();
+            int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
             for (Backend backend : backendPolicy.getBackends()) {
-                SplitSource splitSource = new SplitSource(
-                        this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys);
+                SplitSource splitSource = new SplitSource(backend, splitAssignment);
                 splitSources.add(splitSource);
                 Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
                 TScanRangeLocations curLocations = newLocations();
@@ -582,4 +582,15 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected abstract TableIf getTargetTable() throws UserException;
 
     protected abstract Map<String, String> getLocationProperties() throws UserException;
+
+    @Override
+    public void stop() {
+        if (splitAssignment != null) {
+            splitAssignment.stop();
+            SplitSourceManager manager = Env.getCurrentEnv().getSplitSourceManager();
+            for (Long sourceId : splitAssignment.getSources()) {
+                manager.removeSplitSource(sourceId);
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 92cdfbcfa1f..bb6865582fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -110,7 +110,11 @@ public abstract class FileScanNode extends ExternalScanNode {
             output.append(getRuntimeFilterExplainString(false));
         }
 
-        output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
+        output.append(prefix);
+        if (isBatchMode()) {
+            output.append("(approximate)");
+        }
+        output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
             .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
         output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
             .append("\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index f41eaba7dd8..928854b91d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -18,68 +18,160 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.common.UserException;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;
 
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`.
  * `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends.
  */
 public class SplitAssignment {
-    // magic number to estimate how many splits are allocated to BE in each batch
-    private static final int NUM_SPLITS_PER_BE = 1024;
-    // magic number to estimate how many splits are generated of each partition in each batch.
-    private static final int NUM_SPLITS_PER_PARTITION = 10;
-
+    private final Set<Long> sources = new HashSet<>();
     private final FederationBackendPolicy backendPolicy;
     private final SplitGenerator splitGenerator;
-    // Store the current assignment of file splits
-    private final Multimap<Backend, Split> assignment;
-    private final int maxBatchSize;
+    private final ConcurrentHashMap<Backend, BlockingQueue<Collection<TScanRangeLocations>>> assignment
+            = new ConcurrentHashMap<>();
+    private final SplitToScanRange splitToScanRange;
+    private final Map<String, String> locationProperties;
+    private final List<String> pathPartitionKeys;
+    private final Object assignLock = new Object();
+    private Split sampleSplit = null;
+    private final AtomicBoolean isStop = new AtomicBoolean(false);
+    private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
+
+    private UserException exception = null;
 
-    public SplitAssignment(FederationBackendPolicy backendPolicy, SplitGenerator splitGenerator) {
+    public SplitAssignment(
+            FederationBackendPolicy backendPolicy,
+            SplitGenerator splitGenerator,
+            SplitToScanRange splitToScanRange,
+            Map<String, String> locationProperties,
+            List<String> pathPartitionKeys) {
         this.backendPolicy = backendPolicy;
         this.splitGenerator = splitGenerator;
-        this.assignment = ArrayListMultimap.create();
-        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
-        maxBatchSize = Math.min(NUM_SPLITS_PER_PARTITION * numPartitions,
-                NUM_SPLITS_PER_BE * backendPolicy.numBackends());
+        this.splitToScanRange = splitToScanRange;
+        this.locationProperties = locationProperties;
+        this.pathPartitionKeys = pathPartitionKeys;
     }
 
     public void init() throws UserException {
-        if (assignment.isEmpty() && splitGenerator.hasNext()) {
-            assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
+        splitGenerator.startSplit();
+        synchronized (assignLock) {
+            while (sampleSplit == null && waitFirstSplit()) {
+                try {
+                    assignLock.wait(100);
+                } catch (InterruptedException e) {
+                    throw new UserException(e.getMessage(), e);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    private boolean waitFirstSplit() {
+        return !scheduleFinished.get() && !isStop.get() && exception == null;
+    }
+
+    private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
+        for (Backend backend : batch.keySet()) {
+            Collection<Split> splits = batch.get(backend);
+            List<TScanRangeLocations> locations = new ArrayList<>(splits.size());
+            for (Split split : splits) {
+                locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
+            }
+            if (!assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(locations)) {
+                throw new UserException("Failed to offer batch split");
+            }
         }
     }
 
-    public Multimap<Backend, Split> getCurrentAssignment() {
-        return assignment;
+    public void registerSource(long uniqueId) {
+        sources.add(uniqueId);
+    }
+
+    public Set<Long> getSources() {
+        return sources;
     }
 
-    public int numApproximateSplits() {
-        return splitGenerator.numApproximateSplits();
+    public Split getSampleSplit() {
+        return sampleSplit;
     }
 
-    public synchronized Collection<Split> getNextBatch(Backend backend) throws UserException {
-        // Each call should consume all splits
-        Collection<Split> splits = assignment.removeAll(backend);
-        while (splits.isEmpty()) {
-            // Get the next batch of splits, and assign to backends
-            // If there is data skewing, it maybe causes splits to accumulate on some BE
-            if (!splitGenerator.hasNext()) {
-                return splits;
+    public void addToQueue(List<Split> splits) {
+        if (splits.isEmpty()) {
+            return;
+        }
+        Multimap<Backend, Split> batch = null;
+        synchronized (assignLock) {
+            if (sampleSplit == null) {
+                sampleSplit = splits.get(0);
+                assignLock.notify();
+            }
+            try {
+                batch = backendPolicy.computeScanRangeAssignment(splits);
+            } catch (UserException e) {
+                exception = e;
+            }
+        }
+        if (batch != null) {
+            try {
+                appendBatch(batch);
+            } catch (UserException e) {
+                exception = e;
             }
-            // todo: In each batch, it's to find the optimal assignment for partial splits,
-            //  how to solve the global data skew?
-            assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
-            splits = assignment.removeAll(backend);
+        }
+    }
+
+    private void notifyAssignment() {
+        synchronized (assignLock) {
+            assignLock.notify();
+        }
+    }
+
+    public BlockingQueue<Collection<TScanRangeLocations>> getAssignedSplits(Backend backend) throws UserException {
+        if (exception != null) {
+            throw exception;
+        }
+        BlockingQueue<Collection<TScanRangeLocations>> splits = assignment.computeIfAbsent(backend,
+                be -> new LinkedBlockingQueue<>());
+        if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
+            return null;
         }
         return splits;
     }
+
+    public void setException(UserException e) {
+        exception = e;
+        notifyAssignment();
+    }
+
+    public void finishSchedule() {
+        scheduleFinished.set(true);
+        notifyAssignment();
+    }
+
+    public void stop() {
+        isStop.set(true);
+        notifyAssignment();
+    }
+
+    public boolean isStop() {
+        return isStop.get();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index b819c7f9ef2..c4a373bc85b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -28,27 +28,12 @@ import java.util.List;
  * The consumer should call `getNextBatch` to fetch the next batch of splits.
  */
 public interface SplitGenerator {
-    /**
-     * Get the next batch of splits. If the producer(e.g. ScanNode) doesn't support batch mode,
-     * should throw user exceptions.
-     */
-    default List<Split> getNextBatch(int maxBatchSize) throws UserException {
-        throw new NotImplementedException("Should implement getNextBatch if in 
batch mode.");
-    }
-
     /**
      * Get all file splits if the producer doesn't support batch mode.
      */
     default List<Split> getSplits() throws UserException {
         // todo: remove this interface if batch mode is stable
-        throw new NotImplementedException("Scan node sub class need to 
implement getSplits interface.");
-    }
-
-    /**
-     * `getNextBatch` should return empty list even if `hasNext` returns false.
-     */
-    default boolean hasNext() {
-        return false;
+        throw new NotImplementedException("Not implement");
     }
 
     /**
@@ -65,4 +50,13 @@ public interface SplitGenerator {
     default int numApproximateSplits() {
         return -1;
     }
+
+    default void startSplit() {
+    }
+
+    /**
+     * Close split generator, and stop the split executor
+     */
+    default void stop() {
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
index 74e6aa88ba3..dce135292ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -18,16 +18,17 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.common.UserException;
-import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TScanRangeLocations;
 
-import java.util.ArrayList;
+import com.google.common.collect.Lists;
+
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -42,28 +43,20 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class SplitSource {
     private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
+    private static final long WAIT_TIME_OUT = 100; // 100ms
+    private static final long MAX_WAIT_TIME_OUT = 500; // 500ms
 
     private final long uniqueId;
-    private final SplitToScanRange splitToScanRange;
     private final Backend backend;
-    private final Map<String, String> locationProperties;
-    private final List<String> pathPartitionKeys;
     private final SplitAssignment splitAssignment;
-    private Iterator<Split> splitIterator = null;
-    private boolean isLastBatch = false;
+    private final AtomicBoolean isLastBatch;
 
-    public SplitSource(
-            SplitToScanRange splitToScanRange,
-            Backend backend,
-            Map<String, String> locationProperties,
-            SplitAssignment splitAssignment,
-            List<String> pathPartitionKeys) {
+    public SplitSource(Backend backend, SplitAssignment splitAssignment) {
         this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
-        this.splitToScanRange = splitToScanRange;
         this.backend = backend;
-        this.locationProperties = locationProperties;
-        this.pathPartitionKeys = pathPartitionKeys;
         this.splitAssignment = splitAssignment;
+        this.isLastBatch = new AtomicBoolean(false);
+        splitAssignment.registerSource(uniqueId);
     }
 
     public long getUniqueId() {
@@ -73,22 +66,33 @@ public class SplitSource {
     /**
     * Get the next batch of file splits. If there's no more split, return empty list.
      */
-    public synchronized List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
-        if (isLastBatch) {
+    public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
+        if (isLastBatch.get()) {
             return Collections.emptyList();
         }
-        List<TScanRangeLocations> scanRanges = new ArrayList<>(maxBatchSize);
-        for (int i = 0; i < maxBatchSize; i++) {
-            if (splitIterator == null || !splitIterator.hasNext()) {
-                Collection<Split> splits = splitAssignment.getNextBatch(backend);
-                if (splits.isEmpty()) {
-                    isLastBatch = true;
-                    return scanRanges;
+        List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
+        long maxTimeOut = 0;
+        while (scanRanges.size() < maxBatchSize) {
+            BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
+            if (splits == null) {
+                isLastBatch.set(true);
+                break;
+            }
+            while (scanRanges.size() < maxBatchSize) {
+                try {
+                    Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
+                    if (splitCollection == null) {
+                        maxTimeOut += WAIT_TIME_OUT;
+                        break;
+                    }
+                    scanRanges.addAll(splitCollection);
+                } catch (InterruptedException e) {
+                    throw new UserException("Failed to get next batch of 
splits", e);
                 }
-                splitIterator = splits.iterator();
             }
-            scanRanges.add(splitToScanRange.getScanRange(
-                    backend, locationProperties, splitIterator.next(), pathPartitionKeys));
+            if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
+                break;
+            }
         }
         return scanRanges;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index a22e951be40..b76b4675dee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -30,7 +30,6 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.util.CacheBulkLoader;
@@ -468,37 +467,39 @@ public class HiveMetaStoreCache {
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
                                                               String bindBrokerName) {
-        return getFilesByPartitions(partitions, true, bindBrokerName);
+        return getFilesByPartitions(partitions, true, true, bindBrokerName);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
                                                                  String bindBrokerName) {
-        return getFilesByPartitions(partitions, false, bindBrokerName);
+        return getFilesByPartitions(partitions, false, true, bindBrokerName);
     }
 
-    private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-                                                      boolean withCache, String bindBrokerName) {
+    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
+                                                     boolean withCache,
+                                                     boolean concurrent,
+                                                     String bindBrokerName) {
         long start = System.currentTimeMillis();
-        List<FileCacheKey> keys = partitions.stream().map(p -> {
-            FileCacheKey fileCacheKey = p.isDummyPartition()
-                    ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                    p.getInputFormat(), bindBrokerName)
-                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
-            return fileCacheKey;
-        }).collect(Collectors.toList());
+        List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
+                ? FileCacheKey.createDummyCacheKey(
+                        p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
+                : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
+                .collect(Collectors.toList());
 
         List<FileCacheValue> fileLists;
         try {
             if (withCache) {
-                fileLists = fileCacheRef.get().getAll(keys).values().stream().collect(Collectors.toList());
+                fileLists = new ArrayList<>(fileCacheRef.get().getAll(keys).values());
             } else {
-                List<Pair<FileCacheKey, Future<FileCacheValue>>> pList = keys.stream()
-                        .map(key -> Pair.of(key, fileListingExecutor.submit(() -> loadFiles(key))))
-                        .collect(Collectors.toList());
-
-                fileLists = Lists.newArrayListWithExpectedSize(keys.size());
-                for (Pair<FileCacheKey, Future<FileCacheValue>> p : pList) {
-                    fileLists.add(p.second.get());
+                if (concurrent) {
+                    List<Future<FileCacheValue>> pList = keys.stream().map(
+                            key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
+                    fileLists = Lists.newArrayListWithExpectedSize(keys.size());
+                    for (Future<FileCacheValue> p : pList) {
+                        fileLists.add(p.get());
+                    }
+                } else {
+                    fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
                 }
             }
         } catch (ExecutionException e) {
@@ -810,7 +811,7 @@ public class HiveMetaStoreCache {
                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(location, bindBrokerName),
-                                            properties, bindBrokerName));
+                                    properties, bindBrokerName));
                     List<RemoteFile> remoteFiles = new ArrayList<>();
                     Status status = fs.listFiles(location, false, remoteFiles);
                     if (status.ok()) {
@@ -837,7 +838,7 @@ public class HiveMetaStoreCache {
                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(location, bindBrokerName),
-                                            properties, bindBrokerName));
+                                    properties, bindBrokerName));
                     List<RemoteFile> remoteFiles = new ArrayList<>();
                     Status status = fs.listFiles(location, false, remoteFiles);
                     if (status.ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 12024c25616..1970a48f2d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
@@ -63,14 +64,17 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class HiveScanNode extends FileQueryScanNode {
@@ -98,9 +102,10 @@ public class HiveScanNode extends FileQueryScanNode {
     private SelectedPartitions selectedPartitions = null;
 
     private boolean partitionInit = false;
+    private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
     private List<HivePartition> prunedPartitions;
-    private Iterator<HivePartition> prunedPartitionsIter;
-    private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+    private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
+    private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
 
     /**
      * * External file scan node for Query Hive table
@@ -140,7 +145,7 @@ public class HiveScanNode extends FileQueryScanNode {
         List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
         if (!partitionColumnTypes.isEmpty()) {
             // partitioned table
-            boolean isPartitionPruned = selectedPartitions == null ? false : selectedPartitions.isPruned;
+            boolean isPartitionPruned = selectedPartitions != null && selectedPartitions.isPruned;
             Collection<PartitionItem> partitionItems;
             if (!isPartitionPruned) {
                 // partitionItems is null means that the partition is not pruned by Nereids,
@@ -232,36 +237,52 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
-        try {
-            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
-            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
-            List<Split> allFiles = Lists.newArrayList();
-            int numPartitions = 0;
-            while (allFiles.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
-                List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
-                for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
-                    partitions.add(prunedPartitionsIter.next());
-                    numPartitions++;
+    public void startSplit() {
+        if (prunedPartitions.isEmpty()) {
+            splitAssignment.finishSchedule();
+            return;
+        }
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+        Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
+        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        CompletableFuture.runAsync(() -> {
+            for (HivePartition partition : prunedPartitions) {
+                if (batchException.get() != null || splitAssignment.isStop()) {
+                    break;
+                }
+                try {
+                    splittersOnFlight.acquire();
+                } catch (InterruptedException e) {
+                    batchException.set(new UserException(e.getMessage(), e));
+                    break;
                 }
-                getFileSplitByPartitions(cache, partitions, allFiles, bindBrokerName);
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        List<Split> allFiles = Lists.newArrayList();
+                        getFileSplitByPartitions(cache, Collections.singletonList(partition), allFiles, bindBrokerName);
+                        if (allFiles.size() > numSplitsPerPartition.get()) {
+                            numSplitsPerPartition.set(allFiles.size());
+                        }
+                        splitAssignment.addToQueue(allFiles);
+                    } catch (IOException e) {
+                        batchException.set(new UserException(e.getMessage(), e));
+                    } finally {
+                        splittersOnFlight.release();
+                        if (batchException.get() != null) {
+                            splitAssignment.setException(batchException.get());
+                        }
+                        if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+                            splitAssignment.finishSchedule();
+                        }
+                    }
+                }, scheduleExecutor);
             }
-            if (allFiles.size() / numPartitions > numSplitsPerPartition) {
-                numSplitsPerPartition = allFiles.size() / numPartitions;
+            if (batchException.get() != null) {
+                splitAssignment.setException(batchException.get());
             }
-            return allFiles;
-        } catch (Throwable t) {
-            LOG.warn("get file split failed for table: {}", 
hmsTable.getName(), t);
-            throw new UserException(
-                    "get file split failed for table: " + hmsTable.getName() + 
", err: " + Util.getRootCauseMessage(t),
-                    t);
-        }
-    }
-
-    @Override
-    public boolean hasNext() {
-        return prunedPartitionsIter.hasNext();
+        });
     }
 
     @Override
@@ -272,7 +293,6 @@ public class HiveScanNode extends FileQueryScanNode {
             } catch (Exception e) {
                 return false;
             }
-            prunedPartitionsIter = prunedPartitions.iterator();
             partitionInit = true;
         }
         int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -281,7 +301,7 @@ public class HiveScanNode extends FileQueryScanNode {
 
     @Override
     public int numApproximateSplits() {
-        return numSplitsPerPartition * prunedPartitions.size();
+        return numSplitsPerPartition.get() * prunedPartitions.size();
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
@@ -290,7 +310,8 @@ public class HiveScanNode extends FileQueryScanNode {
         if (hiveTransaction != null) {
             fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
         } else {
-            fileCaches = cache.getFilesByPartitionsWithCache(partitions, bindBrokerName);
+            boolean withCache = Config.max_external_file_cache_num > 0;
+            fileCaches = cache.getFilesByPartitions(partitions, withCache, withCache, bindBrokerName);
         }
         if (tableSample != null) {
             List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
@@ -463,10 +484,7 @@ public class HiveScanNode extends FileQueryScanNode {
     public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
 
         String aggFunctionName = aggExpr.getFnName().getFunction();
-        if (aggFunctionName.equalsIgnoreCase("COUNT")) {
-            return true;
-        }
-        return false;
+        return aggFunctionName.equalsIgnoreCase("COUNT");
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8f2b3e598b9..82e21bcdd17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -69,13 +69,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -101,9 +103,11 @@ public class HudiScanNode extends HiveScanNode {
     private HoodieTimeline timeline;
     private Option<String> snapshotTimestamp;
     private String queryInstant;
+
+    private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
     private List<HivePartition> prunedPartitions;
-    private Iterator<HivePartition> prunedPartitionsIter;
-    private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+    private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
+    private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
 
     private boolean incrementalRead = false;
     private TableScanParams scanParams;
@@ -206,7 +210,6 @@ public class HudiScanNode extends HiveScanNode {
             Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
             if (!snapshotInstant.isPresent()) {
                 prunedPartitions = Collections.emptyList();
-                prunedPartitionsIter = prunedPartitions.iterator();
                 partitionInit = true;
                 return;
             }
@@ -320,47 +323,47 @@ public class HudiScanNode extends HiveScanNode {
                 incrementalRelation.getEndTs())).collect(Collectors.toList());
     }
 
+    private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
+        String globPath;
+        String partitionName;
+        if (partition.isDummyPartition()) {
+            partitionName = "";
+            globPath = hudiClient.getBasePathV2().toString() + "/*";
+        } else {
+            partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                    new Path(partition.getPath()));
+            globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+        }
+        List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
+                hudiClient.getRawFs(), new Path(globPath));
+        HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                timeline, statuses.toArray(new FileStatus[0]));
+
+        if (isCowOrRoTable) {
+            fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                noLogsSplitNum.incrementAndGet();
+                String filePath = baseFile.getPath();
+                long fileSize = baseFile.getFileSize();
+                // Need add hdfs host to location
+                LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                Path splitFilePath = locationPath.toStorageLocation();
+                splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                        new String[0], partition.getPartitionValues()));
+            });
+        } else {
+            fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                    .forEach(fileSlice -> splits.add(
+                            generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+        }
+    }
+
     private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
         Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
         AtomicReference<Throwable> throwable = new AtomicReference<>();
         partitions.forEach(partition -> executor.execute(() -> {
             try {
-                String globPath;
-                String partitionName = "";
-                if (partition.isDummyPartition()) {
-                    globPath = hudiClient.getBasePathV2().toString() + "/*";
-                } else {
-                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                            new Path(partition.getPath()));
-                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-                }
-                List<FileStatus> statuses;
-                try {
-                    statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                            new Path(globPath));
-                } catch (IOException e) {
-                    throw new RuntimeException("Failed to get hudi file 
statuses on path: " + globPath, e);
-                }
-                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                        timeline, statuses.toArray(new FileStatus[0]));
-
-                if (isCowOrRoTable) {
-                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                        noLogsSplitNum.incrementAndGet();
-                        String filePath = baseFile.getPath();
-                        long fileSize = baseFile.getFileSize();
-                        // Need add hdfs host to location
-                        LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                        Path splitFilePath = locationPath.toStorageLocation();
-                        splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
-                                new String[0], partition.getPartitionValues()));
-                    });
-                } else {
-                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                            .forEach(fileSlice -> splits.add(
-                                    generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
-                }
+                getPartitionSplits(partition, splits);
             } catch (Throwable t) {
                 throwable.set(t);
             } finally {
@@ -394,26 +397,48 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     @Override
-    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
-        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
-        int numPartitions = 0;
-        while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
-            List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
-            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
-                partitions.add(prunedPartitionsIter.next());
-                numPartitions++;
-            }
-            getPartitionSplits(partitions, splits);
+    public void startSplit() {
+        if (prunedPartitions.isEmpty()) {
+            splitAssignment.finishSchedule();
+            return;
         }
-        if (splits.size() / numPartitions > numSplitsPerPartition) {
-            numSplitsPerPartition = splits.size() / numPartitions;
-        }
-        return splits;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return prunedPartitionsIter.hasNext();
+        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        CompletableFuture.runAsync(() -> {
+            for (HivePartition partition : prunedPartitions) {
+                if (batchException.get() != null || splitAssignment.isStop()) {
+                    break;
+                }
+                try {
+                    splittersOnFlight.acquire();
+                } catch (InterruptedException e) {
+                    batchException.set(new UserException(e.getMessage(), e));
+                    break;
+                }
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        List<Split> allFiles = Lists.newArrayList();
+                        getPartitionSplits(partition, allFiles);
+                        if (allFiles.size() > numSplitsPerPartition.get()) {
+                            numSplitsPerPartition.set(allFiles.size());
+                        }
+                        splitAssignment.addToQueue(allFiles);
+                    } catch (IOException e) {
+                        batchException.set(new UserException(e.getMessage(), e));
+                    } finally {
+                        splittersOnFlight.release();
+                        if (batchException.get() != null) {
+                            splitAssignment.setException(batchException.get());
+                        }
+                        if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+                            splitAssignment.finishSchedule();
+                        }
+                    }
+                });
+            }
+            if (batchException.get() != null) {
+                splitAssignment.setException(batchException.get());
+            }
+        });
     }
 
     @Override
@@ -426,7 +451,6 @@ public class HudiScanNode extends HiveScanNode {
             prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
                     HiveMetaStoreClientHelper.getConfiguration(hmsTable),
                     () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
-            prunedPartitionsIter = prunedPartitions.iterator();
             partitionInit = true;
         }
         int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -435,7 +459,7 @@ public class HudiScanNode extends HiveScanNode {
 
     @Override
     public int numApproximateSplits() {
-        return numSplitsPerPartition * prunedPartitions.size();
+        return numSplitsPerPartition.get() * prunedPartitions.size();
     }
 
     private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a11f3300b9d..9c896bd7504 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -44,6 +44,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.FederationBackendPolicy;
+import org.apache.doris.datasource.SplitAssignment;
 import org.apache.doris.datasource.SplitGenerator;
 import org.apache.doris.datasource.SplitSource;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
@@ -84,7 +85,7 @@ import java.util.stream.IntStream;
 public abstract class ScanNode extends PlanNode implements SplitGenerator {
     private static final Logger LOG = LogManager.getLogger(ScanNode.class);
     protected static final int NUM_SPLITS_PER_PARTITION = 10;
-    protected static final int NUM_PARTITIONS_PER_LOOP = 100;
+    protected static final int NUM_SPLITTERS_ON_FLIGHT = Config.max_external_cache_loader_thread_pool_size;
     protected final TupleDescriptor desc;
     // for distribution prunner
     protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
@@ -95,6 +96,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
     protected List<SplitSource> splitSources = Lists.newArrayList();
     protected PartitionInfo partitionsInfo = null;
+    protected SplitAssignment splitAssignment = null;
 
     // create a mapping between output slot's id and project expr
     Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 83acbba5d16..c6755746106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -634,6 +634,9 @@ public class Coordinator implements CoordInterface {
 
     @Override
     public void close() {
+        for (ScanNode scanNode : scanNodes) {
+            scanNode.stop();
+        }
         if (queryQueue != null && queueToken != null) {
             try {
                 queryQueue.releaseAndNotify(queueToken);
@@ -1208,6 +1211,9 @@ public class Coordinator implements CoordInterface {
 
     @Override
     public void cancel(Status cancelReason) {
+        for (ScanNode scanNode : scanNodes) {
+            scanNode.stop();
+        }
         if (cancelReason.ok()) {
             throw new RuntimeException("Should use correct cancel reason, but 
it is "
                     + cancelReason.toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
