This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 637ecc450c [Improvement](scan) add a config for scan queue memory 
limit (#19439) (#19996)
637ecc450c is described below

commit 637ecc450c6665f0cb8ab0c651f9a403f9028dd8
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu May 25 15:01:00 2023 +0800

    [Improvement](scan) add a config for scan queue memory limit (#19439) 
(#19996)
    
    Co-authored-by: Gabriel <gabrielleeb...@gmail.com>
---
 be/src/runtime/runtime_state.h                             |  4 ++++
 be/src/vec/exec/scan/vscan_node.cpp                        |  2 +-
 .../src/main/java/org/apache/doris/analysis/SetVar.java    |  4 ++++
 .../src/main/java/org/apache/doris/qe/SessionVariable.java | 14 ++++++++++++++
 gensrc/thrift/PaloInternalService.thrift                   |  2 ++
 5 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 95cddb0424..dc4c5f97d1 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -85,6 +85,10 @@ public:
     Status create_load_dir();
 
     const TQueryOptions& query_options() const { return _query_options; }
+    int64_t scan_queue_mem_limit() const {
+        return _query_options.__isset.scan_queue_mem_limit ? 
_query_options.scan_queue_mem_limit
+                                                           : 
_query_options.mem_limit / 20;
+    }
     ObjectPool* obj_pool() const { return _obj_pool.get(); }
 
     std::shared_ptr<ObjectPool> obj_pool_ptr() const { return _obj_pool; }
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 002229ebb3..e708c22997 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -187,7 +187,7 @@ Status VScanNode::_init_profile() {
 Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
     _scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, 
_output_tuple_desc,
                                           scanners, limit(),
-                                          _state->query_options().mem_limit / 
20));
+                                          _state->scan_queue_mem_limit()));
     RETURN_IF_ERROR(_scanner_ctx->init());
     
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     return Status::OK();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index ebe38983f9..f880bf00d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -147,6 +147,10 @@ public class SetVar {
             this.value = new 
StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
             this.result = (LiteralExpr) this.value;
         }
+        if 
(getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
+            this.value = new 
StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
+            this.result = (LiteralExpr) this.value;
+        }
         if (getVariable().equalsIgnoreCase("is_report_success")) {
             variable = SessionVariable.ENABLE_PROFILE;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e1ac24bd55..432e6b4cc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -50,6 +50,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final Logger LOG = 
LogManager.getLogger(SessionVariable.class);
 
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
+    public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
     public static final String QUERY_TIMEOUT = "query_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
     public static final String SQL_MODE = "sql_mode";
@@ -271,6 +272,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
     public long maxExecMemByte = 2147483648L;
 
+    @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
+    public long maxScanQueueMemByte = 2147483648L / 20;
+
     @VariableMgr.VarAttr(name = ENABLE_SPILLING)
     public boolean enableSpilling = false;
 
@@ -736,6 +740,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return maxExecMemByte;
     }
 
+    public long getMaxScanQueueExecMemByte() {
+        return maxScanQueueMemByte;
+    }
+
     public int getQueryTimeoutS() {
         return queryTimeoutS;
     }
@@ -875,6 +883,10 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
+    public void setMaxScanQueueMemByte(long scanQueueMemByte) {
+        this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte / 
20);
+    }
+
     public boolean isSqlQuoteShowCreate() {
         return sqlQuoteShowCreate;
     }
@@ -1355,6 +1367,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public TQueryOptions toThrift() {
         TQueryOptions tResult = new TQueryOptions();
         tResult.setMemLimit(maxExecMemByte);
+        tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, 
maxExecMemByte / 20));
 
         // TODO chenhao, reservation will be calculated by cost
         tResult.setMinReservation(0);
@@ -1583,6 +1596,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public TQueryOptions getQueryOptionVariables() {
         TQueryOptions queryOptions = new TQueryOptions();
         queryOptions.setMemLimit(maxExecMemByte);
+        queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, 
maxExecMemByte / 20));
         queryOptions.setQueryTimeout(queryTimeoutS);
         return queryOptions;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 816b932e7d..6ebf0f4a3a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -188,6 +188,8 @@ struct TQueryOptions {
 
   // For debug purpose, skip delete bitmap when reading data
   56: optional bool skip_delete_bitmap = false
+
+  57: optional i64 scan_queue_mem_limit
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to