This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 04008333d40 [opt](split) generate and get split batch concurrently (#36045)

04008333d40 is described below

commit 04008333d4081d3502330c8c78bd32634699aa6f
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Jun 19 16:15:47 2024 +0800

    [opt](split) generate and get split batch concurrently (#36045)

    ## Proposed changes

    Generate and fetch split batches concurrently: `SplitSource.getNextBatch`
    no longer synchronizes on the whole source, so each scanner fetches its
    splits concurrently, and `SplitAssignment` generates splits asynchronously.
---
 be/src/common/config.cpp                           |   2 +-
 be/src/vec/exec/scan/split_source_connector.cpp    |   1 +
 be/src/vec/exec/scan/split_source_connector.h      |   6 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |   2 +
 be/src/vec/exec/scan/vfile_scanner.h               |   1 +
 .../org/apache/doris/common/util/LocationPath.java |  26 ++--
 .../apache/doris/datasource/ExternalCatalog.java   |   6 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |  10 ++
 .../apache/doris/datasource/FileQueryScanNode.java |  25 +++-
 .../org/apache/doris/datasource/FileScanNode.java  |   6 +-
 .../apache/doris/datasource/SplitAssignment.java   | 158 ++++++++++++++----
 .../apache/doris/datasource/SplitGenerator.java    |  26 ++--
 .../org/apache/doris/datasource/SplitSource.java   |  64 +++++---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  45 +++---
 .../doris/datasource/hive/source/HiveScanNode.java |  96 ++++++-----
 .../doris/datasource/hudi/source/HudiScanNode.java | 144 +++++++++-------
 .../java/org/apache/doris/planner/ScanNode.java    |   4 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |   6 +
 18 files changed, 402 insertions(+), 226 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b7fdac5f660..5eb0e8d26ba 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -248,7 +248,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
     }
     return true;
 });
-DEFINE_Int32(remote_split_source_batch_size, "1024");
+DEFINE_Int32(remote_split_source_batch_size, "10240");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
 DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");

diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp
index 2d35d3796bc..fae65543e53 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -45,6 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
     std::lock_guard<std::mutex> l(_range_lock);
     *has_next = false;
     if (_scan_index == _scan_ranges.size() && !_last_batch) {
+        SCOPED_RAW_TIMER(&_get_split_timer);
         Status coord_status;
         FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(),
                                         _state->get_query_ctx()->coord_addr, &coord_status);

diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h
index cf358846b30..bfda961df34 100644
--- a/be/src/vec/exec/scan/split_source_connector.h
+++ b/be/src/vec/exec/scan/split_source_connector.h
@@ -43,6 +43,8 @@ public:
     virtual int num_scan_ranges() = 0;

     virtual TFileScanRangeParams* get_params() = 0;
+
+    virtual int64_t get_split_time() { return 0; }
 };

 /**
@@ -95,6 +97,8 @@ private:
     int _scan_index = 0;
     int _range_index = 0;

+    int64_t _get_split_timer = 0;
+
 public:
     RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits)
             : _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {}
@@ -110,6 +114,8 @@ public:
     TFileScanRangeParams* get_params() override {
         LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map";
     }
+
+    int64_t get_split_time() override { return _get_split_timer; }
 };

 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index f6f029b9de0..4932e164649 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -138,6 +138,7 @@ Status VFileScanner::prepare(
     _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
     _has_fully_rf_file_counter =
             ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
+    _get_split_timer = ADD_TIMER(_local_state->scanner_profile(), "GetSplitTime");

     _file_cache_statistics.reset(new io::FileCacheStatistics());
     _io_ctx.reset(new io::IOContext());
@@ -1162,6 +1163,7 @@ Status VFileScanner::close(RuntimeState* state) {
     if (_cur_reader) {
         RETURN_IF_ERROR(_cur_reader->close());
     }
+    COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time());

     RETURN_IF_ERROR(VScanner::close(state));
     return Status::OK();

diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 332bdfe11e1..61d75d65683 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -180,6 +180,7 @@ private:
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
     RuntimeProfile::Counter* _file_counter = nullptr;
     RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
+    RuntimeProfile::Counter* _get_split_timer = nullptr;

     const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
     // single slot filter conjuncts

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 38b5250a157..dd1641126bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -41,7 +41,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.InvalidPathException;
 import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;

@@ -74,10 +74,14 @@ public class LocationPath {
     }

     private LocationPath(String location) {
-        this(location, new HashMap<>());
+        this(location, Collections.emptyMap(), true);
     }

     public LocationPath(String location, Map<String, String> props) {
+        this(location, props, true);
+    }
+
+    public LocationPath(String location, Map<String, String> props, boolean convertPath) {
         String scheme = parseScheme(location).toLowerCase();
         if (scheme.isEmpty()) {
             locationType = LocationType.NOSCHEME;
@@ -88,7 +92,7 @@ public class LocationPath {
                 locationType = LocationType.HDFS;
                 // Need add hdfs host to location
                 String host = props.get(HdfsResource.DSF_NAMESERVICES);
-                this.location = normalizedHdfsPath(location, host);
+                this.location = convertPath ? normalizedHdfsPath(location, host) : location;
                 break;
             case FeConstants.FS_PREFIX_S3:
                 locationType = LocationType.S3;
                 break;
             case FeConstants.FS_PREFIX_S3A:
                 locationType = LocationType.S3A;
-                this.location = convertToS3(location);
+                this.location = convertPath ? convertToS3(location) : location;
                 break;
             case FeConstants.FS_PREFIX_S3N:
                 // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
                 locationType = LocationType.S3N;
-                this.location = convertToS3(location);
+                this.location = convertPath ? convertToS3(location) : location;
                 break;
             case FeConstants.FS_PREFIX_BOS:
                 locationType = LocationType.BOS;
                 // use s3 client to access
-                this.location = convertToS3(location);
+                this.location = convertPath ? convertToS3(location) : location;
                 break;
             case FeConstants.FS_PREFIX_GCS:
                 locationType = LocationType.GCS;
                 // use s3 client to access
-                this.location = convertToS3(location);
+                this.location = convertPath ? convertToS3(location) : location;
                 break;
             case FeConstants.FS_PREFIX_OSS:
                 if (isHdfsOnOssEndpoint(location)) {
@@ -119,7 +123,7 @@ public class LocationPath {
                     this.location = location;
                 } else {
                     if (useS3EndPoint(props)) {
-                        this.location = convertToS3(location);
+                        this.location = convertPath ? convertToS3(location) : location;
                     } else {
                         this.location = location;
                     }
@@ -128,7 +132,7 @@ public class LocationPath {
                 break;
             case FeConstants.FS_PREFIX_COS:
                 if (useS3EndPoint(props)) {
-                    this.location = convertToS3(location);
+                    this.location = convertPath ? convertToS3(location) : location;
                 } else {
                     this.location = location;
                 }
@@ -136,7 +140,7 @@ public class LocationPath {
                 break;
             case FeConstants.FS_PREFIX_OBS:
                 if (useS3EndPoint(props)) {
-                    this.location = convertToS3(location);
+                    this.location = convertPath ? convertToS3(location) : location;
                 } else {
                     this.location = location;
                 }
@@ -331,7 +335,7 @@ public class LocationPath {
         if (location == null || location.isEmpty()) {
             return null;
         }
-        LocationPath locationPath = new LocationPath(location);
+        LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
         return locationPath.getTFileTypeForBE();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a72bd709541..5bdbe594059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -792,11 +792,7 @@ public abstract class ExternalCatalog
     }

     public String bindBrokerName() {
-        Map<String, String> properties = catalogProperty.getProperties();
-        if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
-            return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
-        }
-        return null;
+        return catalogProperty.getProperties().get(HMSExternalCatalog.BIND_BROKER_NAME);
     }

     // ATTN: this method only return all cached databases.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 06da3c9d5e1..513fc951672 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -77,6 +77,7 @@ public class ExternalMetaCacheMgr {
     private ExecutorService rowCountRefreshExecutor;
     private ExecutorService commonRefreshExecutor;
     private ExecutorService fileListingExecutor;
+    private ExecutorService scheduleExecutor;

     // catalog id -> HiveMetaStoreCache
     private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
@@ -109,6 +110,11 @@ public class ExternalMetaCacheMgr {
                 Config.max_external_cache_loader_thread_pool_size * 1000,
                 "FileListingExecutor", 10, true);

+        scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                Config.max_external_cache_loader_thread_pool_size,
+                Config.max_external_cache_loader_thread_pool_size * 1000,
+                "scheduleExecutor", 10, true);
+
         fsCache = new FileSystemCache();
         rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

@@ -121,6 +127,10 @@ public class ExternalMetaCacheMgr {
         return fileListingExecutor;
     }

+    public ExecutorService getScheduleExecutor() {
+        return scheduleExecutor;
+    }
+
     public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
         HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
         if (cache == null) {

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index ce5e7991515..9039f1a8c58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -317,18 +317,19 @@ public abstract class FileQueryScanNode extends FileScanNode {
         if (isBatchMode()) {
             // File splits are generated lazily, and fetched by backends while scanning.
             // Only provide the unique ID of split source to backend.
-            SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this);
+            splitAssignment = new SplitAssignment(
+                    backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
             splitAssignment.init();
             if (ConnectContext.get().getExecutor() != null) {
                 ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
             }
-            if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+            if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
                 return;
             }
-            inputSplitsNum = splitAssignment.numApproximateSplits();
+            inputSplitsNum = numApproximateSplits();

             TFileType locationType;
-            FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next();
+            FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
             if (fileSplit instanceof IcebergSplit
                     && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
                 locationType = TFileType.FILE_BROKER;
@@ -337,10 +338,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
             }
             totalFileSize = fileSplit.getLength() * inputSplitsNum;
             // Not accurate, only used to estimate concurrency.
-            int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends();
+            int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
             for (Backend backend : backendPolicy.getBackends()) {
-                SplitSource splitSource = new SplitSource(
-                        this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys);
+                SplitSource splitSource = new SplitSource(backend, splitAssignment);
                 splitSources.add(splitSource);
                 Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
                 TScanRangeLocations curLocations = newLocations();
@@ -582,4 +582,15 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected abstract TableIf getTargetTable() throws UserException;

     protected abstract Map<String, String> getLocationProperties() throws UserException;
+
+    @Override
+    public void stop() {
+        if (splitAssignment != null) {
+            splitAssignment.stop();
+            SplitSourceManager manager = Env.getCurrentEnv().getSplitSourceManager();
+            for (Long sourceId : splitAssignment.getSources()) {
+                manager.removeSplitSource(sourceId);
+            }
+        }
+    }
 }
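The new `stop()` hook above is what `Coordinator.close()` and `Coordinator.cancel()` (later in this patch) invoke, possibly more than once and while other threads are still blocked waiting for splits. The essential shape of such a shutdown path, as a small illustrative sketch — the class and method names here are hypothetical, not the Doris API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative: an idempotent stop() in the style the patch uses — flip a flag,
// then wake anyone blocked in a wait loop so they can observe the flag.
final class StoppableSchedule {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Object lock = new Object();

    void awaitWork() throws InterruptedException {
        synchronized (lock) {
            while (!stopped.get() /* && no work available yet */) {
                lock.wait(100);     // bounded wait, re-check flags each round
            }
        }
    }

    void stop() {
        if (stopped.compareAndSet(false, true)) {   // safe under repeated close()/cancel()
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }
}
```

Flipping the flag before notifying keeps the wait loop race-free: a waiter that wakes up re-checks the flag under the same lock.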
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 92cdfbcfa1f..bb6865582fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -110,7 +110,11 @@ public abstract class FileScanNode extends ExternalScanNode {
             output.append(getRuntimeFilterExplainString(false));
         }

-        output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
+        output.append(prefix);
+        if (isBatchMode()) {
+            output.append("(approximate)");
+        }
+        output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
                 .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
         output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
                 .append("\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index f41eaba7dd8..928854b91d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -18,68 +18,160 @@ package org.apache.doris.datasource;

 import org.apache.doris.common.UserException;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;

-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;

+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;

 /**
  * When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`.
  * `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends.
  */
 public class SplitAssignment {
-    // magic number to estimate how many splits are allocated to BE in each batch
-    private static final int NUM_SPLITS_PER_BE = 1024;
-    // magic number to estimate how many splits are generated of each partition in each batch.
-    private static final int NUM_SPLITS_PER_PARTITION = 10;
-
+    private final Set<Long> sources = new HashSet<>();
     private final FederationBackendPolicy backendPolicy;
     private final SplitGenerator splitGenerator;
-    // Store the current assignment of file splits
-    private final Multimap<Backend, Split> assignment;
-    private final int maxBatchSize;
+    private final ConcurrentHashMap<Backend, BlockingQueue<Collection<TScanRangeLocations>>> assignment
+            = new ConcurrentHashMap<>();
+    private final SplitToScanRange splitToScanRange;
+    private final Map<String, String> locationProperties;
+    private final List<String> pathPartitionKeys;
+    private final Object assignLock = new Object();
+    private Split sampleSplit = null;
+    private final AtomicBoolean isStop = new AtomicBoolean(false);
+    private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
+
+    private UserException exception = null;

-    public SplitAssignment(FederationBackendPolicy backendPolicy, SplitGenerator splitGenerator) {
+    public SplitAssignment(
+            FederationBackendPolicy backendPolicy,
+            SplitGenerator splitGenerator,
+            SplitToScanRange splitToScanRange,
+            Map<String, String> locationProperties,
+            List<String> pathPartitionKeys) {
         this.backendPolicy = backendPolicy;
         this.splitGenerator = splitGenerator;
-        this.assignment = ArrayListMultimap.create();
-        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
-        maxBatchSize = Math.min(NUM_SPLITS_PER_PARTITION * numPartitions,
-                NUM_SPLITS_PER_BE * backendPolicy.numBackends());
+        this.splitToScanRange = splitToScanRange;
+        this.locationProperties = locationProperties;
+        this.pathPartitionKeys = pathPartitionKeys;
     }

     public void init() throws UserException {
-        if (assignment.isEmpty() && splitGenerator.hasNext()) {
-            assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
+        splitGenerator.startSplit();
+        synchronized (assignLock) {
+            while (sampleSplit == null && waitFirstSplit()) {
+                try {
+                    assignLock.wait(100);
+                } catch (InterruptedException e) {
+                    throw new UserException(e.getMessage(), e);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    private boolean waitFirstSplit() {
+        return !scheduleFinished.get() && !isStop.get() && exception == null;
+    }
+
+    private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
+        for (Backend backend : batch.keySet()) {
+            Collection<Split> splits = batch.get(backend);
+            List<TScanRangeLocations> locations = new ArrayList<>(splits.size());
+            for (Split split : splits) {
+                locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
+            }
+            if (!assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(locations)) {
+                throw new UserException("Failed to offer batch split");
+            }
         }
     }

-    public Multimap<Backend, Split> getCurrentAssignment() {
-        return assignment;
+    public void registerSource(long uniqueId) {
+        sources.add(uniqueId);
+    }
+
+    public Set<Long> getSources() {
+        return sources;
     }

-    public int numApproximateSplits() {
-        return splitGenerator.numApproximateSplits();
+    public Split getSampleSplit() {
+        return sampleSplit;
     }

-    public synchronized Collection<Split> getNextBatch(Backend backend) throws UserException {
-        // Each call should consume all splits
-        Collection<Split> splits = assignment.removeAll(backend);
-        while (splits.isEmpty()) {
-            // Get the next batch of splits, and assign to backends
-            // If there is data skewing, it maybe causes splits to accumulate on some BE
-            if (!splitGenerator.hasNext()) {
-                return splits;
+    public void addToQueue(List<Split> splits) {
+        if (splits.isEmpty()) {
+            return;
+        }
+        Multimap<Backend, Split> batch = null;
+        synchronized (assignLock) {
+            if (sampleSplit == null) {
+                sampleSplit = splits.get(0);
+                assignLock.notify();
+            }
+            try {
+                batch = backendPolicy.computeScanRangeAssignment(splits);
+            } catch (UserException e) {
+                exception = e;
+            }
+        }
+        if (batch != null) {
+            try {
+                appendBatch(batch);
+            } catch (UserException e) {
+                exception = e;
             }
-            // todo: In each batch, it's to find the optimal assignment for partial splits,
-            // how to solve the global data skew?
-            assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
-            splits = assignment.removeAll(backend);
+        }
+    }
+
+    private void notifyAssignment() {
+        synchronized (assignLock) {
+            assignLock.notify();
+        }
+    }
+
+    public BlockingQueue<Collection<TScanRangeLocations>> getAssignedSplits(Backend backend) throws UserException {
+        if (exception != null) {
+            throw exception;
+        }
+        BlockingQueue<Collection<TScanRangeLocations>> splits = assignment.computeIfAbsent(backend,
+                be -> new LinkedBlockingQueue<>());
+        if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
+            return null;
         }
         return splits;
     }
+
+    public void setException(UserException e) {
+        exception = e;
+        notifyAssignment();
+    }
+
+    public void finishSchedule() {
+        scheduleFinished.set(true);
+        notifyAssignment();
+    }
+
+    public void stop() {
+        isStop.set(true);
+        notifyAssignment();
+    }
+
+    public boolean isStop() {
+        return isStop.get();
+    }
 }
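The rewritten `SplitAssignment` is essentially a single-producer, multi-consumer fan-out: the generator pushes batches via `addToQueue`, each backend drains its own `LinkedBlockingQueue`, and `init()` blocks only until the first split (the "sample") exists. A condensed, self-contained sketch of that pattern — the class and its methods are illustrative stand-ins, not the real Doris types:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: one queue per consumer, producer fans batches out.
class BatchFanOut<K, T> {
    private final ConcurrentHashMap<K, BlockingQueue<List<T>>> queues = new ConcurrentHashMap<>();
    private final AtomicBoolean finished = new AtomicBoolean(false);
    private final Object lock = new Object();
    private volatile T sample = null;

    // Producer side: called from the async generator thread.
    void offer(K consumer, List<T> batch) {
        if (batch.isEmpty()) {
            return;
        }
        synchronized (lock) {
            if (sample == null) {
                sample = batch.get(0);
                lock.notifyAll();   // wake awaitSample() as soon as the first item exists
            }
        }
        queues.computeIfAbsent(consumer, k -> new LinkedBlockingQueue<>()).offer(batch);
    }

    void finish() {
        finished.set(true);
        synchronized (lock) {
            lock.notifyAll();       // also wakes awaitSample() when the table is empty
        }
    }

    // Blocks until the first batch arrives (or generation finishes with no data).
    T awaitSample() throws InterruptedException {
        synchronized (lock) {
            while (sample == null && !finished.get()) {
                lock.wait(100);
            }
            return sample;
        }
    }

    // Consumer side: null means "no more batches will ever arrive".
    BlockingQueue<List<T>> queueFor(K consumer) {
        BlockingQueue<List<T>> q = queues.computeIfAbsent(consumer, k -> new LinkedBlockingQueue<>());
        return (finished.get() && q.isEmpty()) ? null : q;
    }
}
```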
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index b819c7f9ef2..c4a373bc85b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -28,27 +28,12 @@ import java.util.List;
  * The consumer should call `getNextBatch` to fetch the next batch of splits.
  */
 public interface SplitGenerator {
-    /**
-     * Get the next batch of splits. If the producer(e.g. ScanNode) doesn't support batch mode,
-     * should throw user exceptions.
-     */
-    default List<Split> getNextBatch(int maxBatchSize) throws UserException {
-        throw new NotImplementedException("Should implement getNextBatch if in batch mode.");
-    }
-
     /**
      * Get all file splits if the producer doesn't support batch mode.
      */
     default List<Split> getSplits() throws UserException {
         // todo: remove this interface if batch mode is stable
-        throw new NotImplementedException("Scan node sub class need to implement getSplits interface.");
-    }
-
-    /**
-     * `getNextBatch` should return empty list even if `hasNext` returns false.
-     */
-    default boolean hasNext() {
-        return false;
+        throw new NotImplementedException("Not implement");
     }

     /**
@@ -65,4 +50,13 @@ public interface SplitGenerator {
     default int numApproximateSplits() {
         return -1;
     }
+
+    default void startSplit() {
+    }
+
+    /**
+     * Close split generator, and stop the split executor
+     */
+    default void stop() {
+    }
 }
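`SplitGenerator` thus flips from a pull model (`hasNext`/`getNextBatch`) to a push model: `startSplit()` kicks off asynchronous production, and the generator reports batches and completion to the assignment. A toy implementation against a hypothetical sink interface (illustrative only; the real implementations below push one batch per partition from a thread pool):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: a trivial push-style generator against a hypothetical sink.
// `Sink` stands in for SplitAssignment; the file list stands in for real splits.
class ListSplitGenerator {
    interface Sink {
        void addToQueue(List<String> splits);
        void finishSchedule();
    }

    private final List<String> files = Arrays.asList("f1", "f2", "f3");

    // Push model: produce everything asynchronously, then mark the schedule done.
    void startSplit(Sink sink) {
        new Thread(() -> {
            sink.addToQueue(files);   // could be one call per partition
            sink.finishSchedule();    // consumers drain queues, then see "no more"
        }).start();
    }
}
```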
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
index 74e6aa88ba3..dce135292ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -18,16 +18,17 @@ package org.apache.doris.datasource;

 import org.apache.doris.common.UserException;
-import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TScanRangeLocations;

-import java.util.ArrayList;
+import com.google.common.collect.Lists;
+
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;

 /**
@@ -42,28 +43,20 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class SplitSource {
     private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
+    private static final long WAIT_TIME_OUT = 100; // 100ms
+    private static final long MAX_WAIT_TIME_OUT = 500; // 500ms

     private final long uniqueId;
-    private final SplitToScanRange splitToScanRange;
     private final Backend backend;
-    private final Map<String, String> locationProperties;
-    private final List<String> pathPartitionKeys;
     private final SplitAssignment splitAssignment;
-    private Iterator<Split> splitIterator = null;
-    private boolean isLastBatch = false;
+    private final AtomicBoolean isLastBatch;

-    public SplitSource(
-            SplitToScanRange splitToScanRange,
-            Backend backend,
-            Map<String, String> locationProperties,
-            SplitAssignment splitAssignment,
-            List<String> pathPartitionKeys) {
+    public SplitSource(Backend backend, SplitAssignment splitAssignment) {
         this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
-        this.splitToScanRange = splitToScanRange;
         this.backend = backend;
-        this.locationProperties = locationProperties;
-        this.pathPartitionKeys = pathPartitionKeys;
         this.splitAssignment = splitAssignment;
+        this.isLastBatch = new AtomicBoolean(false);
+        splitAssignment.registerSource(uniqueId);
     }

     public long getUniqueId() {
@@ -73,22 +66,33 @@ public class SplitSource {
     /**
      * Get the next batch of file splits. If there's no more split, return empty list.
      */
-    public synchronized List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
-        if (isLastBatch) {
+    public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
+        if (isLastBatch.get()) {
             return Collections.emptyList();
         }
-        List<TScanRangeLocations> scanRanges = new ArrayList<>(maxBatchSize);
-        for (int i = 0; i < maxBatchSize; i++) {
-            if (splitIterator == null || !splitIterator.hasNext()) {
-                Collection<Split> splits = splitAssignment.getNextBatch(backend);
-                if (splits.isEmpty()) {
-                    isLastBatch = true;
-                    return scanRanges;
+        List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
+        long maxTimeOut = 0;
+        while (scanRanges.size() < maxBatchSize) {
+            BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
+            if (splits == null) {
+                isLastBatch.set(true);
+                break;
+            }
+            while (scanRanges.size() < maxBatchSize) {
+                try {
+                    Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
+                    if (splitCollection == null) {
+                        maxTimeOut += WAIT_TIME_OUT;
+                        break;
+                    }
+                    scanRanges.addAll(splitCollection);
+                } catch (InterruptedException e) {
+                    throw new UserException("Failed to get next batch of splits", e);
                 }
-                splitIterator = splits.iterator();
             }
-            scanRanges.add(splitToScanRange.getScanRange(
-                    backend, locationProperties, splitIterator.next(), pathPartitionKeys));
+            if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
+                break;
+            }
         }
         return scanRanges;
     }
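The consumer loop above trades latency for batch size: it polls in `WAIT_TIME_OUT` (100 ms) slices and, once roughly `MAX_WAIT_TIME_OUT` (500 ms) of idle waiting has accumulated, returns a partial batch rather than stalling the scanner. The same logic isolated as a sketch (method and parameter names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class BoundedBatchPoller {
    // Drain up to maxBatch items; once maxWaitMs of idle polling has accumulated,
    // return whatever was collected rather than stalling the scanner further.
    static <T> List<T> nextBatch(BlockingQueue<List<T>> queue, BooleanSupplier finished,
                                 int maxBatch, long pollMs, long maxWaitMs) throws InterruptedException {
        List<T> out = new ArrayList<>(maxBatch);
        long idle = 0;
        while (out.size() < maxBatch) {
            List<T> chunk = queue.poll(pollMs, TimeUnit.MILLISECONDS);
            if (chunk != null) {
                out.addAll(chunk);
                continue;
            }
            if (finished.getAsBoolean() && queue.isEmpty()) {
                break;                        // producer done and queue drained
            }
            idle += pollMs;
            if (idle >= maxWaitMs && !out.isEmpty()) {
                break;                        // a partial batch beats a long stall
            }
        }
        return out;
    }
}
```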
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index a22e951be40..b76b4675dee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -30,7 +30,6 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.util.CacheBulkLoader;
@@ -468,37 +467,39 @@ public class HiveMetaStoreCache {

     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
                                                               String bindBrokerName) {
-        return getFilesByPartitions(partitions, true, bindBrokerName);
+        return getFilesByPartitions(partitions, true, true, bindBrokerName);
     }

     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
                                                                  String bindBrokerName) {
-        return getFilesByPartitions(partitions, false, bindBrokerName);
+        return getFilesByPartitions(partitions, false, true, bindBrokerName);
     }

-    private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-                                                      boolean withCache, String bindBrokerName) {
+    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
+                                                     boolean withCache,
+                                                     boolean concurrent,
+                                                     String bindBrokerName) {
         long start = System.currentTimeMillis();
-        List<FileCacheKey> keys = partitions.stream().map(p -> {
-            FileCacheKey fileCacheKey = p.isDummyPartition()
-                    ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                    p.getInputFormat(), bindBrokerName)
-                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
-            return fileCacheKey;
-        }).collect(Collectors.toList());
+        List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
+                ? FileCacheKey.createDummyCacheKey(
+                        p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
+                : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
+                .collect(Collectors.toList());

         List<FileCacheValue> fileLists;
         try {
             if (withCache) {
-                fileLists = fileCacheRef.get().getAll(keys).values().stream().collect(Collectors.toList());
+                fileLists = new ArrayList<>(fileCacheRef.get().getAll(keys).values());
             } else {
-                List<Pair<FileCacheKey, Future<FileCacheValue>>> pList = keys.stream()
-                        .map(key -> Pair.of(key, fileListingExecutor.submit(() -> loadFiles(key))))
-                        .collect(Collectors.toList());
-
-                fileLists = Lists.newArrayListWithExpectedSize(keys.size());
-                for (Pair<FileCacheKey, Future<FileCacheValue>> p : pList) {
-                    fileLists.add(p.second.get());
+                if (concurrent) {
+                    List<Future<FileCacheValue>> pList = keys.stream().map(
+                            key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
+                    fileLists = Lists.newArrayListWithExpectedSize(keys.size());
+                    for (Future<FileCacheValue> p : pList) {
+                        fileLists.add(p.get());
+                    }
+                } else {
+                    fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
                 }
             }
         } catch (ExecutionException e) {
@@ -810,7 +811,7 @@ public class HiveMetaStoreCache {
             RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                     new FileSystemCache.FileSystemCacheKey(
                             LocationPath.getFSIdentity(location, bindBrokerName),
-                            properties, bindBrokerName));
+                    properties, bindBrokerName));
             List<RemoteFile> remoteFiles = new ArrayList<>();
             Status status = fs.listFiles(location, false, remoteFiles);
             if (status.ok()) {
@@ -837,7 +838,7 @@ public class HiveMetaStoreCache {
             RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                     new FileSystemCache.FileSystemCacheKey(
                             LocationPath.getFSIdentity(location, bindBrokerName),
-                            properties, bindBrokerName));
+                    properties, bindBrokerName));
             List<RemoteFile> remoteFiles = new ArrayList<>();
             Status status = fs.listFiles(location, false, remoteFiles);
             if (status.ok()) {
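The new `concurrent` flag chooses between fanning `loadFiles` out on the file-listing executor and running it inline; results are collected in submission order so they stay aligned with the input partitions. A generic sketch of that pattern (hypothetical helper, not the Doris API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Illustrative: submit one task per key, then collect in submission order.
final class ParallelLoader {
    static <K, V> List<V> loadAll(List<K> keys, Function<K, V> loader,
                                  ExecutorService pool, boolean concurrent)
            throws ExecutionException, InterruptedException {
        List<V> results = new ArrayList<>(keys.size());
        if (!concurrent) {
            for (K key : keys) {
                results.add(loader.apply(key));   // inline: no pool contention
            }
            return results;
        }
        List<Future<V>> futures = new ArrayList<>(keys.size());
        for (K key : keys) {
            futures.add(pool.submit(() -> loader.apply(key)));
        }
        for (Future<V> f : futures) {
            results.add(f.get());                 // preserves input order
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> lengths = loadAll(Arrays.asList("a", "bb", "ccc"), String::length, pool, true);
        System.out.println(lengths);              // [1, 2, 3]
        pool.shutdown();
    }
}
```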
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 12024c25616..1970a48f2d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
@@ -63,14 +64,17 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

 public class HiveScanNode extends FileQueryScanNode {
@@ -98,9 +102,10 @@ public class HiveScanNode extends FileQueryScanNode {

     private SelectedPartitions selectedPartitions = null;
     private boolean partitionInit = false;
+    private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
     private List<HivePartition> prunedPartitions;
-    private Iterator<HivePartition> prunedPartitionsIter;
-    private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+    private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
+    private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);

     /**
      * * External file scan node for Query Hive table
@@ -140,7 +145,7 @@ public class HiveScanNode extends FileQueryScanNode {
         List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
         if (!partitionColumnTypes.isEmpty()) {
             // partitioned table
-            boolean isPartitionPruned = selectedPartitions == null ? false : selectedPartitions.isPruned;
+            boolean isPartitionPruned = selectedPartitions != null && selectedPartitions.isPruned;
             Collection<PartitionItem> partitionItems;
             if (!isPartitionPruned) {
                 // partitionItems is null means that the partition is not pruned by Nereids,
@@ -232,36 +237,52 @@ public class HiveScanNode extends FileQueryScanNode {
     }

     @Override
-    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
-        try {
-            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
-            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
-            List<Split> allFiles = Lists.newArrayList();
-            int numPartitions = 0;
-            while (allFiles.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
-                List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
-                for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
-                    partitions.add(prunedPartitionsIter.next());
-                    numPartitions++;
+    public void startSplit() {
+        if (prunedPartitions.isEmpty()) {
+            splitAssignment.finishSchedule();
+            return;
+        }
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+        Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
+        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        CompletableFuture.runAsync(() -> {
+            for (HivePartition partition : prunedPartitions) {
+                if (batchException.get() != null || splitAssignment.isStop()) {
+                    break;
+                }
+                try {
+                    splittersOnFlight.acquire();
+                } catch (InterruptedException e) {
+                    batchException.set(new UserException(e.getMessage(), e));
+                    break;
                 }
-                getFileSplitByPartitions(cache, partitions, allFiles, bindBrokerName);
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        List<Split> allFiles = Lists.newArrayList();
+                        getFileSplitByPartitions(cache, Collections.singletonList(partition), allFiles, bindBrokerName);
+                        if (allFiles.size() > numSplitsPerPartition.get()) {
+                            numSplitsPerPartition.set(allFiles.size());
+                        }
+                        splitAssignment.addToQueue(allFiles);
+                    } catch (IOException e) {
+                        batchException.set(new UserException(e.getMessage(), e));
+                    } finally {
+                        splittersOnFlight.release();
+                        if (batchException.get() != null) {
+                            splitAssignment.setException(batchException.get());
+                        }
+                        if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+                            splitAssignment.finishSchedule();
+                        }
+                    }
+                }, scheduleExecutor);
             }
-            if (allFiles.size() / numPartitions > numSplitsPerPartition) {
-                numSplitsPerPartition = allFiles.size() / numPartitions;
+            if (batchException.get() != null) {
+                splitAssignment.setException(batchException.get());
             }
-            return allFiles;
-        } catch (Throwable t) {
-            LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
-            throw new UserException(
-                    "get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
-                    t);
-        }
-    }
-
-    @Override
-    public boolean hasNext() {
-        return prunedPartitionsIter.hasNext();
+        });
     }

     @Override
@@ -272,7 +293,6 @@ public class HiveScanNode extends FileQueryScanNode {
             } catch (Exception e) {
                 return false;
             }
-            prunedPartitionsIter = prunedPartitions.iterator();
             partitionInit = true;
         }
         int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -281,7 +301,7 @@ public class HiveScanNode extends FileQueryScanNode {

     @Override
     public int numApproximateSplits() {
-        return numSplitsPerPartition * prunedPartitions.size();
+        return numSplitsPerPartition.get() * prunedPartitions.size();
     }

     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
@@ -290,7 +310,8 @@ public class HiveScanNode extends FileQueryScanNode {
         if (hiveTransaction != null) {
             fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
         } else {
-            fileCaches = cache.getFilesByPartitionsWithCache(partitions, bindBrokerName);
+            boolean withCache = Config.max_external_file_cache_num > 0;
+            fileCaches = cache.getFilesByPartitions(partitions, withCache, withCache, bindBrokerName);
         }
         if (tableSample != null) {
             List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
@@ -463,10 +484,7 @@ public class HiveScanNode extends FileQueryScanNode {

     public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
         String aggFunctionName = aggExpr.getFnName().getFunction();
-        if (aggFunctionName.equalsIgnoreCase("COUNT")) {
-            return true;
-        }
-        return false;
+        return aggFunctionName.equalsIgnoreCase("COUNT");
     }

     @Override
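Both `HiveScanNode.startSplit()` above and the `HudiScanNode` version below use the same throttling idiom: a `Semaphore` caps how many per-partition splitters run at once (`NUM_SPLITTERS_ON_FLIGHT`), and an `AtomicInteger` of finished partitions lets the last task signal `finishSchedule()`. Isolated as a runnable sketch (names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative: schedule one task per partition, at most `permits` running at once;
// the last task to finish signals overall completion (finishSchedule in the patch).
final class BoundedFanOut {
    static void run(List<String> partitions, int permits, Runnable onAllDone)
            throws InterruptedException {
        Semaphore inFlight = new Semaphore(permits);
        AtomicInteger finished = new AtomicInteger(0);
        for (String partition : partitions) {
            inFlight.acquire();                   // backpressure: block the scheduler loop
            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("splitting " + partition);
                } finally {
                    inFlight.release();
                    if (finished.incrementAndGet() == partitions.size()) {
                        onAllDone.run();          // all partitions scheduled and done
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        run(Arrays.asList("p=1", "p=2", "p=3"), 2, done::countDown);
        done.await();                             // keep the demo alive until completion
    }
}
```

Acquiring the permit in the scheduler loop, rather than inside the task, is the design choice that bounds memory: partitions are not even scheduled until a slot frees up.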
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8f2b3e598b9..82e21bcdd17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -69,13 +69,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -101,9 +103,11 @@ public class HudiScanNode extends HiveScanNode {
     private HoodieTimeline timeline;
     private Option<String> snapshotTimestamp;
     private String queryInstant;
+
+    private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
     private List<HivePartition> prunedPartitions;
-    private Iterator<HivePartition> prunedPartitionsIter;
-    private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+    private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
+    private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);

     private boolean incrementalRead = false;
     private TableScanParams scanParams;
@@ -206,7 +210,6 @@ public class HudiScanNode extends HiveScanNode {
             Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
             if (!snapshotInstant.isPresent()) {
                 prunedPartitions = Collections.emptyList();
-                prunedPartitionsIter = prunedPartitions.iterator();
                 partitionInit = true;
                 return;
             }
@@ -320,47 +323,47 @@ public class HudiScanNode extends HiveScanNode {
                 incrementalRelation.getEndTs())).collect(Collectors.toList());
     }

+    private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
+        String globPath;
+        String partitionName;
+        if (partition.isDummyPartition()) {
+            partitionName = "";
+            globPath = hudiClient.getBasePathV2().toString() + "/*";
+        } else {
+            partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                    new Path(partition.getPath()));
+            globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+        }
+        List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
+                hudiClient.getRawFs(), new Path(globPath));
+        HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                timeline, statuses.toArray(new FileStatus[0]));
+
+        if (isCowOrRoTable) {
+            fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                noLogsSplitNum.incrementAndGet();
+                String filePath = baseFile.getPath();
+                long fileSize = baseFile.getFileSize();
+                // Need add hdfs host to location
+                LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                Path splitFilePath = locationPath.toStorageLocation();
+                splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                        new String[0], partition.getPartitionValues()));
+            });
+        } else {
+            fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                    .forEach(fileSlice -> splits.add(
+                            generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+        }
+    }
+
     private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
         Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
         AtomicReference<Throwable> throwable = new AtomicReference<>();
         partitions.forEach(partition -> executor.execute(() -> {
             try {
-                String globPath;
-                String partitionName = "";
-                if (partition.isDummyPartition()) {
-                    globPath = hudiClient.getBasePathV2().toString() + "/*";
-                } else {
-                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                            new Path(partition.getPath()));
-                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-                }
-                List<FileStatus> statuses;
-                try {
-                    statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                            new Path(globPath));
-                } catch (IOException e) {
-                    throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
-                }
-                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                        timeline, statuses.toArray(new FileStatus[0]));
-
-                if (isCowOrRoTable) {
-                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                        noLogsSplitNum.incrementAndGet();
-                        String filePath = baseFile.getPath();
-                        long fileSize = baseFile.getFileSize();
-                        // Need add hdfs host to location
-                        LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                        Path splitFilePath = locationPath.toStorageLocation();
-                        splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
-                                new String[0], partition.getPartitionValues()));
-                    });
-                } else {
-                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                            .forEach(fileSlice -> splits.add(
-                                    generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
-                }
+                getPartitionSplits(partition, splits);
             } catch (Throwable t) {
                 throwable.set(t);
             } finally {
@@ -394,26 +397,48 @@ public class HudiScanNode extends HiveScanNode {
     }

     @Override
-    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
-        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
-        int numPartitions = 0;
-        while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
-            List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
-            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
-                partitions.add(prunedPartitionsIter.next());
-                numPartitions++;
-            }
-            getPartitionSplits(partitions, splits);
+    public void startSplit() {
+        if (prunedPartitions.isEmpty()) {
+            splitAssignment.finishSchedule();
+            return;
         }
-        if (splits.size() / numPartitions > numSplitsPerPartition) {
-            numSplitsPerPartition = splits.size() / numPartitions;
-        }
-        return splits;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return prunedPartitionsIter.hasNext();
+        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        CompletableFuture.runAsync(() -> {
+            for (HivePartition partition : prunedPartitions) {
+                if (batchException.get() != null || splitAssignment.isStop()) {
+                    break;
+                }
+                try {
+                    splittersOnFlight.acquire();
+                } catch (InterruptedException e) {
+                    batchException.set(new UserException(e.getMessage(), e));
+                    break;
+                }
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        List<Split> allFiles = Lists.newArrayList();
+                        getPartitionSplits(partition, allFiles);
+                        if (allFiles.size() > numSplitsPerPartition.get()) {
+                            numSplitsPerPartition.set(allFiles.size());
+                        }
+                        splitAssignment.addToQueue(allFiles);
+                    } catch (IOException e) {
+                        batchException.set(new UserException(e.getMessage(), e));
+                    } finally {
+                        splittersOnFlight.release();
+                        if (batchException.get() != null) {
+                            splitAssignment.setException(batchException.get());
+                        }
+                        if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+                            splitAssignment.finishSchedule();
+                        }
+                    }
+                });
+            }
+            if (batchException.get() != null) {
+                splitAssignment.setException(batchException.get());
+            }
+        });
     }

     @Override
@@ -426,7 +451,6 @@ public class HudiScanNode extends HiveScanNode {
             prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
                     HiveMetaStoreClientHelper.getConfiguration(hmsTable),
                     () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
-            prunedPartitionsIter = prunedPartitions.iterator();
             partitionInit = true;
         }
         int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -435,7 +459,7 @@ public class HudiScanNode extends HiveScanNode {

     @Override
     public int numApproximateSplits() {
-        return numSplitsPerPartition * prunedPartitions.size();
+        return numSplitsPerPartition.get() * prunedPartitions.size();
     }

     private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a11f3300b9d..9c896bd7504 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -44,6 +44,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.FederationBackendPolicy;
+import org.apache.doris.datasource.SplitAssignment;
 import org.apache.doris.datasource.SplitGenerator;
 import org.apache.doris.datasource.SplitSource;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
@@ -84,7 +85,7 @@ import java.util.stream.IntStream;
 public abstract class ScanNode extends PlanNode implements SplitGenerator {
     private static final Logger LOG = LogManager.getLogger(ScanNode.class);
     protected static final int NUM_SPLITS_PER_PARTITION = 10;
-    protected static final int NUM_PARTITIONS_PER_LOOP = 100;
+    protected static final int NUM_SPLITTERS_ON_FLIGHT = Config.max_external_cache_loader_thread_pool_size;
     protected final TupleDescriptor desc;
     // for distribution prunner
     protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
@@ -95,6 +96,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
     protected List<SplitSource> splitSources = Lists.newArrayList();
     protected PartitionInfo partitionsInfo = null;
+    protected SplitAssignment splitAssignment = null;

     // create a mapping between output slot's id and project expr
     Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 83acbba5d16..c6755746106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -634,6 +634,9 @@ public class Coordinator implements CoordInterface {

     @Override
     public void close() {
+        for (ScanNode scanNode : scanNodes) {
+            scanNode.stop();
+        }
         if (queryQueue != null && queueToken != null) {
             try {
                 queryQueue.releaseAndNotify(queueToken);
@@ -1208,6 +1211,9 @@ public class Coordinator implements CoordInterface {

     @Override
     public void cancel(Status cancelReason) {
+        for (ScanNode scanNode : scanNodes) {
+            scanNode.stop();
+        }
         if (cancelReason.ok()) {
             throw new RuntimeException("Should use correct cancel reason, but it is "
                     + cancelReason.toString());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org