This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d89033d7846058e94d0fccea734e3c9a7668063d
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Sep 9 17:28:19 2024 +0800

    [opt](spill) avoid query blocked when reserving failed (#40550)
    
    1. Proactively trigger memory revoking when reserving memory fails and the
    sink's revocable size is large (more than 512MB).
    2. Set the brpc timeout to the query timeout.
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/common/agg_utils.h                 | 10 +++++----
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  5 ++++-
 be/src/pipeline/exec/exchange_sink_buffer.h        |  5 +++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 13 ++++++-----
 .../exec/partitioned_aggregation_sink_operator.cpp | 25 +++++++++++-----------
 .../exec/partitioned_aggregation_sink_operator.h   | 21 +++++++++++++++---
 be/src/pipeline/pipeline_task.cpp                  | 16 ++++++++++++--
 be/src/vec/sink/vdata_stream_sender.cpp            |  7 +++++-
 9 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/be/src/pipeline/common/agg_utils.h 
b/be/src/pipeline/common/agg_utils.h
index e0435954b8b..d67ebb9fdf7 100644
--- a/be/src/pipeline/common/agg_utils.h
+++ b/be/src/pipeline/common/agg_utils.h
@@ -275,15 +275,17 @@ public:
         using IteratorBase<ConstIterator, true>::IteratorBase;
     };
 
-    ConstIterator begin() const { return ConstIterator(this, 0); }
+    ConstIterator begin() const { return {this, 0}; }
 
     ConstIterator cbegin() const { return begin(); }
 
-    Iterator begin() { return Iterator(this, 0); }
+    Iterator begin() { return {this, 0}; }
 
-    ConstIterator end() const { return ConstIterator(this, _total_count); }
+    ConstIterator end() const { return {this, _total_count}; }
     ConstIterator cend() const { return end(); }
-    Iterator end() { return Iterator(this, _total_count); }
+    Iterator end() { return {this, _total_count}; }
+
+    [[nodiscard]] uint32_t total_count() const { return _total_count; }
 
     void init_once() {
         if (_inited) {
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 9a4764a24b9..5027d7c10de 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -87,7 +87,7 @@ void BroadcastPBlockHolderMemLimiter::release(const 
BroadcastPBlockHolder& holde
 namespace pipeline {
 
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id, int send_id,
-                                       int be_number, RuntimeState* state,
+                                       PlanNodeId node_id, int be_number, 
RuntimeState* state,
                                        ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
@@ -95,6 +95,7 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, 
PlanNodeId dest_node_
           _query_id(query_id),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
+          _node_id(node_id),
           _be_number(be_number),
           _state(state),
           _context(state->get_query_ctx()),
@@ -408,6 +409,8 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
 }
 
 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
+    LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " 
<< _dest_node_id
+              << ",_sender_id: " << _sender_id << ", node id: " << _node_id << 
", err: " << err;
     _is_finishing = true;
     _context->cancel(Status::Cancelled(err));
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2d30a492a0d..d43d275cba6 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -177,8 +177,8 @@ struct ExchangeRpcContext {
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
-    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int 
send_id, int be_number,
-                       RuntimeState* state, ExchangeSinkLocalState* parent);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int 
send_id, PlanNodeId node_id,
+                       int be_number, RuntimeState* state, 
ExchangeSinkLocalState* parent);
     ~ExchangeSinkBuffer() override = default;
     void register_sink(TUniqueId);
 
@@ -235,6 +235,7 @@ private:
     PlanNodeId _dest_node_id;
     // Sender instance id, unique within a fragment. StreamSender save the 
variable
     int _sender_id;
+    PlanNodeId _node_id;
     int _be_number;
     std::atomic<int64_t> _rpc_count = 0;
     RuntimeState* _state = nullptr;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 59d2d5f0551..9e17a76d272 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -132,8 +132,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     PUniqueId id;
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
-    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, 
_sender_id,
-                                                        _state->be_number(), 
state, this);
+    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(
+            id, p._dest_node_id, _sender_id, _parent->node_id(), 
_state->be_number(), state, this);
 
     register_channels(_sink_buffer.get());
     _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 9bf6c422af4..ae724549f71 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -123,11 +123,6 @@ size_t 
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
 
     if (!_build_side_mutable_block.empty()) {
         size_to_reserve += _build_side_mutable_block.allocated_bytes();
-
-        // estimating for serialized key
-        for (auto id : _build_col_ids) {
-            size_to_reserve += 
_build_side_mutable_block.get_column_by_position(id)->byte_size();
-        }
     }
 
     const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
@@ -143,6 +138,14 @@ size_t 
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
     }
     size_to_reserve += _evaluate_mem_usage;
 
+    if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
+        LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", 
rows: " << rows
+                  << ", bucket_size: " << bucket_size
+                  << ", mutable block size: " << 
_build_side_mutable_block.allocated_bytes()
+                  << ", mutable block cols: " << 
_build_side_mutable_block.columns()
+                  << ", _build_col_ids.size: " << _build_col_ids.size();
+    }
+
     return size_to_reserve;
 }
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 5c26cfb6b97..17eeb8039df 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -241,6 +241,7 @@ size_t 
PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
 }
 
 Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+    const auto size_to_revoke = _parent->revocable_mem_size(state);
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
                << Base::_parent->node_id()
                << " revoke_memory, size: " << 
