This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6563f258ee9155cd9bc5b9c8ba40267eadf62891 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Jul 1 22:05:11 2024 +0800 [opt](split) add max wait time of getting splits (#36843) Add session variable `fetch_splits_max_wait_time_ms` to control the maximum time (in milliseconds) to wait when getting splits, to prevent long waits. --- be/src/pipeline/exec/file_scan_operator.cpp | 3 ++- be/src/vec/exec/scan/split_source_connector.cpp | 2 +- be/src/vec/exec/scan/split_source_connector.h | 15 +++++++-------- be/src/vec/exec/scan/vfile_scanner.cpp | 2 -- be/src/vec/exec/scan/vfile_scanner.h | 1 - .../org/apache/doris/datasource/FileQueryScanNode.java | 3 ++- .../java/org/apache/doris/datasource/SplitSource.java | 18 ++++++++++-------- .../main/java/org/apache/doris/qe/SessionVariable.java | 17 +++++++++++++++++ 8 files changed, 39 insertions(+), 22 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 9e636d0331f..98cc91824f6 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -71,8 +71,9 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; if (scan_range.__isset.split_source) { auto split_source = scan_range.split_source; + RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime"); _split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>( - state, split_source.split_source_id, split_source.num_splits); + state, get_split_timer, split_source.split_source_id, split_source.num_splits); } } if (_split_source == nullptr) { diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp index fae65543e53..9bba44b4e76 100644 --- a/be/src/vec/exec/scan/split_source_connector.cpp +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -45,7 +45,7 @@ Status 
RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang std::lock_guard<std::mutex> l(_range_lock); *has_next = false; if (_scan_index == _scan_ranges.size() && !_last_batch) { - SCOPED_RAW_TIMER(&_get_split_timer); + SCOPED_TIMER(_get_split_timer); Status coord_status; FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(), _state->get_query_ctx()->coord_addr, &coord_status); diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h index bfda961df34..f62b45612bf 100644 --- a/be/src/vec/exec/scan/split_source_connector.h +++ b/be/src/vec/exec/scan/split_source_connector.h @@ -43,8 +43,6 @@ public: virtual int num_scan_ranges() = 0; virtual TFileScanRangeParams* get_params() = 0; - - virtual int64_t get_split_time() { return 0; } }; /** @@ -89,6 +87,7 @@ class RemoteSplitSourceConnector : public SplitSourceConnector { private: std::mutex _range_lock; RuntimeState* _state; + RuntimeProfile::Counter* _get_split_timer; int64 _split_source_id; int _num_splits; @@ -97,11 +96,13 @@ private: int _scan_index = 0; int _range_index = 0; - int64_t _get_split_timer = 0; - public: - RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits) - : _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {} + RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer, + int64 split_source_id, int num_splits) + : _state(state), + _get_split_timer(get_split_timer), + _split_source_id(split_source_id), + _num_splits(num_splits) {} Status get_next(bool* has_next, TFileRangeDesc* range) override; @@ -114,8 +115,6 @@ public: TFileScanRangeParams* get_params() override { LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map"; } - - int64_t get_split_time() override { return _get_split_timer; } }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp index 4932e164649..f6f029b9de0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -138,7 +138,6 @@ Status VFileScanner::prepare( _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT); _has_fully_rf_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT); - _get_split_timer = ADD_TIMER(_local_state->scanner_profile(), "GetSplitTime"); _file_cache_statistics.reset(new io::FileCacheStatistics()); _io_ctx.reset(new io::IOContext()); @@ -1163,7 +1162,6 @@ Status VFileScanner::close(RuntimeState* state) { if (_cur_reader) { RETURN_IF_ERROR(_cur_reader->close()); } - COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time()); RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 61d75d65683..332bdfe11e1 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -180,7 +180,6 @@ private: RuntimeProfile::Counter* _empty_file_counter = nullptr; RuntimeProfile::Counter* _file_counter = nullptr; RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr; - RuntimeProfile::Counter* _get_split_timer = nullptr; const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr; // single slot filter conjuncts diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 9039f1a8c58..af42093442b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -337,10 +337,11 @@ public abstract class FileQueryScanNode extends FileScanNode { locationType = getLocationType(fileSplit.getPath().toString()); } totalFileSize = fileSplit.getLength() * 
inputSplitsNum; + long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime(); // Not accurate, only used to estimate concurrency. int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends(); for (Backend backend : backendPolicy.getBackends()) { - SplitSource splitSource = new SplitSource(backend, splitAssignment); + SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime); splitSources.add(splitSource); Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource); TScanRangeLocations curLocations = newLocations(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java index dce135292ec..8515e686f36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java @@ -44,17 +44,18 @@ import java.util.concurrent.atomic.AtomicLong; public class SplitSource { private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0); private static final long WAIT_TIME_OUT = 100; // 100ms - private static final long MAX_WAIT_TIME_OUT = 500; // 500ms private final long uniqueId; private final Backend backend; private final SplitAssignment splitAssignment; private final AtomicBoolean isLastBatch; + private final long maxWaitTime; - public SplitSource(Backend backend, SplitAssignment splitAssignment) { + public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) { this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement(); this.backend = backend; this.splitAssignment = splitAssignment; + this.maxWaitTime = maxWaitTime; this.isLastBatch = new AtomicBoolean(false); splitAssignment.registerSource(uniqueId); } @@ -71,7 +72,7 @@ public class SplitSource { return Collections.emptyList(); } List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize); - long 
maxTimeOut = 0; + long startTime = System.currentTimeMillis(); while (scanRanges.size() < maxBatchSize) { BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend); if (splits == null) { @@ -81,18 +82,19 @@ public class SplitSource { while (scanRanges.size() < maxBatchSize) { try { Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS); + if (splitCollection != null) { + scanRanges.addAll(splitCollection); + } + if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) { + return scanRanges; + } if (splitCollection == null) { - maxTimeOut += WAIT_TIME_OUT; break; } - scanRanges.addAll(splitCollection); } catch (InterruptedException e) { throw new UserException("Failed to get next batch of splits", e); } } - if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) { - break; - } } return scanRanges; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 68c6505beb3..7aa78c2bbb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -446,6 +446,8 @@ public class SessionVariable implements Serializable, Writable { public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode"; + public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time_ms"; + /** * use insert stmt as the unified backend for all loads */ @@ -1555,6 +1557,13 @@ public class SessionVariable implements Serializable, Writable { needForward = true) public int numPartitionsInBatchMode = 1024; + @VariableMgr.VarAttr( + name = FETCH_SPLITS_MAX_WAIT_TIME, + description = {"batch方式中BE获取splits的最大等待时间", + "The max wait time of getting splits in batch mode."}, + needForward = true) + public long fetchSplitsMaxWaitTime = 4000; + @VariableMgr.VarAttr( name 
= ENABLE_PARQUET_LAZY_MAT, description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", @@ -2870,6 +2879,14 @@ public class SessionVariable implements Serializable, Writable { this.numPartitionsInBatchMode = numPartitionsInBatchMode; } + public long getFetchSplitsMaxWaitTime() { + return fetchSplitsMaxWaitTime; + } + + public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) { + this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime; + } + public boolean isEnableParquetLazyMat() { return enableParquetLazyMat; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org