This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0e262ba0e4e500af10d8a38d76b64476f5c1cb65
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Apr 8 11:21:09 2024 +0800

    [improvement](spill) improve cancel of spill and improve log printing (#33229)
    
    * [improvement](spill) improve cancel of spill and improve log printing
    
    * fix
---
 .../exec/partitioned_aggregation_sink_operator.cpp |  41 +++---
 .../exec/partitioned_aggregation_sink_operator.h   |   9 +-
 .../partitioned_aggregation_source_operator.cpp    |  36 ++++--
 .../exec/partitioned_hash_join_probe_operator.cpp  |   4 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |   2 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 144 +++++++++++----------
 .../pipeline/exec/spill_sort_source_operator.cpp   |  54 +++++---
 be/src/pipeline/pipeline_x/dependency.cpp          |  18 ++-
 be/src/pipeline/pipeline_x/dependency.h            |   4 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  13 +-
 be/src/service/internal_service.cpp                |  12 +-
 be/src/vec/spill/spill_stream.cpp                  |  14 +-
 be/src/vec/spill/spill_stream.h                    |   4 +-
 be/src/vec/spill/spill_stream_manager.cpp          |   2 -
 be/src/vec/spill/spill_writer.cpp                  |  11 +-
 be/src/vec/spill/spill_writer.h                    |   5 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 17 files changed, 223 insertions(+), 152 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 3dea330c117..4ea531bade0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -148,9 +148,6 @@ Status PartitionedAggSinkOperatorX::open(RuntimeState* 
state) {
     return _agg_sink_operator->open(state);
 }
 
-Status PartitionedAggSinkOperatorX::close(RuntimeState* state) {
-    return _agg_sink_operator->close(state);
-}
 Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in_block,
                                          bool eos) {
     auto& local_state = get_local_state(state);
@@ -227,8 +224,9 @@ Status 
PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
 }
 
 Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