_parent->revocable_mem_size(state)
@@ -278,7 +279,7 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
             state, _shared_state->shared_from_this(),
-            [this, &parent, state, query_id, submit_timer] {
+            [this, &parent, state, query_id, size_to_revoke, submit_timer] {
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     auto st = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
@@ -312,17 +313,17 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                 }};
                 auto* runtime_state = _runtime_state.get();
                 auto* agg_data = 
parent._agg_sink_operator->get_agg_data(runtime_state);
-                Base::_shared_state->sink_status =
-                        std::visit(vectorized::Overload {
-                                           [&](std::monostate& arg) -> Status {
-                                               return 
Status::InternalError("Unit hash table");
-                                           },
-                                           [&](auto& agg_method) -> Status {
-                                               auto& hash_table = 
*agg_method.hash_table;
-                                               
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
-                                                       state, agg_method, 
hash_table, _eos));
-                                           }},
-                                   agg_data->method_variant);
+                Base::_shared_state->sink_status = std::visit(
+                        vectorized::Overload {
+                                [&](std::monostate& arg) -> Status {
+                                    return Status::InternalError("Unit hash 
table");
+                                },
+                                [&](auto& agg_method) -> Status {
+                                    auto& hash_table = *agg_method.hash_table;
+                                    RETURN_IF_CATCH_EXCEPTION(return 
_spill_hash_table(
+                                            state, agg_method, hash_table, 
size_to_revoke, _eos));
+                                }},
+                        agg_data->method_variant);
                 RETURN_IF_ERROR(Base::_shared_state->sink_status);
                 Base::_shared_state->sink_status =
                         
parent._agg_sink_operator->reset_hash_table(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 4a6abb32ed4..65c1cefa63b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -23,6 +23,7 @@
 #include "pipeline/exec/operator.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -57,7 +58,7 @@ public:
     };
     template <typename HashTableCtxType, typename HashTableType>
     Status _spill_hash_table(RuntimeState* state, HashTableCtxType& context,
-                             HashTableType& hash_table, bool eos) {
+                             HashTableType& hash_table, const size_t 
size_to_revoke, bool eos) {
         Status status;
         Defer defer {[&]() {
             if (!status.ok()) {
@@ -69,8 +70,22 @@ public:
 
         
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
 
-        static int spill_batch_rows = 4096;
-        int row_count = 0;
+        const auto total_rows =
+                
Base::_shared_state->in_mem_shared_state->aggregate_data_container->total_count();
+
+        const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);
+
+        // `spill_batch_rows` is clamped to the range [4096, 1M] rows. Within that range it is
+        // sized from the average row size so that each spilled block is about 32MB(`MAX_SPILL_WRITE_BATCH_MEM`).
+        const auto spill_batch_rows = std::min<size_t>(
+                1024 * 1024,
+                std::max<size_t>(4096, 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
+                                               total_rows / size_to_revoke_));
+
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
+                   << ", spill_batch_rows: " << spill_batch_rows << ", total 
rows: " << total_rows
+                   << ", size_to_revoke: " << size_to_revoke;
+        size_t row_count = 0;
 
         std::vector<TmpSpillInfo<typename HashTableType::key_type>> 
spill_infos(
                 Base::_shared_state->partition_count);
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index e27d73be62e..1bd74bcab45 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -375,7 +375,8 @@ Status PipelineTask::execute(bool* eos) {
             _root->reset_reserve_mem_size(_state);
             DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);
 
-            if (reserve_size > 0) {
+            auto workload_group = _state->get_query_ctx()->workload_group();
+            if (workload_group && reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
                 if (!st.ok()) {
                     VLOG_DEBUG << "query: " << print_id(query_id)
@@ -384,7 +385,18 @@ Status PipelineTask::execute(bool* eos) {
                                << ", sink name: " << _sink->get_name()
                                << ", node id: " << _sink->node_id() << " 
failed: " << st.to_string()
                                << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
-                    {
+                    bool is_high_wartermark = false;
+                    bool is_low_wartermark = false;
+                    workload_group->check_mem_used(&is_low_wartermark, 
&is_high_wartermark);
+                    if (is_low_wartermark || is_high_wartermark) {
+                        /// A large reserve size is likely caused by a large revocable size.
+                        /// If the revocable memory is large enough, trigger revoking proactively here.
+                        if (_sink->revocable_mem_size(_state) > 512L * 1024 * 
1024) {
+                            LOG(INFO) << "query: " << print_id(query_id)
+                                      << " has big memory to revoke.";
+                            RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                        }
+
                         _memory_sufficient_dependency->block();
                         
_state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
                         continue;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index f18467fbad9..b6bfc7862dc 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -115,7 +115,12 @@ Status Channel<Parent>::open(RuntimeState* state) {
     _brpc_request->set_sender_id(_parent->sender_id());
     _brpc_request->set_be_number(_be_number);
 
-    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    const auto& query_options = state->query_options();
+    if (query_options.__isset.query_timeout) {
+        _brpc_timeout_ms = query_options.query_timeout * 1000;
+    } else {
+        _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    }
 
     _serializer.set_is_local(_is_local);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to