This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c28ced1ebb8d76a0e8e0a8bdbf1b6bb68d0d5796 Author: wangbo <wan...@apache.org> AuthorDate: Wed Jan 31 16:29:28 2024 +0800 [Feature](executor)Insert select limited by WorkloadGroup #30610 --- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/delta_writer_v2.cpp | 6 +++++ be/src/olap/memtable_flush_executor.cpp | 12 +++++++++ be/src/olap/memtable_flush_executor.h | 3 +++ be/src/olap/memtable_writer.cpp | 11 +++++--- be/src/olap/memtable_writer.h | 2 +- be/src/vec/sink/writer/async_result_writer.cpp | 30 ++++++++++++++------- .../main/java/org/apache/doris/qe/Coordinator.java | 31 +++++++++++++--------- .../java/org/apache/doris/qe/StmtExecutor.java | 5 ---- 9 files changed, 71 insertions(+), 31 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index bc1152fcd1f..c1bf7cd5381 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -105,7 +105,7 @@ Status BaseDeltaWriter::init() { RETURN_IF_ERROR(_rowset_builder->init()); RETURN_IF_ERROR(_memtable_writer->init( _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(), - _rowset_builder->get_partial_update_info(), + _rowset_builder->get_partial_update_info(), nullptr, _rowset_builder->tablet()->enable_unique_key_merge_on_write())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index db2c7ef80ce..e97db641a80 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -51,6 +51,7 @@ #include "olap/tablet_manager.h" #include "olap/tablet_schema.h" #include "runtime/exec_env.h" +#include "runtime/query_context.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" #include "util/debug_points.h" @@ -134,7 +135,12 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); + ThreadPool* wg_thread_pool_ptr = nullptr; + if (_state->get_query_ctx()) { + wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool(); + } RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, + wg_thread_pool_ptr, _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index e1fa68a6a53..4d0af7496d9 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -238,6 +238,18 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl } } +Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token, + RowsetWriter* rowset_writer, + ThreadPool* wg_flush_pool_ptr) { + if (rowset_writer->type() == BETA_ROWSET) { + flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr); + } else { + return Status::InternalError<false>("not support alpha rowset load now."); + } + flush_token->set_rowset_writer(rowset_writer); + return Status::OK(); +} + void MemTableFlushExecutor::_register_metrics() { REGISTER_HOOK_METRIC(flush_thread_pool_queue_size, [this]() { return _flush_pool->get_queue_size(); }); diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 80983baa66f..9896b8382da 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -128,6 +128,9 @@ public: Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, bool is_high_priority); + Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, + ThreadPool* wg_flush_pool_ptr); + private: void _register_metrics(); static void _deregister_metrics(); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index d3ff02333a1..4fb24cb01cd 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() { Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, std::shared_ptr<PartialUpdateInfo> partial_update_info, - bool unique_key_mow) { + ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) { _rowset_writer = rowset_writer; _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; @@ -76,8 +76,13 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, // create flush handler // by assigning segment_id to memtable before submiting to flush executor, // we can make sure same keys sort in the same order in all replicas. - RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( - _flush_token, _rowset_writer.get(), _req.is_high_priority)); + if (wg_flush_pool_ptr) { + RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( + _flush_token, _rowset_writer.get(), wg_flush_pool_ptr)); + } else { + RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( + _flush_token, _rowset_writer.get(), _req.is_high_priority)); + } _is_init = true; return Status::OK(); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index a2687d9402c..6b48421cb6e 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -69,7 +69,7 @@ public: Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, std::shared_ptr<PartialUpdateInfo> partial_update_info, - bool unique_key_mow = false); + ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false); Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append = false); diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index bb8e3ea77e4..f7f4c40f010 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -92,15 +92,27 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil // This is a async thread, should lock the task ctx, to make sure runtimestate and profile // not deconstructed before the thread exit. auto task_ctx = state->get_task_execution_context(); - static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( - [this, state, profile, task_ctx]() { - auto task_lock = task_ctx.lock(); - if (task_lock == nullptr) { - _writer_thread_closed = true; - return; - } - this->process_block(state, profile); - })); + if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) { + ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool(); + static_cast<void>(pool_ptr->submit_func([this, state, profile, task_ctx]() { + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + _writer_thread_closed = true; + return; + } + this->process_block(state, profile); + })); + } else { + static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( + [this, state, profile, task_ctx]() { + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + _writer_thread_closed = true; + return; + } + this->process_block(state, profile); + })); + } } void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index cca9014a715..57dfde62f6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -599,18 +599,25 @@ public class Coordinator implements CoordInterface { @Override public void exec() throws Exception { // LoadTask does not have context, not controlled by queue now - if (Config.enable_workload_group && Config.enable_query_queue && context != null) { - queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); - if (queryQueue == null) { - // This logic is actually useless, because when could not find query queue, it will - // throw exception during workload group manager. - throw new UserException("could not find query queue"); - } - queueToken = queryQueue.getToken(); - if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) { - LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); - queryQueue.returnToken(queueToken); - throw new UserException(queueToken.getOfferResultDetail()); + if (context != null) { + if (Config.enable_workload_group) { + this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); + if (Config.enable_query_queue) { + queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); + if (queryQueue == null) { + // This logic is actually useless, because when could not find query queue, it will + // throw exception during workload group manager. + throw new UserException("could not find query queue"); + } + queueToken = queryQueue.getToken(); + if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) { + LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); + queryQueue.returnToken(queueToken); + throw new UserException(queueToken.getOfferResultDetail()); + } + } + } else { + context.setWorkloadGroupName(""); } } execInternal(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 94a9fa5712c..173a4c1d53f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1525,11 +1525,6 @@ public class StmtExecutor { coordBase = new PointQueryExec(planner, analyzer); } else { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); - if (Config.enable_workload_group) { - coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); - } else { - context.setWorkloadGroupName(""); - } QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); profile.setExecutionProfile(coord.getExecutionProfile()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org