This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0e262ba0e4e500af10d8a38d76b64476f5c1cb65 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon Apr 8 11:21:09 2024 +0800 [improvement](spill) improve cancel of spill and improve log printing (#33229) * [improvement](spill) improve cancel of spill and improve log printing * fix --- .../exec/partitioned_aggregation_sink_operator.cpp | 41 +++--- .../exec/partitioned_aggregation_sink_operator.h | 9 +- .../partitioned_aggregation_source_operator.cpp | 36 ++++-- .../exec/partitioned_hash_join_probe_operator.cpp | 4 +- .../exec/partitioned_hash_join_sink_operator.cpp | 2 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 144 +++++++++++---------- .../pipeline/exec/spill_sort_source_operator.cpp | 54 +++++--- be/src/pipeline/pipeline_x/dependency.cpp | 18 ++- be/src/pipeline/pipeline_x/dependency.h | 4 +- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 13 +- be/src/service/internal_service.cpp | 12 +- be/src/vec/spill/spill_stream.cpp | 14 +- be/src/vec/spill/spill_stream.h | 4 +- be/src/vec/spill/spill_stream_manager.cpp | 2 - be/src/vec/spill/spill_writer.cpp | 11 +- be/src/vec/spill/spill_writer.h | 5 +- .../java/org/apache/doris/qe/SessionVariable.java | 2 +- 17 files changed, 223 insertions(+), 152 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 3dea330c117..4ea531bade0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -148,9 +148,6 @@ Status PartitionedAggSinkOperatorX::open(RuntimeState* state) { return _agg_sink_operator->open(state); } -Status PartitionedAggSinkOperatorX::close(RuntimeState* state) { - return _agg_sink_operator->close(state); -} Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); @@ -227,8 +224,9 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state) } Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { - LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory" - << ", eos: " << _eos; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << Base::_parent->id() + << " revoke_memory" + << ", eos: " << _eos; RETURN_IF_ERROR(Base::_shared_state->sink_status); if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; @@ -258,28 +256,33 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { [this, &parent, state, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; + LOG(INFO) << "query " << print_id(state->query_id()) + << " execution_context released, maybe query was cancelled."; return Status::Cancelled("Cancelled"); } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_ATTACH_TASK(state); SCOPED_TIMER(Base::_spill_timer); Defer defer {[&]() { - if (!Base::_shared_state->sink_status.ok()) { - LOG(WARNING) - << "agg node " << Base::_parent->id() - << " revoke_memory error: " << Base::_shared_state->sink_status; + if (!_shared_state->sink_status.ok() || state->is_cancelled()) { + if (!_shared_state->sink_status.ok()) { + LOG(WARNING) + << "query " << print_id(state->query_id()) << " agg node " + << Base::_parent->id() + << " revoke_memory error: " << Base::_shared_state->sink_status; + } + _shared_state->close(); } else { - LOG(INFO) << " agg node " << Base::_parent->id() << " revoke_memory finish" - << ", eos: " << _eos; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " + << Base::_parent->id() << " revoke_memory finish" + << ", eos: " << _eos; } - { - if (_eos) { - Base::_dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - Base::_dependency->Dependency::set_ready(); - } + + if (_eos) { + Base::_dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + Base::_dependency->Dependency::set_ready(); } }}; auto* runtime_state = _runtime_state.get(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index d63f272092b..7ec582905d0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -71,7 +71,8 @@ public: std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos( Base::_shared_state->partition_count); auto& iter = Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator; - while (iter != Base::_shared_state->in_mem_shared_state->aggregate_data_container->end()) { + while (iter != Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() && + !state->is_cancelled()) { const auto& key = iter.template get_key<typename HashTableType::key_type>(); auto partition_index = Base::_shared_state->get_partition_index(hash_table.hash(key)); spill_infos[partition_index].keys_.emplace_back(key); @@ -93,7 +94,7 @@ public: ++iter; } auto hash_null_key_data = hash_table.has_null_key_data(); - for (int i = 0; i < Base::_shared_state->partition_count; ++i) { + for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled(); ++i) { auto spill_null_key_data = (hash_null_key_data && i == Base::_shared_state->partition_count - 1); if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) { @@ -160,7 +161,7 @@ public: SCOPED_TIMER(_spill_write_disk_timer); Status status; Defer defer {[&]() { spill_stream->end_spill(status); }}; - status = spill_stream->spill_block(block_, false); + status = spill_stream->spill_block(state, block_, false); return status; }); if (!status.ok()) { @@ -320,8 +321,6 @@ public: Status open(RuntimeState* state) override; - Status close(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 5680b75c87e..a2484cd6db4 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -24,7 +24,6 @@ #include "common/status.h" #include "pipeline/exec/operator.h" #include "util/runtime_profile.h" -#include "vec//utils/util.hpp" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { @@ -123,12 +122,19 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + Defer defer {[&]() { + if (!local_state._status.ok() || *eos) { + local_state._shared_state->close(); + } + }}; + local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._status); if (local_state._shared_state->is_spilled) { - RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state)); + local_state._status = local_state.initiate_merge_spill_partition_agg_data(state); + RETURN_IF_ERROR(local_state._status); /// When `_is_merging` is true means we are reading spilled data and merging the data into hash table. if (local_state._is_merging) { @@ -138,7 +144,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: // not spilled in sink or current partition still has data auto* runtime_state = local_state._runtime_state.get(); - RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, eos)); + local_state._status = _agg_source_operator->get_block(runtime_state, block, eos); + RETURN_IF_ERROR(local_state._status); if (local_state._runtime_state) { auto* source_local_state = local_state._runtime_state->get_local_state(_agg_source_operator->operator_id()); @@ -190,7 +197,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime } _is_merging = true; - LOG(INFO) << "agg node " << _parent->node_id() << " merge spilled agg data"; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id() + << " merge spilled agg data"; RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table()); _dependency->Dependency::block(); @@ -206,7 +214,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime [this, state, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; + LOG(INFO) << "query " << print_id(state->query_id()) + << " execution_context released, maybe query was cancelled."; // FIXME: return status is meaningless? return Status::Cancelled("Cancelled"); } @@ -214,12 +223,17 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_ATTACH_TASK(state); Defer defer {[&]() { - if (!_status.ok()) { - LOG(WARNING) << "agg node " << _parent->node_id() - << " merge spilled agg data error: " << _status; + if (!_status.ok() || state->is_cancelled()) { + if (!_status.ok()) { + LOG(WARNING) << "query " << print_id(state->query_id()) + << " agg node " << _parent->node_id() + << " merge spilled agg data error: " << _status; + } + _shared_state->close(); } else if (_shared_state->spill_partitions.empty()) { - LOG(INFO) << "agg node " << _parent->node_id() - << " merge spilled agg data finish"; + VLOG_DEBUG << "query " << print_id(state->query_id()) + << " agg node " << _parent->node_id() + << " merge spilled agg data finish"; } Base::_shared_state->in_mem_shared_state->aggregate_data_container ->init_once(); @@ -237,7 +251,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime Base::_spill_read_bytes, Base::_spill_read_wait_io_timer); vectorized::Block block; bool eos = false; - while (!eos) { + while (!eos && !state->is_cancelled()) { { SCOPED_TIMER(Base::_spill_recover_time); _status = stream->read_next_block_sync(&block, &eos); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index a8eb926243f..0f837f8bbda 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -195,7 +195,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state if (_spill_status_ok) { auto build_block = mutable_block->to_block(); DCHECK_EQ(mutable_block->rows(), 0); - auto st = build_spilling_stream->spill_block(build_block, false); + auto st = build_spilling_stream->spill_block(state, build_block, false); if (!st.ok()) { std::unique_lock<std::mutex> lock(_spill_lock); _spill_status_ok = false; @@ -262,7 +262,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat auto block = std::move(blocks.back()); blocks.pop_back(); if (_spill_status_ok) { - auto st = spilling_stream->spill_block(block, false); + auto st = spilling_stream->spill_block(state, block, false); if (!st.ok()) { std::unique_lock<std::mutex> lock(_spill_lock); _spill_status_ok = false; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 4fd399464c1..370606b1904 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -231,7 +231,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( if (_spill_status_ok) { auto block = partitioned_block->to_block(); partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty()); - auto st = spilling_stream->spill_block(block, false); + auto st = spilling_stream->spill_block(state(), block, false); if (!st.ok()) { _spill_status_ok = false; std::lock_guard<std::mutex> l(_spill_status_lock); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 523ff2cfaaf..78c5c9f51e9 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -203,8 +203,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { profile()->add_info_string("Spilled", "true"); } - LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory" - << ", eos: " << _eos; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << Base::_parent->id() + << " revoke_memory" + << ", eos: " << _eos; RETURN_IF_ERROR(Base::_shared_state->sink_status); auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( @@ -234,73 +235,78 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { MonotonicStopWatch submit_timer; submit_timer.start(); - status = - ExecEnv::GetInstance() - ->spill_stream_mgr() - ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) - ->submit_func([this, state, &parent, execution_context, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; - return Status::OK(); - } - - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_ATTACH_TASK(state); - Defer defer {[&]() { - if (!_shared_state->sink_status.ok()) { - LOG(WARNING) - << "sort node " << _parent->id() - << " revoke memory error: " << _shared_state->sink_status; - } else { - LOG(INFO) - << "sort node " << _parent->id() << " revoke memory finish"; - } - - _spilling_stream->end_spill(_shared_state->sink_status); - if (!_shared_state->sink_status.ok()) { - _shared_state->clear(); - } - - _spilling_stream.reset(); - if (_eos) { - _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - _dependency->Dependency::set_ready(); - } - }}; - - _shared_state->sink_status = - parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); - RETURN_IF_ERROR(_shared_state->sink_status); - - auto* sink_local_state = _runtime_state->get_sink_local_state(); - update_profile(sink_local_state->profile()); - - bool eos = false; - vectorized::Block block; - while (!eos && !state->is_cancelled()) { - { - SCOPED_TIMER(_spill_merge_sort_timer); - _shared_state->sink_status = - parent._sort_sink_operator->merge_sort_read_for_spill( - _runtime_state.get(), &block, - _shared_state->spill_block_batch_row_count, &eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - { - SCOPED_TIMER(Base::_spill_timer); - _shared_state->sink_status = - _spilling_stream->spill_block(block, eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - block.clear_column_data(); - } - parent._sort_sink_operator->reset(_runtime_state.get()); - - return Status::OK(); - }); + status = ExecEnv::GetInstance() + ->spill_stream_mgr() + ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) + ->submit_func([this, state, &parent, execution_context, submit_timer] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "query " << print_id(state->query_id()) + << " execution_context released, maybe query was cancelled."; + return Status::OK(); + } + + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_ATTACH_TASK(state); + Defer defer {[&]() { + if (!_shared_state->sink_status.ok() || state->is_cancelled()) { + if (!_shared_state->sink_status.ok()) { + LOG(WARNING) << "query " << print_id(state->query_id()) + << " sort node " << _parent->id() + << " revoke memory error: " + << _shared_state->sink_status; + } + _shared_state->close(); + } else { + VLOG_DEBUG << "query " << print_id(state->query_id()) + << " sort node " << _parent->id() + << " revoke memory finish"; + } + + _spilling_stream->end_spill(_shared_state->sink_status); + if (!_shared_state->sink_status.ok()) { + _shared_state->close(); + } + + _spilling_stream.reset(); + if (_eos) { + _dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + _dependency->Dependency::set_ready(); + } + }}; + + _shared_state->sink_status = parent._sort_sink_operator->prepare_for_spill( + _runtime_state.get()); + RETURN_IF_ERROR(_shared_state->sink_status); + + auto* sink_local_state = _runtime_state->get_sink_local_state(); + update_profile(sink_local_state->profile()); + + bool eos = false; + vectorized::Block block; + while (!eos && !state->is_cancelled()) { + { + SCOPED_TIMER(_spill_merge_sort_timer); + _shared_state->sink_status = + parent._sort_sink_operator->merge_sort_read_for_spill( + _runtime_state.get(), &block, + _shared_state->spill_block_batch_row_count, &eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + { + SCOPED_TIMER(Base::_spill_timer); + _shared_state->sink_status = + _spilling_stream->spill_block(state, block, eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + block.clear_column_data(); + } + parent._sort_sink_operator->reset(_runtime_state.get()); + + return Status::OK(); + }); if (!status.ok()) { _spilling_stream->end_spill(status); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index c53c057088c..5edb0daf7fc 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -72,7 +72,8 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const { } Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) { auto& parent = Base::_parent->template cast<Parent>(); - LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data"; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->node_id() + << " merge spill data"; _dependency->Dependency::block(); Status status; @@ -91,7 +92,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto spill_func = [this, state, &parent, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; + LOG(INFO) << "query " << print_id(state->query_id()) + << " execution_context released, maybe query was cancelled."; return Status::OK(); } @@ -99,21 +101,30 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat SCOPED_TIMER(_spill_merge_sort_timer); SCOPED_ATTACH_TASK(state); Defer defer {[&]() { - if (!_status.ok()) { - LOG(WARNING) << "sort node " << _parent->node_id() - << " merge spill data error: " << _status; + if (!_status.ok() || state->is_cancelled()) { + if (!_status.ok()) { + LOG(WARNING) << "query " << print_id(state->query_id()) << " sort node " + << _parent->node_id() << " merge spill data error: " << _status; + } + _shared_state->close(); + for (auto& stream : _current_merging_streams) { + (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); + } + _current_merging_streams.clear(); } else { - LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data finish"; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " + << _parent->node_id() << " merge spill data finish"; } _dependency->Dependency::set_ready(); }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; - while (true) { + while (!state->is_cancelled()) { int max_stream_count = _calc_spill_blocks_to_merge(); - LOG(INFO) << "sort node " << _parent->id() << " merge spill streams, streams count: " - << _shared_state->sorted_streams.size() - << ", curren merge max stream count: " << max_stream_count; + VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->id() + << " merge spill streams, streams count: " + << _shared_state->sorted_streams.size() + << ", curren merge max stream count: " << max_stream_count; { SCOPED_TIMER(Base::_spill_recover_time); _status = _create_intermediate_merger( @@ -150,7 +161,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat _status = _merger->get_next(&merge_sorted_block, &eos); } RETURN_IF_ERROR(_status); - _status = tmp_stream->spill_block(merge_sorted_block, eos); + _status = tmp_stream->spill_block(state, merge_sorted_block, eos); RETURN_IF_ERROR(_status); } } @@ -159,7 +170,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } _current_merging_streams.clear(); } - DCHECK(false); return Status::OK(); }; return ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( @@ -242,6 +252,15 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) { Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + Defer defer {[&]() { + if (!local_state._status.ok() || *eos) { + local_state._shared_state->close(); + for (auto& stream : local_state._current_merging_streams) { + (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); + } + local_state._current_merging_streams.clear(); + } + }}; if (local_state.Base::_shared_state->enable_spill) { local_state.inc_running_big_mem_op_num(state); } @@ -250,13 +269,16 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc if (local_state.Base::_shared_state->enable_spill && local_state._shared_state->is_spilled) { if (!local_state._merger) { - return local_state.initiate_merge_sort_spill_streams(state); + local_state._status = local_state.initiate_merge_sort_spill_streams(state); + return local_state._status; } else { - RETURN_IF_ERROR(local_state._merger->get_next(block, eos)); + local_state._status = local_state._merger->get_next(block, eos); + RETURN_IF_ERROR(local_state._status); } } else { - RETURN_IF_ERROR( - _sort_source_operator->get_block(local_state._runtime_state.get(), block, eos)); + local_state._status = + _sort_source_operator->get_block(local_state._runtime_state.get(), block, eos); + RETURN_IF_ERROR(local_state._status); } local_state.reached_limit(block, eos); return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 73f089f4c1b..3b415892c93 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -231,11 +231,27 @@ void AggSpillPartition::close() { } void PartitionedAggSharedState::close() { + // need to use CAS instead of only `if (!is_closed)` statement, + // to avoid concurrent entry of close() both pass the if statement + bool false_close = false; + if (!is_closed.compare_exchange_strong(false_close, true)) { + return; + } + DCHECK(!false_close && is_closed); for (auto partition : spill_partitions) { partition->close(); } + spill_partitions.clear(); } -void SpillSortSharedState::clear() { + +void SpillSortSharedState::close() { + // need to use CAS instead of only `if (!is_closed)` statement, + // to avoid concurrent entry of close() both pass the if statement + bool false_close = false; + if (!is_closed.compare_exchange_strong(false_close, true)) { + return; + } + DCHECK(!false_close && is_closed); for (auto& stream : sorted_streams) { (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 8c0fb232b1b..a2c858e4c5c 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -398,6 +398,7 @@ struct PartitionedAggSharedState : public BasicSharedState, size_t max_partition_index; Status sink_status; bool is_spilled = false; + std::atomic_bool is_closed = false; std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions; size_t get_partition_index(size_t hash_value) const { @@ -467,11 +468,12 @@ struct SpillSortSharedState : public BasicSharedState, LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count; } } - void clear(); + void close(); SortSharedState* in_mem_shared_state = nullptr; bool enable_spill = false; bool is_spilled = false; + std::atomic_bool is_closed = false; Status sink_status; std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 4bca88ec931..2627b56fe7d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -312,7 +312,7 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_ wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark); if (is_wg_mem_high_water_mark) { if (revocable_mem_bytes > min_revocable_mem_bytes) { - LOG_EVERY_N(INFO, 5) << "revoke memory, hight water mark"; + LOG_EVERY_N(INFO, 10) << "revoke memory, hight water mark"; return true; } return false; @@ -332,11 +332,12 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_ mem_limit_of_op = query_weighted_limit / big_memory_operator_num; } - LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: " - << PrettyPrinter::print_bytes(revocable_mem_bytes) - << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op) - << ", min_revocable_mem_bytes: " - << PrettyPrinter::print_bytes(min_revocable_mem_bytes); + LOG_EVERY_N(INFO, 10) << "revoke memory, low water mark, revocable_mem_bytes: " + << PrettyPrinter::print_bytes(revocable_mem_bytes) + << ", mem_limit_of_op: " + << PrettyPrinter::print_bytes(mem_limit_of_op) + << ", min_revocable_mem_bytes: " + << PrettyPrinter::print_bytes(min_revocable_mem_bytes); return (revocable_mem_bytes > mem_limit_of_op || revocable_mem_bytes > min_revocable_mem_bytes); } else { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1de6c9b8051..20ab10022d1 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -580,19 +580,23 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* Status st = Status::OK(); const bool has_cancel_reason = request->has_cancel_reason(); - LOG(INFO) << fmt::format("Cancel instance {}, reason: {}", print_id(tid), - has_cancel_reason - ? PPlanFragmentCancelReason_Name(request->cancel_reason()) - : "INTERNAL_ERROR"); if (request->has_fragment_id()) { TUniqueId query_id; query_id.__set_hi(request->query_id().hi()); query_id.__set_lo(request->query_id().lo()); + LOG(INFO) << fmt::format( + "Cancel query {}, reason: {}", print_id(query_id), + has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason()) + : "INTERNAL_ERROR"); _exec_env->fragment_mgr()->cancel_fragment( query_id, request->fragment_id(), has_cancel_reason ? request->cancel_reason() : PPlanFragmentCancelReason::INTERNAL_ERROR); } else { + LOG(INFO) << fmt::format( + "Cancel instance {}, reason: {}", print_id(tid), + has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason()) + : "INTERNAL_ERROR"); _exec_env->fragment_mgr()->cancel_instance( tid, has_cancel_reason ? request->cancel_reason() : PPlanFragmentCancelReason::INTERNAL_ERROR); diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index 843a9fc9658..f5b6fea096d 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -46,6 +46,16 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path()); } +SpillStream::~SpillStream() { + bool exists = false; + auto status = io::global_local_filesystem()->exists(spill_dir_, &exists); + if (status.ok() && exists) { + auto gc_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), SPILL_GC_DIR_PREFIX, + std::filesystem::path(spill_dir_).filename().string()); + (void)io::global_local_filesystem()->rename(spill_dir_, gc_dir); + } +} + Status SpillStream::prepare() { writer_ = std::make_unique<SpillWriter>(stream_id_, batch_rows_, data_dir_, spill_dir_); @@ -103,9 +113,9 @@ Status SpillStream::wait_spill() { return Status::OK(); } -Status SpillStream::spill_block(const Block& block, bool eof) { +Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) { size_t written_bytes = 0; - RETURN_IF_ERROR(writer_->write(block, written_bytes)); + RETURN_IF_ERROR(writer_->write(state, block, written_bytes)); if (eof) { RETURN_IF_ERROR(writer_->close()); writer_.reset(); diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index afec4734d8a..6b5166f2652 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -40,6 +40,8 @@ public: std::string spill_dir, size_t batch_rows, size_t batch_bytes, RuntimeProfile* profile); + ~SpillStream(); + int64_t id() const { return stream_id_; } SpillDataDir* get_data_dir() const { return data_dir_; } @@ -51,7 +53,7 @@ public: Status prepare_spill(); - Status spill_block(const Block& block, bool eof); + Status spill_block(RuntimeState* state, const Block& block, bool eof); void end_spill(const Status& status); diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index ecbee7fb858..4abca15082c 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -198,8 +198,6 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea } void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) { - stream->close(); - auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX, std::filesystem::path(stream->get_spill_dir()).filename().string()); (void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir); diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index 965d36e40c0..cf8c23e6c22 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -23,6 +23,7 @@ #include "io/fs/local_file_system.h" #include "io/fs/local_file_writer.h" #include "runtime/exec_env.h" +#include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "vec/spill/spill_stream_manager.h" @@ -36,12 +37,6 @@ Status SpillWriter::open() { return Status::OK(); } -SpillWriter::~SpillWriter() { - if (!closed_) { - (void)Status::Error<ErrorCode::INTERNAL_ERROR>("spill writer not closed correctly"); - } -} - Status SpillWriter::close() { if (closed_ || !file_writer_) { return Status::OK(); @@ -68,7 +63,7 @@ Status SpillWriter::close() { return Status::OK(); } -Status SpillWriter::write(const Block& block, size_t& written_bytes) { +Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& written_bytes) { written_bytes = 0; DCHECK(file_writer_); auto rows = block.rows(); @@ -79,7 +74,7 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) { auto tmp_block = block.clone_empty(); const auto& src_data = block.get_columns_with_type_and_name(); - for (size_t row_idx = 0; row_idx < rows;) { + for (size_t row_idx = 0; row_idx < rows && !state->is_cancelled();) { tmp_block.clear_column_data(); auto& dst_data = tmp_block.get_columns_with_type_and_name(); diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index 45317e8b8cf..865b7aeb002 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -25,6 +25,7 @@ #include "util/runtime_profile.h" #include "vec/core/block.h" namespace doris { +class RuntimeState; namespace vectorized { class SpillDataDir; @@ -35,13 +36,11 @@ public: file_path_ = dir + "/" + std::to_string(file_index_); } - ~SpillWriter(); - Status open(); Status close(); - Status write(const Block& block, size_t& written_bytes); + Status write(RuntimeState* state, const Block& block, size_t& written_bytes); int64_t get_id() const { return stream_id_; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 35601cf4bde..72df816bf62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1726,7 +1726,7 @@ public class SessionVariable implements Serializable, Writable { public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20; @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS, checker = "checkExternalAggPartitionBits", fuzzy = true) - public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks. + public int externalAggPartitionBits = 5; // means that the hash table will be partitioned into 32 blocks. public boolean isEnableJoinSpill() { return enableJoinSpill; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org