This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new bba1119f741 Opt debug log (#44331)
bba1119f741 is described below

commit bba1119f741fdbb5b6c02a42310f8607c5741e24
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Wed Nov 20 15:34:32 2024 +0800

    Opt debug log (#44331)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   2 +-
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  |  13 ++-
 .../exec/partitioned_aggregation_sink_operator.cpp |   6 +-
 .../exec/partitioned_aggregation_sink_operator.h   |   2 +-
 .../partitioned_aggregation_source_operator.cpp    |   2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  28 +++---
 .../exec/partitioned_hash_join_sink_operator.cpp   |  13 +--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   6 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   |   8 +-
 be/src/pipeline/exec/spill_utils.h                 |   4 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   2 +-
 be/src/pipeline/pipeline_task.cpp                  |  39 ++++----
 be/src/runtime/fragment_mgr.cpp                    |   4 +-
 .../workload_group/workload_group_manager.cpp      | 101 +++++++++++++--------
 .../workload_group/workload_group_manager.h        |   4 +-
 .../workload_management/workload_action.cpp        |   2 +-
 16 files changed, 130 insertions(+), 106 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 52c55ccabbc..0e26570d9d4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -106,7 +106,7 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
     std::lock_guard<std::mutex> lock(_finished_channels_mutex);
     if (_finished_channels.contains(channel_id)) {
-        LOG(WARNING) << "query: " << print_id(_state->query_id())
+        LOG(WARNING) << "Query: " << print_id(_state->query_id())
                      << ", on_channel_finished on already finished channel: " << channel_id;
         return;
     } else {
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index f1e399a3289..e1484c64614 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -126,7 +126,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
     const auto end = _multi_cast_blocks.end();
     if (pos_to_pull == end) {
         _block_reading(sender_idx);
-        VLOG_DEBUG << "query: " << print_id(state->query_id())
+        VLOG_DEBUG << "Query: " << print_id(state->query_id())
                    << ", pos_to_pull end: " << (void*)(_write_dependency);
         *eos = _eos;
         return Status::OK();
@@ -151,8 +151,6 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
         _cumulative_mem_size.fetch_sub(mem_size);
         _multi_cast_blocks.pop_front();
         _write_dependency->set_ready();
-        VLOG_DEBUG << "**** query: " << print_id(state->query_id())
-                   << ", set ready: " << (void*)(_write_dependency);
     } else {
         _copy_block(block, *un_finish_copy);
     }
@@ -175,6 +173,11 @@ void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_f
 }
 Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool* triggered) {
+    if (!state->enable_spill() && !state->enable_force_spill()) {
+        *triggered = false;
+        return Status::OK();
+    }
+
     vectorized::SpillStreamSPtr spill_stream;
     *triggered = false;
     if (_cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes &&
@@ -245,7 +248,7 @@ Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state,
             RETURN_IF_ERROR(spill_stream->spill_block(state, block, false));
         }
-
VLOG_DEBUG << "query: " << print_id(state->query_id()) << " multi cast write " + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " multi cast write " << blocks_count << " blocks"; return spill_stream->spill_eof(); }; @@ -256,7 +259,7 @@ Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state, _write_dependency->set_ready(); if (!status.ok()) { - LOG(WARNING) << "query: " << query_id + LOG(WARNING) << "Query: " << query_id << " multi cast write failed: " << status.to_string() << ", dependency: " << (void*)_spill_dependency.get(); } else { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index de2f3b29d36..bc3c1fccba5 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -268,7 +268,7 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo Status PartitionedAggSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { const auto size_to_revoke = _parent->revocable_mem_size(state); - VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " + VLOG_DEBUG << "Query " << print_id(state->query_id()) << " agg node " << Base::_parent->node_id() << " revoke_memory, size: " << _parent->revocable_mem_size(state) << ", eos: " << _eos; @@ -318,13 +318,13 @@ Status PartitionedAggSinkLocalState::revoke_memory( Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " agg node " + LOG(WARNING) << "Query " << print_id(query_id) << " agg node " << Base::_parent->node_id() << " revoke_memory error: " << status; } _shared_state->close(); } else { - VLOG_DEBUG << "query " << print_id(query_id) << " agg node " + VLOG_DEBUG << "Query " << print_id(query_id) << " agg node " << Base::_parent->node_id() << " revoke_memory finish, size: " << _parent->revocable_mem_size(state) << ", eos: " << _eos; } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 8b7836134fa..2c77ed15436 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -83,7 +83,7 @@ public: std::max<size_t>(4096, vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM * total_rows / size_to_revoke_)); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows << ", size_to_revoke: " << size_to_revoke; size_t row_count = 0; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 027e726e358..046f8df44ae 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -249,7 +249,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " agg node " + LOG(WARNING) << "Query " << print_id(query_id) << " agg node " << _parent->node_id() << " recover agg data error: " << 
status; } _shared_state->close(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index bdba90aac37..1e3c1d18a71 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -237,7 +237,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( } COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size)); - VLOG_DEBUG << "query: " << print_id(query_id) + VLOG_DEBUG << "Query: " << print_id(query_id) << " hash probe revoke done, node: " << p.node_id() << ", task: " << state->task_id(); return Status::OK(); @@ -285,7 +285,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index << " recover_build_blocks_from_disk"; auto& spilled_stream = _shared_state->spilled_streams[partition_index]; @@ -301,7 +301,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim SCOPED_TIMER(_recovery_build_timer); bool eos = false; - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index << ", recoverying build data"; Status status; @@ -348,7 +348,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim if (eos) { ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); _shared_state->spilled_streams[partition_index].reset(); - VLOG_DEBUG << "query: " << print_id(state->query_id()) + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index; } @@ -379,7 +379,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim auto* pipeline_task = state->get_task(); if (pipeline_task) { auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>(); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << p.node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << p.node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index << ", dependency: " << _dependency << ", task debug_string: " << pipeline_task->debug_string(); @@ -396,7 +396,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim auto spill_runnable = std::make_shared<SpillRecoverRunnable>( state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), exception_catch_func); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index << " recover_build_blocks_from_disk submit func"; return spill_io_pool->submit(std::move(spill_runnable)); @@ -466,7 +466,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim } } if (eos) { - VLOG_DEBUG << "query: " << print_id(query_id) + VLOG_DEBUG << "Query: " << print_id(query_id) << ", recovery probe data done: " << spilled_stream->get_spill_dir(); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); @@ -683,7 +683,7 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( }); RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true)); - VLOG_DEBUG << "query: " << print_id(state->query_id()) + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", internal build operator finished, node id: " << node_id() << ", task id: " << state->task_id() << ", partition: " << local_state._partition_cursor << "rows: " << block.rows() @@ -744,7 +744,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, if (!has_data) { vectorized::Block block; RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true)); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id() << ", task: " << state->task_id() << "partition: " << partition_index << " has no data to recovery"; break; @@ -765,7 +765,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, *eos = false; if (in_mem_eos) { - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id() << ", task: " << state->task_id() << ", partition: " << local_state._partition_cursor; local_state._partition_cursor++; @@ -858,11 +858,11 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta Status PartitionedHashJoinProbeOperatorX::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { auto& local_state = get_local_state(state); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos; if (local_state._child_eos) { - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos << ", will not revoke size: " << revocable_mem_size(state); return Status::OK(); @@ -878,7 +878,7 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory( Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() << ", task: " << state->task_id(); RETURN_IF_ERROR(local_state.spill_probe_blocks(state)); @@ -915,7 +915,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori #ifndef NDEBUG Defer eos_check_defer([&] { if (*eos) { - LOG(INFO) << "query: " << print_id(state->query_id()) + LOG(INFO) << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() << ", task: " << state->task_id() << ", eos with child eos: " << local_state._child_eos << ", 
need spill: " << need_to_spill; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 6f2f7c8bc15..cabcfd7d450 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -291,7 +291,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { SCOPED_TIMER(_spill_total_timer); - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << state->task_id() + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task: " << state->task_id() << " hash join sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; DCHECK_EQ(_spilling_task_count, 0); @@ -317,7 +317,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( Status status; if (_child_eos) { - VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink " + VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << ", hash join sink " << _parent->node_id() << " set_ready_to_read" << ", task id: " << state->task_id(); std::for_each(_shared_state->partitioned_build_blocks.begin(), @@ -403,7 +403,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( } if (_child_eos) { - VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink " + VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", hash join sink " << _parent->node_id() << " set_ready_to_read" << ", task id: " << state->task_id(); std::for_each(_shared_state->partitioned_build_blocks.begin(), @@ -493,9 +493,6 @@ Status PartitionedHashJoinSinkLocalState::_spill_to_disk( status = spilling_stream->spill_block(state(), block, false); } - VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " << _state->task_id() - << ", join sink " << _parent->node_id() << " revoke done"; - return status; } @@ -597,7 +594,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B const auto need_to_spill = local_state._shared_state->need_to_spill; if (rows == 0) { if (eos) { - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink " + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash join sink " << node_id() << " sink eos, set_ready_to_read" << ", task id: " << state->task_id() << ", need spill: " << need_to_spill; @@ -655,7 +652,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B local_state._shared_state->inner_runtime_state.get(), in_block, eos)); if (eos) { - VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink " + VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash join sink " << node_id() << " sink eos, set_ready_to_read" << ", task id: " << state->task_id(); local_state._dependency->set_ready_to_read(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 83c7ccfc1a3..cd40e6a9ded 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -207,7 +207,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, profile()->add_info_string("Spilled", "true"); } - VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " + VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node 
" << Base::_parent->node_id() << " revoke_memory" << ", eos: " << _eos; if (!_shared_state->_spill_status.ok()) { @@ -235,12 +235,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " sort node " + LOG(WARNING) << "Query " << print_id(query_id) << " sort node " << _parent->node_id() << " revoke memory error: " << status; } _shared_state->close(); } else { - VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() + VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() << " revoke memory finish"; } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index b0b5ebbcbd7..447c306c9ba 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -73,7 +73,7 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const { } Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) { auto& parent = Base::_parent->template cast<Parent>(); - VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->node_id() + VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << _parent->node_id() << " merge spill data"; _spill_dependency->Dependency::block(); @@ -85,7 +85,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat Defer defer {[&]() { if (!status.ok() || state->is_cancelled()) { if (!status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " sort node " + LOG(WARNING) << "Query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill data error: " << status; } _shared_state->close(); @@ -94,7 +94,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } _current_merging_streams.clear(); } else { - VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() + VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill data finish"; } }}; @@ -102,7 +102,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat vectorized::SpillStreamSPtr tmp_stream; while (!state->is_cancelled()) { int max_stream_count = _calc_spill_blocks_to_merge(); - VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() + VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill streams, streams count: " << _shared_state->sorted_streams.size() << ", curren merge max stream count: " << max_stream_count; diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index bf877382129..9986031ccc3 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -49,11 +49,11 @@ struct SpillContext { ~SpillContext() { LOG_IF(WARNING, running_tasks_count.load() != 0) - << "query: " << print_id(query_id) + << "Query: " << print_id(query_id) << " not all spill tasks finished, remaining tasks: " << running_tasks_count.load(); LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0) - << "query: " << print_id(query_id) + << "Query: " << print_id(query_id) << " not all spill tasks(non sink tasks) finished, remaining tasks: " << _running_non_sink_tasks_count.load(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp index 944374b66e2..74008ccd527 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1843,7 +1843,7 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { if (task->is_running() || task->is_revoking()) { - LOG_EVERY_N(INFO, 50) << "query: " << print_id(_query_id) + LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id) << " is running, task: " << (void*)task.get() << ", task->is_revoking(): " << task->is_revoking() << ", " << task->is_running(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 69a1c490911..cbc0f491dc2 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -300,11 +300,6 @@ bool PipelineTask::_is_blocked() { } // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators. if (!_operators[i]->need_more_input_data(_state)) { - // if (VLOG_DEBUG_IS_ON) { - // VLOG_DEBUG << "query: " << print_id(_state->query_id()) - // << ", task id: " << _index << ", operator " << i - // << " not need_more_input_data"; - // } break; } } @@ -408,7 +403,7 @@ Status PipelineTask::execute(bool* eos) { // _state->get_query_ctx()->update_low_memory_mode(); if (_pending_block) [[unlikely]] { - LOG(INFO) << "query: " << print_id(query_id) + LOG(INFO) << "Query: " << print_id(query_id) << " has pending block, size: " << _pending_block->allocated_bytes(); _block = std::move(_pending_block); block = _block.get(); @@ -432,16 +427,17 @@ Status PipelineTask::execute(bool* eos) { COUNTER_UPDATE(_memory_reserve_times, 1); if (!st.ok()) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); - LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: " - << PrettyPrinter::print(reserve_size, TUnit::BYTES) - << ", sink name: " << _sink->get_name() - << ", node id: " << _sink->node_id() - << ", task id: " << _state->task_id() - << ", failed: " << st.to_string() - << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to reserve: " + << PrettyPrinter::print(reserve_size, TUnit::BYTES) + << ", sink name: " << _sink->get_name() + << ", node id: " << _sink->node_id() + << ", task id: " << _state->task_id() + << ", failed: " << st.to_string() + << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); _state->get_query_ctx()->update_paused_reason(st); _state->get_query_ctx()->set_low_memory_mode(); + _state->get_query_ctx()->set_memory_sufficient(false); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this(), reserve_size); continue; @@ -462,15 +458,16 @@ Status PipelineTask::execute(bool* eos) { status = thread_context()->try_reserve_memory(sink_reserve_size); if (!status.ok()) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); - LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: " - << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES) - << ", sink name: " << _sink->get_name() - << ", node id: " << _sink->node_id() - << ", task id: " << _state->task_id() - << ", failed: " << status.to_string() - << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to reserve: " + << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES) + << ", 
sink name: " << _sink->get_name() + << ", node id: " << _sink->node_id() + << ", task id: " << _state->task_id() + << ", failed: " << status.to_string() + << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); _state->get_query_ctx()->update_paused_reason(status); _state->get_query_ctx()->set_low_memory_mode(); + _state->get_query_ctx()->set_memory_sufficient(false); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this(), sink_reserve_size); DCHECK_EQ(_pending_block.get(), nullptr); @@ -617,7 +614,7 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context)); } else if (spill_context) { spill_context->on_task_finished(); - LOG(INFO) << "query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) + LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) << " has not enough data to revoke: " << revocable_size; } return Status::OK(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 18aacb452a6..113b08c5a9f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -755,11 +755,11 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, QuerySource query_source, const FinishCallback& cb) { - VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is " + VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TExecPlanFragmentParams debug string is too long and glog // will truncate the log line, so print query options seperately for debuggin purpose - VLOG_ROW << "query: " << print_id(params.query_id) << "query options is " + VLOG_ROW << "Query: " << print_id(params.query_id) << "query options is " << apache::thrift::ThriftDebugString(params.query_options).c_str(); std::shared_ptr<QueryContext> query_ctx; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 8d82817b3b0..b22eb21d31d 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -264,6 +264,8 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que } } +constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60; + /** * Strategy 1: A revocable query should not have any running task(PipelineTask). * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit @@ -281,13 +283,15 @@ void WorkloadGroupMgr::handle_paused_queries() { } } } - const int64_t TIMEOUT_IN_QUEUE = 1000L * 3; + std::unique_lock<std::mutex> lock(_paused_queries_lock); bool has_revoked_from_other_group = false; for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { auto& queries_list = it->second; const auto& wg = it->first; + LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); + bool is_low_watermark = false; bool is_high_watermark = false; wg->check_mem_used(&is_low_watermark, &is_high_watermark); @@ -302,10 +306,11 @@ void WorkloadGroupMgr::handle_paused_queries() { // The query is finished during in paused list. 
if (query_ctx == nullptr) { query_it = queries_list.erase(query_it); + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; continue; } if (query_ctx->is_cancelled()) { - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " was canceled, remove from paused list"; query_it = queries_list.erase(query_it); continue; @@ -314,8 +319,9 @@ void WorkloadGroupMgr::handle_paused_queries() { if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { // Streamload, kafka load, group commit will never have query memory exceeded error because // their query limit is very large. - bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, - query_ctx->paused_reason()); + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); if (!spill_res) { ++query_it; continue; @@ -331,7 +337,7 @@ void WorkloadGroupMgr::handle_paused_queries() { // the wg is converted to soft limit. // So that should resume the query. LOG(WARNING) - << "query: " << print_id(query_ctx->query_id()) + << "Query: " << print_id(query_ctx->query_id()) << " reserve memory failed because exceed workload group memlimit, it " "should not happen, resume it again. paused reason: " << query_ctx->paused_reason(); @@ -346,7 +352,7 @@ void WorkloadGroupMgr::handle_paused_queries() { query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { query_ctx->set_mem_limit(query_ctx->expected_mem_limit()); query_ctx->set_memory_sufficient(true); - LOG(INFO) << "workload group memory reserve failed because " + LOG(INFO) << "Workload group memory reserve failed because " << query_ctx->debug_string() << " reserve size " << PrettyPrinter::print_bytes(query_it->reserve_size_) << " is too large, set hard limit to " @@ -368,8 +374,9 @@ void WorkloadGroupMgr::handle_paused_queries() { if (!has_changed_hard_limit) { update_queries_limit_(wg, true); has_changed_hard_limit = true; - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) - << " reserve memory failed due to workload group memory exceed, " + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " "should set the workload group work in memory insufficent mode, " "so that other query will reduce their memory. wg: " << wg->debug_string(); @@ -380,6 +387,7 @@ void WorkloadGroupMgr::handle_paused_queries() { // not encourage not enable slot memory. // TODO should kill the query that exceed limit. bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); if (!spill_res) { ++query_it; @@ -391,9 +399,9 @@ void WorkloadGroupMgr::handle_paused_queries() { } else { // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, // and then set wg's flag, other query may not free memory very quickly. - if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) { + if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) { // set wg's memory to insufficent, then add it back to task scheduler to run. 
- LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " will be resume."; query_ctx->set_memory_sufficient(true); query_it = queries_list.erase(query_it); @@ -441,7 +449,8 @@ void WorkloadGroupMgr::handle_paused_queries() { continue; } else { bool spill_res = handle_single_query_( - query_ctx, query_it->reserve_size_, query_ctx->paused_reason()); + query_ctx, query_it->reserve_size_, query_it->elapsed_time(), + query_ctx->paused_reason()); if (spill_res) { query_it = queries_list.erase(query_it); continue; @@ -461,7 +470,7 @@ void WorkloadGroupMgr::handle_paused_queries() { if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < 0.001 && query_it->cache_ratio_ > 0.001) { - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " will be resume after cache adjust."; query_ctx->set_memory_sufficient(true); query_it = queries_list.erase(query_it); @@ -613,15 +622,16 @@ int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_fre // If the query could release some memory, for example, spill disk, then the return value is true. // If the query could not release memory, then cancel the query, the return value is true. // If the query is not ready to do these tasks, it means just wait, then return value is false. -bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_ctx, - size_t size_to_reserve, Status paused_reason) { +bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx, + size_t size_to_reserve, int64_t time_in_queue, + Status paused_reason) { size_t revocable_size = 0; size_t memory_usage = 0; bool has_running_task = false; const auto query_id = print_id(query_ctx->query_id()); query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); if (has_running_task) { - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " is paused, but still has running task, skip it."; return false; } @@ -633,14 +643,14 @@ bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_ // During waiting time, another operator in the query may finished and release // many memory and we could run. if ((memory_usage + size_to_reserve) < limit) { - LOG(INFO) << "query: " << query_id << ", usage(" << memory_usage << " + " + LOG(INFO) << "Query: " << query_id << ", usage(" << memory_usage << " + " << size_to_reserve << ") less than limit(" << limit << "), resume it."; query_ctx->set_memory_sufficient(true); return true; - } else { + } else if (time_in_queue >= TIMEOUT_IN_QUEUE_LIMIT) { // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic auto msg1 = fmt::format( - "query {} reserve memory failed, but could not find memory that could " + "Query {} reserve memory failed, but could not find memory that could " "release or spill to disk. 
Query memory usage: {}, limit: {}, process " "memory info: {}" ", wg info: {}.", @@ -657,37 +667,54 @@ bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_ MemTrackerLimiter::Type::LOAD)); LOG(INFO) << msg2; query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); + } else { + return false; + } + } else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + if (!query_ctx->workload_group()->exceed_limit()) { + LOG(INFO) << "Query: " << query_id + << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it."; + query_ctx->set_memory_sufficient(true); + return true; + } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) { + LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: " + << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( + "The query({}) reserved memory failed because workload group limit " + "exceeded, and there is no cache now. And could not find task to spill. " + "Maybe you should set the workload group's limit to a lower value.", + query_id)); + } else { + return false; } } else { // Should not consider about process memory. For example, the query's limit is 100g, workload // group's memlimit is 10g, process memory is 20g. The query reserve will always failed in wg // limit, and process is always have memory, so that it will resume and failed reserve again. - /* if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) { - LOG(INFO) << "query: " << query_id + LOG(INFO) << "Query: " << query_id << ", process limit not exceeded now, resume this query" << ", process memory info: " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); query_ctx->set_memory_sufficient(true); return true; + } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) { + LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: " + << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( + "The query({}) reserved memory failed because process limit exceeded, " + "and " + "there is no cache now. And could not find task to spill. Maybe you " + "should " + "set " + "the workload group's limit to a lower value.", + query_id)); + } else { + return false; } - - LOG(INFO) << "query: " << query_id << ", process limit exceeded, info: " - << GlobalMemoryArbitrator::process_memory_used_details_str() - << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "The query({}) reserved memory failed because process limit exceeded, and " - "there is no cache now. And could not find task to spill. Maybe you should " - "set " - "the workload group's limit to a lower value.", - query_id)); - */ - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "The query({}) reserved memory failed and could not find task to spill. 
Maybe "
-                "you should "
-                "set the query's memlimit or workload group's limit to a lower value.",
-                query_id));
         }
     } else {
         SCOPED_ATTACH_TASK(query_ctx.get());
@@ -759,7 +786,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
     if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
         if (total_slot_count < 1) {
             LOG(WARNING)
-                    << "query " << print_id(query_ctx->query_id())
+                    << "Query " << print_id(query_ctx->query_id())
                     << " enabled hard limit, but the slot count < 1, could not take affect";
         } else {
             // If the query enable hard limit, then not use weighted info any more, just use the settings limit.
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 065528c66ec..9e6ac17b5dc 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -110,8 +110,8 @@ private:
                                              RuntimeProfile* profile);
     int64_t flush_memtable_from_current_group_(std::shared_ptr<QueryContext> requestor,
                                                WorkloadGroupPtr wg, int64_t need_free_mem);
-    bool handle_single_query_(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve,
-                              Status paused_reason);
+    bool handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx,
+                              size_t size_to_reserve, int64_t time_in_queue, Status paused_reason);
     int64_t revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor,
                                             bool hard_limit, int64_t need_free_mem);
     int64_t revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor,
diff --git a/be/src/runtime/workload_management/workload_action.cpp b/be/src/runtime/workload_management/workload_action.cpp
index 8e6e3b19e2c..895269870ea 100644
--- a/be/src/runtime/workload_management/workload_action.cpp
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -23,7 +23,7 @@ namespace doris {
 void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
     std::stringstream msg;
-    msg << "query " << query_info->query_id
+    msg << "Query " << query_info->query_id
         << " cancelled by workload policy: " << query_info->policy_name
         << ", id:" << query_info->policy_id;
     std::string msg_str = msg.str();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org