This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6563f258ee9155cd9bc5b9c8ba40267eadf62891
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Jul 1 22:05:11 2024 +0800

    [opt](split) add max wait time of getting splits (#36843)
    
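    Add session variable `fetch_splits_max_wait_time_ms` (default 4000 ms) to
    bound how long the FE waits while collecting a batch of splits for a BE:
    once at least one split has been collected and this time has elapsed, the
    partial batch is returned instead of waiting for a full batch, preventing
    long waits.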
---
 be/src/pipeline/exec/file_scan_operator.cpp            |  3 ++-
 be/src/vec/exec/scan/split_source_connector.cpp        |  2 +-
 be/src/vec/exec/scan/split_source_connector.h          | 15 +++++++--------
 be/src/vec/exec/scan/vfile_scanner.cpp                 |  2 --
 be/src/vec/exec/scan/vfile_scanner.h                   |  1 -
 .../org/apache/doris/datasource/FileQueryScanNode.java |  3 ++-
 .../java/org/apache/doris/datasource/SplitSource.java  | 18 ++++++++++--------
 .../main/java/org/apache/doris/qe/SessionVariable.java | 17 +++++++++++++++++
 8 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 9e636d0331f..98cc91824f6 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -71,8 +71,9 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
         auto scan_range = 
scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
         if (scan_range.__isset.split_source) {
             auto split_source = scan_range.split_source;
+            RuntimeProfile::Counter* get_split_timer = 
ADD_TIMER(_runtime_profile, "GetSplitTime");
             _split_source = 
std::make_shared<vectorized::RemoteSplitSourceConnector>(
-                    state, split_source.split_source_id, 
split_source.num_splits);
+                    state, get_split_timer, split_source.split_source_id, 
split_source.num_splits);
         }
     }
     if (_split_source == nullptr) {
diff --git a/be/src/vec/exec/scan/split_source_connector.cpp 
b/be/src/vec/exec/scan/split_source_connector.cpp
index fae65543e53..9bba44b4e76 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -45,7 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, 
TFileRangeDesc* rang
     std::lock_guard<std::mutex> l(_range_lock);
     *has_next = false;
     if (_scan_index == _scan_ranges.size() && !_last_batch) {
-        SCOPED_RAW_TIMER(&_get_split_timer);
+        SCOPED_TIMER(_get_split_timer);
         Status coord_status;
         FrontendServiceConnection 
coord(_state->exec_env()->frontend_client_cache(),
                                         _state->get_query_ctx()->coord_addr, 
&coord_status);
diff --git a/be/src/vec/exec/scan/split_source_connector.h 
b/be/src/vec/exec/scan/split_source_connector.h
index bfda961df34..f62b45612bf 100644
--- a/be/src/vec/exec/scan/split_source_connector.h
+++ b/be/src/vec/exec/scan/split_source_connector.h
@@ -43,8 +43,6 @@ public:
     virtual int num_scan_ranges() = 0;
 
     virtual TFileScanRangeParams* get_params() = 0;
-
-    virtual int64_t get_split_time() { return 0; }
 };
 
 /**
@@ -89,6 +87,7 @@ class RemoteSplitSourceConnector : public 
SplitSourceConnector {
 private:
     std::mutex _range_lock;
     RuntimeState* _state;
+    RuntimeProfile::Counter* _get_split_timer;
     int64 _split_source_id;
     int _num_splits;
 
@@ -97,11 +96,13 @@ private:
     int _scan_index = 0;
     int _range_index = 0;
 
-    int64_t _get_split_timer = 0;
-
 public:
-    RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int 
num_splits)
-            : _state(state), _split_source_id(split_source_id), 
_num_splits(num_splits) {}
+    RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* 
get_split_timer,
+                               int64 split_source_id, int num_splits)
+            : _state(state),
+              _get_split_timer(get_split_timer),
+              _split_source_id(split_source_id),
+              _num_splits(num_splits) {}
 
     Status get_next(bool* has_next, TFileRangeDesc* range) override;
 
@@ -114,8 +115,6 @@ public:
     TFileScanRangeParams* get_params() override {
         LOG(FATAL) << "Unreachable, params is got by 
file_scan_range_params_map";
     }
-
-    int64_t get_split_time() override { return _get_split_timer; }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 4932e164649..f6f029b9de0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -138,7 +138,6 @@ Status VFileScanner::prepare(
     _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", 
TUnit::UNIT);
     _has_fully_rf_file_counter =
             ADD_COUNTER(_local_state->scanner_profile(), 
"HasFullyRfFileNumber", TUnit::UNIT);
-    _get_split_timer = ADD_TIMER(_local_state->scanner_profile(), 
"GetSplitTime");
 
     _file_cache_statistics.reset(new io::FileCacheStatistics());
     _io_ctx.reset(new io::IOContext());
@@ -1163,7 +1162,6 @@ Status VFileScanner::close(RuntimeState* state) {
     if (_cur_reader) {
         RETURN_IF_ERROR(_cur_reader->close());
     }
-    COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time());
 
     RETURN_IF_ERROR(VScanner::close(state));
     return Status::OK();
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index 61d75d65683..332bdfe11e1 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -180,7 +180,6 @@ private:
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
     RuntimeProfile::Counter* _file_counter = nullptr;
     RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
-    RuntimeProfile::Counter* _get_split_timer = nullptr;
 
     const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
     // single slot filter conjuncts
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 9039f1a8c58..af42093442b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -337,10 +337,11 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 locationType = getLocationType(fileSplit.getPath().toString());
             }
             totalFileSize = fileSplit.getLength() * inputSplitsNum;
+            long maxWaitTime = 
ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
             // Not accurate, only used to estimate concurrency.
             int numSplitsPerBE = numApproximateSplits() / 
backendPolicy.numBackends();
             for (Backend backend : backendPolicy.getBackends()) {
-                SplitSource splitSource = new SplitSource(backend, 
splitAssignment);
+                SplitSource splitSource = new SplitSource(backend, 
splitAssignment, maxWaitTime);
                 splitSources.add(splitSource);
                 
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
                 TScanRangeLocations curLocations = newLocations();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
index dce135292ec..8515e686f36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -44,17 +44,18 @@ import java.util.concurrent.atomic.AtomicLong;
 public class SplitSource {
     private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
     private static final long WAIT_TIME_OUT = 100; // 100ms
-    private static final long MAX_WAIT_TIME_OUT = 500; // 500ms
 
     private final long uniqueId;
     private final Backend backend;
     private final SplitAssignment splitAssignment;
     private final AtomicBoolean isLastBatch;
+    private final long maxWaitTime;
 
-    public SplitSource(Backend backend, SplitAssignment splitAssignment) {
+    public SplitSource(Backend backend, SplitAssignment splitAssignment, long 
maxWaitTime) {
         this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
         this.backend = backend;
         this.splitAssignment = splitAssignment;
+        this.maxWaitTime = maxWaitTime;
         this.isLastBatch = new AtomicBoolean(false);
         splitAssignment.registerSource(uniqueId);
     }
@@ -71,7 +72,7 @@ public class SplitSource {
             return Collections.emptyList();
         }
         List<TScanRangeLocations> scanRanges = 
Lists.newArrayListWithExpectedSize(maxBatchSize);
-        long maxTimeOut = 0;
+        long startTime = System.currentTimeMillis();
         while (scanRanges.size() < maxBatchSize) {
             BlockingQueue<Collection<TScanRangeLocations>> splits = 
splitAssignment.getAssignedSplits(backend);
             if (splits == null) {
@@ -81,18 +82,19 @@ public class SplitSource {
             while (scanRanges.size() < maxBatchSize) {
                 try {
                     Collection<TScanRangeLocations> splitCollection = 
splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
+                    if (splitCollection != null) {
+                        scanRanges.addAll(splitCollection);
+                    }
+                    if (!scanRanges.isEmpty() && System.currentTimeMillis() - 
startTime > maxWaitTime) {
+                        return scanRanges;
+                    }
                     if (splitCollection == null) {
-                        maxTimeOut += WAIT_TIME_OUT;
                         break;
                     }
-                    scanRanges.addAll(splitCollection);
                 } catch (InterruptedException e) {
                     throw new UserException("Failed to get next batch of 
splits", e);
                 }
             }
-            if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
-                break;
-            }
         }
         return scanRanges;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 68c6505beb3..7aa78c2bbb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -446,6 +446,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String NUM_PARTITIONS_IN_BATCH_MODE = 
"num_partitions_in_batch_mode";
 
+    public static final String FETCH_SPLITS_MAX_WAIT_TIME = 
"fetch_splits_max_wait_time_ms";
+
     /**
      * use insert stmt as the unified backend for all loads
      */
@@ -1555,6 +1557,13 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true)
     public int numPartitionsInBatchMode = 1024;
 
+    @VariableMgr.VarAttr(
+            name = FETCH_SPLITS_MAX_WAIT_TIME,
+            description = {"batch方式中BE获取splits的最大等待时间",
+                    "The max wait time of getting splits in batch mode."},
+            needForward = true)
+    public long fetchSplitsMaxWaitTime = 4000;
+
     @VariableMgr.VarAttr(
             name = ENABLE_PARQUET_LAZY_MAT,
             description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@@ -2870,6 +2879,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.numPartitionsInBatchMode = numPartitionsInBatchMode;
     }
 
+    public long getFetchSplitsMaxWaitTime() {
+        return fetchSplitsMaxWaitTime;
+    }
+
+    public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) {
+        this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime;
+    }
+
     public boolean isEnableParquetLazyMat() {
         return enableParquetLazyMat;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to