-    LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory"
-              << ", eos: " << _eos;
+    VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << 
Base::_parent->id()
+               << " revoke_memory"
+               << ", eos: " << _eos;
     RETURN_IF_ERROR(Base::_shared_state->sink_status);
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
@@ -258,28 +256,33 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
             [this, &parent, state, execution_context, submit_timer] {
                 auto execution_context_lock = execution_context.lock();
                 if (!execution_context_lock) {
-                    LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
+                    LOG(INFO) << "query " << print_id(state->query_id())
+                              << " execution_context released, maybe query was 
cancelled.";
                     return Status::Cancelled("Cancelled");
                 }
                 
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
                 SCOPED_ATTACH_TASK(state);
                 SCOPED_TIMER(Base::_spill_timer);
                 Defer defer {[&]() {
-                    if (!Base::_shared_state->sink_status.ok()) {
-                        LOG(WARNING)
-                                << "agg node " << Base::_parent->id()
-                                << " revoke_memory error: " << 
Base::_shared_state->sink_status;
+                    if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
+                        if (!_shared_state->sink_status.ok()) {
+                            LOG(WARNING)
+                                    << "query " << print_id(state->query_id()) 
<< " agg node "
+                                    << Base::_parent->id()
+                                    << " revoke_memory error: " << 
Base::_shared_state->sink_status;
+                        }
+                        _shared_state->close();
                     } else {
-                        LOG(INFO) << " agg node " << Base::_parent->id() << " 
revoke_memory finish"
-                                  << ", eos: " << _eos;
+                        VLOG_DEBUG << "query " << print_id(state->query_id()) 
<< " agg node "
+                                   << Base::_parent->id() << " revoke_memory 
finish"
+                                   << ", eos: " << _eos;
                     }
-                    {
-                        if (_eos) {
-                            Base::_dependency->set_ready_to_read();
-                            _finish_dependency->set_ready();
-                        } else {
-                            Base::_dependency->Dependency::set_ready();
-                        }
+
+                    if (_eos) {
+                        Base::_dependency->set_ready_to_read();
+                        _finish_dependency->set_ready();
+                    } else {
+                        Base::_dependency->Dependency::set_ready();
                     }
                 }};
                 auto* runtime_state = _runtime_state.get();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index d63f272092b..7ec582905d0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -71,7 +71,8 @@ public:
         std::vector<TmpSpillInfo<typename HashTableType::key_type>> 
spill_infos(
                 Base::_shared_state->partition_count);
         auto& iter = 
Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator;
-        while (iter != 
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end()) {
+        while (iter != 
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() &&
+               !state->is_cancelled()) {
             const auto& key = iter.template get_key<typename 
HashTableType::key_type>();
             auto partition_index = 
Base::_shared_state->get_partition_index(hash_table.hash(key));
             spill_infos[partition_index].keys_.emplace_back(key);
@@ -93,7 +94,7 @@ public:
             ++iter;
         }
         auto hash_null_key_data = hash_table.has_null_key_data();
-        for (int i = 0; i < Base::_shared_state->partition_count; ++i) {
+        for (int i = 0; i < Base::_shared_state->partition_count && 
!state->is_cancelled(); ++i) {
             auto spill_null_key_data =
                     (hash_null_key_data && i == 
Base::_shared_state->partition_count - 1);
             if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
@@ -160,7 +161,7 @@ public:
                              SCOPED_TIMER(_spill_write_disk_timer);
                              Status status;
                              Defer defer {[&]() { 
spill_stream->end_spill(status); }};
-                             status = spill_stream->spill_block(block_, false);
+                             status = spill_stream->spill_block(state, block_, 
false);
                              return status;
                          });
         if (!status.ok()) {
@@ -320,8 +321,6 @@ public:
 
     Status open(RuntimeState* state) override;
 
-    Status close(RuntimeState* state) override;
-
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
     DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 5680b75c87e..a2484cd6db4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -24,7 +24,6 @@
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "util/runtime_profile.h"
-#include "vec//utils/util.hpp"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -123,12 +122,19 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* 
state) {
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
+    Defer defer {[&]() {
+        if (!local_state._status.ok() || *eos) {
+            local_state._shared_state->close();
+        }
+    }};
+
     local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
     if (local_state._shared_state->is_spilled) {
-        
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
+        local_state._status = 
local_state.initiate_merge_spill_partition_agg_data(state);
+        RETURN_IF_ERROR(local_state._status);
 
         /// When `_is_merging` is true means we are reading spilled data and 
merging the data into hash table.
         if (local_state._is_merging) {
@@ -138,7 +144,8 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
 
     // not spilled in sink or current partition still has data
     auto* runtime_state = local_state._runtime_state.get();
-    RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, 
eos));
+    local_state._status = _agg_source_operator->get_block(runtime_state, 
block, eos);
+    RETURN_IF_ERROR(local_state._status);
     if (local_state._runtime_state) {
         auto* source_local_state =
                 
local_state._runtime_state->get_local_state(_agg_source_operator->operator_id());
@@ -190,7 +197,8 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     }
 
     _is_merging = true;
-    LOG(INFO) << "agg node " << _parent->node_id() << " merge spilled agg 
data";
+    VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << 
_parent->node_id()
+               << " merge spilled agg data";
 
     
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
     _dependency->Dependency::block();
@@ -206,7 +214,8 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                     [this, state, execution_context, submit_timer] {
                         auto execution_context_lock = execution_context.lock();
                         if (!execution_context_lock) {
-                            LOG(INFO) << "execution_context released, maybe 
query was cancelled.";
+                            LOG(INFO) << "query " << 
print_id(state->query_id())
+                                      << " execution_context released, maybe 
query was cancelled.";
                             // FIXME: return status is meaningless?
                             return Status::Cancelled("Cancelled");
                         }
@@ -214,12 +223,17 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                         
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
                         SCOPED_ATTACH_TASK(state);
                         Defer defer {[&]() {
-                            if (!_status.ok()) {
-                                LOG(WARNING) << "agg node " << 
_parent->node_id()
-                                             << " merge spilled agg data 
error: " << _status;
+                            if (!_status.ok() || state->is_cancelled()) {
+                                if (!_status.ok()) {
+                                    LOG(WARNING) << "query " << 
print_id(state->query_id())
+                                                 << " agg node " << 
_parent->node_id()
+                                                 << " merge spilled agg data 
error: " << _status;
+                                }
+                                _shared_state->close();
                             } else if 
(_shared_state->spill_partitions.empty()) {
-                                LOG(INFO) << "agg node " << _parent->node_id()
-                                          << " merge spilled agg data finish";
+                                VLOG_DEBUG << "query " << 
print_id(state->query_id())
+                                           << " agg node " << 
_parent->node_id()
+                                           << " merge spilled agg data finish";
                             }
                             
Base::_shared_state->in_mem_shared_state->aggregate_data_container
                                     ->init_once();
@@ -237,7 +251,7 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                                         Base::_spill_read_bytes, 
Base::_spill_read_wait_io_timer);
                                 vectorized::Block block;
                                 bool eos = false;
-                                while (!eos) {
+                                while (!eos && !state->is_cancelled()) {
                                     {
                                         
SCOPED_TIMER(Base::_spill_recover_time);
                                         _status = 
stream->read_next_block_sync(&block, &eos);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index a8eb926243f..0f837f8bbda 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -195,7 +195,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                 if (_spill_status_ok) {
                     auto build_block = mutable_block->to_block();
                     DCHECK_EQ(mutable_block->rows(), 0);
-                    auto st = build_spilling_stream->spill_block(build_block, 
false);
+                    auto st = build_spilling_stream->spill_block(state, 
build_block, false);
                     if (!st.ok()) {
                         std::unique_lock<std::mutex> lock(_spill_lock);
                         _spill_status_ok = false;
@@ -262,7 +262,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                         auto block = std::move(blocks.back());
                         blocks.pop_back();
                         if (_spill_status_ok) {
-                            auto st = spilling_stream->spill_block(block, 
false);
+                            auto st = spilling_stream->spill_block(state, 
block, false);
                             if (!st.ok()) {
                                 std::unique_lock<std::mutex> lock(_spill_lock);
                                 _spill_status_ok = false;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 4fd399464c1..370606b1904 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -231,7 +231,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
     if (_spill_status_ok) {
         auto block = partitioned_block->to_block();
         partitioned_block = 
vectorized::MutableBlock::create_unique(block.clone_empty());
-        auto st = spilling_stream->spill_block(block, false);
+        auto st = spilling_stream->spill_block(state(), block, false);
         if (!st.ok()) {
             _spill_status_ok = false;
             std::lock_guard<std::mutex> l(_spill_status_lock);
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 523ff2cfaaf..78c5c9f51e9 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -203,8 +203,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
         profile()->add_info_string("Spilled", "true");
     }
 
-    LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory"
-              << ", eos: " << _eos;
+    VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << 
Base::_parent->id()
+               << " revoke_memory"
+               << ", eos: " << _eos;
     RETURN_IF_ERROR(Base::_shared_state->sink_status);
 
     auto status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -234,73 +235,78 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    status =
-            ExecEnv::GetInstance()
-                    ->spill_stream_mgr()
-                    
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
-                    ->submit_func([this, state, &parent, execution_context, 
submit_timer] {
-                        auto execution_context_lock = execution_context.lock();
-                        if (!execution_context_lock) {
-                            LOG(INFO) << "execution_context released, maybe 
query was cancelled.";
-                            return Status::OK();
-                        }
-
-                        
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                        SCOPED_ATTACH_TASK(state);
-                        Defer defer {[&]() {
-                            if (!_shared_state->sink_status.ok()) {
-                                LOG(WARNING)
-                                        << "sort node " << _parent->id()
-                                        << " revoke memory error: " << 
_shared_state->sink_status;
-                            } else {
-                                LOG(INFO)
-                                        << "sort node " << _parent->id() << " 
revoke memory finish";
-                            }
-
-                            
_spilling_stream->end_spill(_shared_state->sink_status);
-                            if (!_shared_state->sink_status.ok()) {
-                                _shared_state->clear();
-                            }
-
-                            _spilling_stream.reset();
-                            if (_eos) {
-                                _dependency->set_ready_to_read();
-                                _finish_dependency->set_ready();
-                            } else {
-                                _dependency->Dependency::set_ready();
-                            }
-                        }};
-
-                        _shared_state->sink_status =
-                                
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
-                        RETURN_IF_ERROR(_shared_state->sink_status);
-
-                        auto* sink_local_state = 
_runtime_state->get_sink_local_state();
-                        update_profile(sink_local_state->profile());
-
-                        bool eos = false;
-                        vectorized::Block block;
-                        while (!eos && !state->is_cancelled()) {
-                            {
-                                SCOPED_TIMER(_spill_merge_sort_timer);
-                                _shared_state->sink_status =
-                                        
parent._sort_sink_operator->merge_sort_read_for_spill(
-                                                _runtime_state.get(), &block,
-                                                
_shared_state->spill_block_batch_row_count, &eos);
-                            }
-                            RETURN_IF_ERROR(_shared_state->sink_status);
-                            {
-                                SCOPED_TIMER(Base::_spill_timer);
-                                _shared_state->sink_status =
-                                        _spilling_stream->spill_block(block, 
eos);
-                            }
-                            RETURN_IF_ERROR(_shared_state->sink_status);
-                            block.clear_column_data();
-                        }
-                        
parent._sort_sink_operator->reset(_runtime_state.get());
-
-                        return Status::OK();
-                    });
+    status = ExecEnv::GetInstance()
+                     ->spill_stream_mgr()
+                     
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
+                     ->submit_func([this, state, &parent, execution_context, 
submit_timer] {
+                         auto execution_context_lock = 
execution_context.lock();
+                         if (!execution_context_lock) {
+                             LOG(INFO) << "query " << 
print_id(state->query_id())
+                                       << " execution_context released, maybe 
query was cancelled.";
+                             return Status::OK();
+                         }
+
+                         
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+                         SCOPED_ATTACH_TASK(state);
+                         Defer defer {[&]() {
+                             if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
+                                 if (!_shared_state->sink_status.ok()) {
+                                     LOG(WARNING) << "query " << 
print_id(state->query_id())
+                                                  << " sort node " << 
_parent->id()
+                                                  << " revoke memory error: "
+                                                  << 
_shared_state->sink_status;
+                                 }
+                                 _shared_state->close();
+                             } else {
+                                 VLOG_DEBUG << "query " << 
print_id(state->query_id())
+                                            << " sort node " << _parent->id()
+                                            << " revoke memory finish";
+                             }
+
+                             
_spilling_stream->end_spill(_shared_state->sink_status);
+                             if (!_shared_state->sink_status.ok()) {
+                                 _shared_state->close();
+                             }
+
+                             _spilling_stream.reset();
+                             if (_eos) {
+                                 _dependency->set_ready_to_read();
+                                 _finish_dependency->set_ready();
+                             } else {
+                                 _dependency->Dependency::set_ready();
+                             }
+                         }};
+
+                         _shared_state->sink_status = 
parent._sort_sink_operator->prepare_for_spill(
+                                 _runtime_state.get());
+                         RETURN_IF_ERROR(_shared_state->sink_status);
+
+                         auto* sink_local_state = 
_runtime_state->get_sink_local_state();
+                         update_profile(sink_local_state->profile());
+
+                         bool eos = false;
+                         vectorized::Block block;
+                         while (!eos && !state->is_cancelled()) {
+                             {
+                                 SCOPED_TIMER(_spill_merge_sort_timer);
+                                 _shared_state->sink_status =
+                                         
parent._sort_sink_operator->merge_sort_read_for_spill(
+                                                 _runtime_state.get(), &block,
+                                                 
_shared_state->spill_block_batch_row_count, &eos);
+                             }
+                             RETURN_IF_ERROR(_shared_state->sink_status);
+                             {
+                                 SCOPED_TIMER(Base::_spill_timer);
+                                 _shared_state->sink_status =
+                                         _spilling_stream->spill_block(state, 
block, eos);
+                             }
+                             RETURN_IF_ERROR(_shared_state->sink_status);
+                             block.clear_column_data();
+                         }
+                         
parent._sort_sink_operator->reset(_runtime_state.get());
+
+                         return Status::OK();
+                     });
     if (!status.ok()) {
         _spilling_stream->end_spill(status);
 
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index c53c057088c..5edb0daf7fc 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -72,7 +72,8 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
     auto& parent = Base::_parent->template cast<Parent>();
-    LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data";
+    VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
+               << " merge spill data";
     _dependency->Dependency::block();
 
     Status status;
@@ -91,7 +92,8 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
     auto spill_func = [this, state, &parent, execution_context, submit_timer] {
         auto execution_context_lock = execution_context.lock();
         if (!execution_context_lock) {
-            LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
+            LOG(INFO) << "query " << print_id(state->query_id())
+                      << " execution_context released, maybe query was 
cancelled.";
             return Status::OK();
         }
 
@@ -99,21 +101,30 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         SCOPED_TIMER(_spill_merge_sort_timer);
         SCOPED_ATTACH_TASK(state);
         Defer defer {[&]() {
-            if (!_status.ok()) {
-                LOG(WARNING) << "sort node " << _parent->node_id()
-                             << " merge spill data error: " << _status;
+            if (!_status.ok() || state->is_cancelled()) {
+                if (!_status.ok()) {
+                    LOG(WARNING) << "query " << print_id(state->query_id()) << 
" sort node "
+                                 << _parent->node_id() << " merge spill data 
error: " << _status;
+                }
+                _shared_state->close();
+                for (auto& stream : _current_merging_streams) {
+                    
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+                }
+                _current_merging_streams.clear();
             } else {
-                LOG(INFO) << "sort node " << _parent->node_id() << " merge 
spill data finish";
+                VLOG_DEBUG << "query " << print_id(state->query_id()) << " 
sort node "
+                           << _parent->node_id() << " merge spill data finish";
             }
             _dependency->Dependency::set_ready();
         }};
         vectorized::Block merge_sorted_block;
         vectorized::SpillStreamSPtr tmp_stream;
-        while (true) {
+        while (!state->is_cancelled()) {
             int max_stream_count = _calc_spill_blocks_to_merge();
-            LOG(INFO) << "sort node " << _parent->id() << " merge spill 
streams, streams count: "
-                      << _shared_state->sorted_streams.size()
-                      << ", curren merge max stream count: " << 
max_stream_count;
+            VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort 
node " << _parent->id()
+                       << " merge spill streams, streams count: "
+                       << _shared_state->sorted_streams.size()
+                       << ", curren merge max stream count: " << 
max_stream_count;
             {
                 SCOPED_TIMER(Base::_spill_recover_time);
                 _status = _create_intermediate_merger(
@@ -150,7 +161,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                         _status = _merger->get_next(&merge_sorted_block, &eos);
                     }
                     RETURN_IF_ERROR(_status);
-                    _status = tmp_stream->spill_block(merge_sorted_block, eos);
+                    _status = tmp_stream->spill_block(state, 
merge_sorted_block, eos);
                     RETURN_IF_ERROR(_status);
                 }
             }
@@ -159,7 +170,6 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
             }
             _current_merging_streams.clear();
         }
-        DCHECK(false);
         return Status::OK();
     };
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
@@ -242,6 +252,15 @@ Status SpillSortSourceOperatorX::close(RuntimeState* 
state) {
 Status SpillSortSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                            bool* eos) {
     auto& local_state = get_local_state(state);
+    Defer defer {[&]() {
+        if (!local_state._status.ok() || *eos) {
+            local_state._shared_state->close();
+            for (auto& stream : local_state._current_merging_streams) {
+                
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+            }
+            local_state._current_merging_streams.clear();
+        }
+    }};
     if (local_state.Base::_shared_state->enable_spill) {
         local_state.inc_running_big_mem_op_num(state);
     }
@@ -250,13 +269,16 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Bloc
 
     if (local_state.Base::_shared_state->enable_spill && 
local_state._shared_state->is_spilled) {
         if (!local_state._merger) {
-            return local_state.initiate_merge_sort_spill_streams(state);
+            local_state._status = 
local_state.initiate_merge_sort_spill_streams(state);
+            return local_state._status;
         } else {
-            RETURN_IF_ERROR(local_state._merger->get_next(block, eos));
+            local_state._status = local_state._merger->get_next(block, eos);
+            RETURN_IF_ERROR(local_state._status);
         }
     } else {
-        RETURN_IF_ERROR(
-                
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
+        local_state._status =
+                
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos);
+        RETURN_IF_ERROR(local_state._status);
     }
     local_state.reached_limit(block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 73f089f4c1b..3b415892c93 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -231,11 +231,27 @@ void AggSpillPartition::close() {
 }
 
 void PartitionedAggSharedState::close() {
+    // need to use CAS instead of only `if (!is_closed)` statement,
+    // to avoid concurrent entry of close() both pass the if statement
+    bool false_close = false;
+    if (!is_closed.compare_exchange_strong(false_close, true)) {
+        return;
+    }
+    DCHECK(!false_close && is_closed);
     for (auto partition : spill_partitions) {
         partition->close();
     }
+    spill_partitions.clear();
 }
-void SpillSortSharedState::clear() {
+
+void SpillSortSharedState::close() {
+    // need to use CAS instead of only `if (!is_closed)` statement,
+    // to avoid concurrent entry of close() both pass the if statement
+    bool false_close = false;
+    if (!is_closed.compare_exchange_strong(false_close, true)) {
+        return;
+    }
+    DCHECK(!false_close && is_closed);
     for (auto& stream : sorted_streams) {
         (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
     }
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 8c0fb232b1b..a2c858e4c5c 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -398,6 +398,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
     size_t max_partition_index;
     Status sink_status;
     bool is_spilled = false;
+    std::atomic_bool is_closed = false;
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
     size_t get_partition_index(size_t hash_value) const {
@@ -467,11 +468,12 @@ struct SpillSortSharedState : public BasicSharedState,
             LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count;
         }
     }
-    void clear();
+    void close();
 
     SortSharedState* in_mem_shared_state = nullptr;
     bool enable_spill = false;
     bool is_spilled = false;
+    std::atomic_bool is_closed = false;
     Status sink_status;
     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 4bca88ec931..2627b56fe7d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -312,7 +312,7 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
     wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
     if (is_wg_mem_high_water_mark) {
         if (revocable_mem_bytes > min_revocable_mem_bytes) {
-            LOG_EVERY_N(INFO, 5) << "revoke memory, hight water mark";
+            LOG_EVERY_N(INFO, 10) << "revoke memory, hight water mark";
             return true;
         }
         return false;
@@ -332,11 +332,12 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
             mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
         }
 
-        LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: "
-                             << PrettyPrinter::print_bytes(revocable_mem_bytes)
-                             << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
-                             << ", min_revocable_mem_bytes: "
-                             << PrettyPrinter::print_bytes(min_revocable_mem_bytes);
+        LOG_EVERY_N(INFO, 10) << "revoke memory, low water mark, revocable_mem_bytes: "
+                              << PrettyPrinter::print_bytes(revocable_mem_bytes)
+                              << ", mem_limit_of_op: "
+                              << PrettyPrinter::print_bytes(mem_limit_of_op)
+                              << ", min_revocable_mem_bytes: "
+                              << PrettyPrinter::print_bytes(min_revocable_mem_bytes);
         return (revocable_mem_bytes > mem_limit_of_op ||
                 revocable_mem_bytes > min_revocable_mem_bytes);
     } else {
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 1de6c9b8051..20ab10022d1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -580,19 +580,23 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
         Status st = Status::OK();
 
         const bool has_cancel_reason = request->has_cancel_reason();
-        LOG(INFO) << fmt::format("Cancel instance {}, reason: {}", print_id(tid),
-                                 has_cancel_reason
-                                         ? PPlanFragmentCancelReason_Name(request->cancel_reason())
-                                         : "INTERNAL_ERROR");
         if (request->has_fragment_id()) {
             TUniqueId query_id;
             query_id.__set_hi(request->query_id().hi());
             query_id.__set_lo(request->query_id().lo());
+            LOG(INFO) << fmt::format(
+                    "Cancel query {}, reason: {}", print_id(query_id),
+                    has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
+                                      : "INTERNAL_ERROR");
             _exec_env->fragment_mgr()->cancel_fragment(
                     query_id, request->fragment_id(),
                     has_cancel_reason ? request->cancel_reason()
                                       : PPlanFragmentCancelReason::INTERNAL_ERROR);
         } else {
+            LOG(INFO) << fmt::format(
+                    "Cancel instance {}, reason: {}", print_id(tid),
+                    has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
+                                      : "INTERNAL_ERROR");
             _exec_env->fragment_mgr()->cancel_instance(
                     tid, has_cancel_reason ? request->cancel_reason()
                                            : PPlanFragmentCancelReason::INTERNAL_ERROR);
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index 843a9fc9658..f5b6fea096d 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -46,6 +46,16 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
             ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path());
 }
 
+SpillStream::~SpillStream() {
+    bool exists = false;
+    auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
+    if (status.ok() && exists) {
+        auto gc_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), SPILL_GC_DIR_PREFIX,
+                                  std::filesystem::path(spill_dir_).filename().string());
+        (void)io::global_local_filesystem()->rename(spill_dir_, gc_dir);
+    }
+}
+
 Status SpillStream::prepare() {
     writer_ = std::make_unique<SpillWriter>(stream_id_, batch_rows_, data_dir_, spill_dir_);
 
@@ -103,9 +113,9 @@ Status SpillStream::wait_spill() {
     return Status::OK();
 }
 
-Status SpillStream::spill_block(const Block& block, bool eof) {
+Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
     size_t written_bytes = 0;
-    RETURN_IF_ERROR(writer_->write(block, written_bytes));
+    RETURN_IF_ERROR(writer_->write(state, block, written_bytes));
     if (eof) {
         RETURN_IF_ERROR(writer_->close());
         writer_.reset();
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index afec4734d8a..6b5166f2652 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -40,6 +40,8 @@ public:
                 std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                 RuntimeProfile* profile);
 
+    ~SpillStream();
+
     int64_t id() const { return stream_id_; }
 
     SpillDataDir* get_data_dir() const { return data_dir_; }
@@ -51,7 +53,7 @@ public:
 
     Status prepare_spill();
 
-    Status spill_block(const Block& block, bool eof);
+    Status spill_block(RuntimeState* state, const Block& block, bool eof);
 
     void end_spill(const Status& status);
 
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index ecbee7fb858..4abca15082c 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -198,8 +198,6 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
 }
 
 void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
-    stream->close();
-
     auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX,
                               std::filesystem::path(stream->get_spill_dir()).filename().string());
     (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir);
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index 965d36e40c0..cf8c23e6c22 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -23,6 +23,7 @@
 #include "io/fs/local_file_system.h"
 #include "io/fs/local_file_writer.h"
 #include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "vec/spill/spill_stream_manager.h"
 
@@ -36,12 +37,6 @@ Status SpillWriter::open() {
     return Status::OK();
 }
 
-SpillWriter::~SpillWriter() {
-    if (!closed_) {
-        (void)Status::Error<ErrorCode::INTERNAL_ERROR>("spill writer not closed correctly");
-    }
-}
-
 Status SpillWriter::close() {
     if (closed_ || !file_writer_) {
         return Status::OK();
@@ -68,7 +63,7 @@ Status SpillWriter::close() {
     return Status::OK();
 }
 
-Status SpillWriter::write(const Block& block, size_t& written_bytes) {
+Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& written_bytes) {
     written_bytes = 0;
     DCHECK(file_writer_);
     auto rows = block.rows();
@@ -79,7 +74,7 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) {
         auto tmp_block = block.clone_empty();
         const auto& src_data = block.get_columns_with_type_and_name();
 
-        for (size_t row_idx = 0; row_idx < rows;) {
+        for (size_t row_idx = 0; row_idx < rows && !state->is_cancelled();) {
             tmp_block.clear_column_data();
 
             auto& dst_data = tmp_block.get_columns_with_type_and_name();
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 45317e8b8cf..865b7aeb002 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -25,6 +25,7 @@
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
 namespace doris {
+class RuntimeState;
 
 namespace vectorized {
 class SpillDataDir;
@@ -35,13 +36,11 @@ public:
         file_path_ = dir + "/" + std::to_string(file_index_);
     }
 
-    ~SpillWriter();
-
     Status open();
 
     Status close();
 
-    Status write(const Block& block, size_t& written_bytes);
+    Status write(RuntimeState* state, const Block& block, size_t& written_bytes);
 
     int64_t get_id() const { return stream_id_; }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 35601cf4bde..72df816bf62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1726,7 +1726,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20;
     @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
             checker = "checkExternalAggPartitionBits", fuzzy = true)
-    public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks.
+    public int externalAggPartitionBits = 5; // means that the hash table will be partitioned into 32 blocks.
 
     public boolean isEnableJoinSpill() {
         return enableJoinSpill;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to