This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c28ced1ebb8d76a0e8e0a8bdbf1b6bb68d0d5796
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Jan 31 16:29:28 2024 +0800

    [Feature](executor)Insert select limited by WorkloadGroup #30610
---
 be/src/olap/delta_writer.cpp                       |  2 +-
 be/src/olap/delta_writer_v2.cpp                    |  6 +++++
 be/src/olap/memtable_flush_executor.cpp            | 12 +++++++++
 be/src/olap/memtable_flush_executor.h              |  3 +++
 be/src/olap/memtable_writer.cpp                    | 11 +++++---
 be/src/olap/memtable_writer.h                      |  2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     | 30 ++++++++++++++-------
 .../main/java/org/apache/doris/qe/Coordinator.java | 31 +++++++++++++---------
 .../java/org/apache/doris/qe/StmtExecutor.java     |  5 ----
 9 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index bc1152fcd1f..c1bf7cd5381 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -105,7 +105,7 @@ Status BaseDeltaWriter::init() {
     RETURN_IF_ERROR(_rowset_builder->init());
     RETURN_IF_ERROR(_memtable_writer->init(
             _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
-            _rowset_builder->get_partial_update_info(),
+            _rowset_builder->get_partial_update_info(), nullptr,
             _rowset_builder->tablet()->enable_unique_key_merge_on_write()));
     ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index db2c7ef80ce..e97db641a80 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -51,6 +51,7 @@
 #include "olap/tablet_manager.h"
 #include "olap/tablet_schema.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
@@ -134,7 +135,12 @@ Status DeltaWriterV2::init() {
 
     _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
     RETURN_IF_ERROR(_rowset_writer->init(context));
+    ThreadPool* wg_thread_pool_ptr = nullptr;
+    if (_state->get_query_ctx()) {
+        wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool();
+    }
     RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
+                                           wg_thread_pool_ptr,
                                            _streams[0]->enable_unique_mow(_req.index_id)));
     ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index e1fa68a6a53..4d0af7496d9 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -238,6 +238,18 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
     }
 }
 
+Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token,
+                                                 RowsetWriter* rowset_writer,
+                                                 ThreadPool* wg_flush_pool_ptr) {
+    if (rowset_writer->type() == BETA_ROWSET) {
+        flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr);
+    } else {
+        return Status::InternalError<false>("not support alpha rowset load now.");
+    }
+    flush_token->set_rowset_writer(rowset_writer);
+    return Status::OK();
+}
+
 void MemTableFlushExecutor::_register_metrics() {
     REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
                          [this]() { return _flush_pool->get_queue_size(); });
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 80983baa66f..9896b8382da 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -128,6 +128,9 @@ public:
     Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
                               bool is_high_priority);
 
+    Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
+                              ThreadPool* wg_flush_pool_ptr);
+
 private:
     void _register_metrics();
     static void _deregister_metrics();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index d3ff02333a1..4fb24cb01cd 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() {
 Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
                             TabletSchemaSPtr tablet_schema,
                             std::shared_ptr<PartialUpdateInfo> partial_update_info,
-                            bool unique_key_mow) {
+                            ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) {
     _rowset_writer = rowset_writer;
     _tablet_schema = tablet_schema;
     _unique_key_mow = unique_key_mow;
@@ -76,8 +76,13 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
     // create flush handler
     // by assigning segment_id to memtable before submiting to flush executor,
     // we can make sure same keys sort in the same order in all replicas.
-    RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
-            _flush_token, _rowset_writer.get(), _req.is_high_priority));
+    if (wg_flush_pool_ptr) {
+        RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
+                _flush_token, _rowset_writer.get(), wg_flush_pool_ptr));
+    } else {
+        RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
+                _flush_token, _rowset_writer.get(), _req.is_high_priority));
+    }
 
     _is_init = true;
     return Status::OK();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index a2687d9402c..6b48421cb6e 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -69,7 +69,7 @@ public:
 
     Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
                 std::shared_ptr<PartialUpdateInfo> partial_update_info,
-                bool unique_key_mow = false);
+                ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
 
     Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs,
                  bool is_append = false);
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index bb8e3ea77e4..f7f4c40f010 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -92,15 +92,27 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil
     // This is a async thread, should lock the task ctx, to make sure runtimestate and profile
     // not deconstructed before the thread exit.
     auto task_ctx = state->get_task_execution_context();
-    static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-            [this, state, profile, task_ctx]() {
-                auto task_lock = task_ctx.lock();
-                if (task_lock == nullptr) {
-                    _writer_thread_closed = true;
-                    return;
-                }
-                this->process_block(state, profile);
-            }));
+    if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
+        ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool();
+        static_cast<void>(pool_ptr->submit_func([this, state, profile, task_ctx]() {
+            auto task_lock = task_ctx.lock();
+            if (task_lock == nullptr) {
+                _writer_thread_closed = true;
+                return;
+            }
+            this->process_block(state, profile);
+        }));
+    } else {
+        static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+                [this, state, profile, task_ctx]() {
+                    auto task_lock = task_ctx.lock();
+                    if (task_lock == nullptr) {
+                        _writer_thread_closed = true;
+                        return;
+                    }
+                    this->process_block(state, profile);
+                }));
+    }
 }
 
 void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cca9014a715..57dfde62f6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -599,18 +599,25 @@ public class Coordinator implements CoordInterface {
     @Override
     public void exec() throws Exception {
         // LoadTask does not have context, not controlled by queue now
-        if (Config.enable_workload_group && Config.enable_query_queue && context != null) {
-            queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
-            if (queryQueue == null) {
-                // This logic is actually useless, because when could not find query queue, it will
-                // throw exception during workload group manager.
-                throw new UserException("could not find query queue");
-            }
-            queueToken = queryQueue.getToken();
-            if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
-                LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
-                queryQueue.returnToken(queueToken);
-                throw new UserException(queueToken.getOfferResultDetail());
+        if (context != null) {
+            if (Config.enable_workload_group) {
+                this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
+                if (Config.enable_query_queue) {
+                    queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
+                    if (queryQueue == null) {
+                        // This logic is actually useless, because when could not find query queue, it will
+                        // throw exception during workload group manager.
+                        throw new UserException("could not find query queue");
+                    }
+                    queueToken = queryQueue.getToken();
+                    if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
+                        LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
+                        queryQueue.returnToken(queueToken);
+                        throw new UserException(queueToken.getOfferResultDetail());
+                    }
+                }
+            } else {
+                context.setWorkloadGroupName("");
             }
         }
         execInternal();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 94a9fa5712c..173a4c1d53f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1525,11 +1525,6 @@ public class StmtExecutor {
             coordBase = new PointQueryExec(planner, analyzer);
         } else {
             coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
-            if (Config.enable_workload_group) {
-                coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
-            } else {
-                context.setWorkloadGroupName("");
-            }
             QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                     new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
             profile.setExecutionProfile(coord.getExecutionProfile());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to