This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b864aa7aa23 [fix](pipeline) Fix query hang up if limited rows is
reached (#35513) (#35746)
b864aa7aa23 is described below
commit b864aa7aa2349a8411425e8d878493d363f15d93
Author: Gabriel <[email protected]>
AuthorDate: Fri May 31 22:50:57 2024 +0800
[fix](pipeline) Fix query hang up if limited rows is reached (#35513)
(#35746)
Follow-up for #35466.
We should ensure that closed tasks do not block other tasks.
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 48 ++++++++++++++--------
be/src/pipeline/exec/exchange_sink_buffer.h | 9 ++--
be/src/pipeline/exec/exchange_sink_operator.cpp | 9 ++--
be/src/pipeline/exec/exchange_sink_operator.h | 3 ++
be/src/pipeline/pipeline_fragment_context.h | 2 +
.../local_exchange_source_operator.cpp | 8 ++++
.../pipeline_x/local_exchange/local_exchanger.cpp | 25 +++++++++++
.../pipeline_x/local_exchange/local_exchanger.h | 4 ++
be/src/pipeline/pipeline_x/operator.h | 8 ++++
.../pipeline_x/pipeline_x_fragment_context.h | 7 ++++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 11 +++--
be/src/pipeline/pipeline_x/pipeline_x_task.h | 6 +++
be/src/runtime/fragment_mgr.cpp | 2 +
13 files changed, 114 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7f46bfcf353..39a6a59bd49 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -85,7 +85,8 @@ namespace pipeline {
template <typename Parent>
ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id,
- int send_id, int be_number,
RuntimeState* state)
+ int send_id, int be_number,
RuntimeState* state,
+ ExchangeSinkLocalState* parent)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
@@ -94,10 +95,8 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId
query_id, PlanNodeId de
_sender_id(send_id),
_be_number(be_number),
_state(state),
- _context(state->get_query_ctx()) {}
-
-template <typename Parent>
-ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
+ _context(state->get_query_ctx()),
+ _parent(parent) {}
template <typename Parent>
void ExchangeSinkBuffer<Parent>::close() {
@@ -251,8 +250,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
broadcast_q = _instance_to_broadcast_package_queue[id];
if (_is_finishing) {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+ _turn_off_channel(id);
return Status::OK();
}
@@ -410,8 +408,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
}
broadcast_q.pop();
} else {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+ _turn_off_channel(id);
}
return Status::OK();
@@ -443,10 +440,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[id]);
- if (!_rpc_channel_is_idle[id]) {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
- }
+ _turn_off_channel(id);
}
}
@@ -454,17 +448,19 @@ template <typename Parent>
void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string&
err) {
_is_finishing = true;
_context->cancel(err, Status::Cancelled(err));
- _ended(id);
+ if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+ std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[id]);
+ _turn_off_channel(id, true);
+ } else {
+ _ended(id);
+ }
}
template <typename Parent>
void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
- if (!_rpc_channel_is_idle[id]) {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
- }
+ _turn_off_channel(id, std::is_same_v<ExchangeSinkLocalState, Parent> ?
true : false);
std::queue<BroadcastTransmitInfo<Parent>,
std::list<BroadcastTransmitInfo<Parent>>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
}
@@ -475,6 +471,22 @@ bool
ExchangeSinkBuffer<Parent>::_is_receiver_eof(InstanceLoId id) {
return _instance_to_receiver_eof[id];
}
+template <typename Parent>
+void ExchangeSinkBuffer<Parent>::_turn_off_channel(InstanceLoId id, bool
cleanup) {
+ if (!_rpc_channel_is_idle[id]) {
+ _rpc_channel_is_idle[id] = true;
+ auto all_done = _busy_channels.fetch_sub(1) == 1;
+ _set_ready_to_finish(all_done);
+ if (cleanup && all_done) {
+ auto weak_task_ctx = weak_task_exec_ctx();
+ if (auto pip_ctx = weak_task_ctx.lock()) {
+ DCHECK(_parent);
+ _parent->set_reach_limit();
+ }
+ }
+ }
+}
+
template <typename Parent>
void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time,
int64_t* min_time) {
int64_t local_max_time = 0;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 74207bbffd3..cd5502ee6d0 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -47,6 +47,7 @@ using InstanceLoId = int64_t;
namespace pipeline {
class Dependency;
+class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
@@ -199,8 +200,8 @@ template <typename Parent>
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int
send_id, int be_number,
- RuntimeState* state);
- ~ExchangeSinkBuffer();
+ RuntimeState* state, ExchangeSinkLocalState* parent =
nullptr);
+ ~ExchangeSinkBuffer() = default;
void register_sink(TUniqueId);
Status add_block(TransmitInfo<Parent>&& request);
@@ -271,6 +272,7 @@ private:
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
+ inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
@@ -278,7 +280,8 @@ private:
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
- std::atomic<bool> _should_stop {false};
+ std::atomic<bool> _should_stop = false;
+ ExchangeSinkLocalState* _parent = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dfcb3e8e120..33b68a5ac30 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -185,7 +185,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer =
std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
- id, p._dest_node_id, _sender_id, _state->be_number(), state);
+ id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
register_channels(_sink_buffer.get());
_queue_dependency =
@@ -681,8 +681,11 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
- fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {},
_busy_channels = {})",
- _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load());
+ fmt::format_to(debug_string_buffer,
+ ", Sink Buffer: (_should_stop = {}, _busy_channels = {},
_is_finishing = {}), "
+ "_reach_limit: {}",
+ _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load(),
+ _sink_buffer->_is_finishing.load(), _reach_limit.load());
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index f275365c0e8..4a217c940ce 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -131,6 +131,8 @@ public:
RuntimeProfile::Counter* compress_timer() { return _compress_timer; }
RuntimeProfile::Counter* uncompressed_bytes_counter() { return
_uncompressed_bytes_counter; }
[[nodiscard]] bool transfer_large_data_by_brpc() const;
+ bool is_finished() const override { return _reach_limit.load(); }
+ void set_reach_limit() { _reach_limit = true; };
[[nodiscard]] int sender_id() const { return _sender_id; }
@@ -231,6 +233,7 @@ private:
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
+ std::atomic<bool> _reach_limit = false;
};
class ExchangeSinkOperatorX final : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 009a2a2f22d..770b64d0dc0 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -110,6 +110,8 @@ public:
void close_a_pipeline();
+ virtual void clear_finished_tasks() {}
+
virtual void add_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index b56cd2b8531..dbc4e37bbab 100644
---
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -52,6 +52,9 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
return Status::OK();
}
+ if (_exchanger) {
+ _exchanger->close(*this);
+ }
if (_shared_state) {
_shared_state->sub_running_source_operators();
}
@@ -67,6 +70,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
Base::debug_string(indentation_level), _channel_id,
_exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators,
_exchanger->_running_source_operators);
+ size_t i = 0;
+ fmt::format_to(debug_string_buffer, ", MemTrackers: ");
+ for (auto* mem_tracker : _shared_state->mem_trackers) {
+ fmt::format_to(debug_string_buffer, "{}: {}, ", i,
mem_tracker->consumption());
+ }
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index ce1f05a22bf..abcc0161fd7 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -39,6 +39,16 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
return Status::OK();
}
+void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
+ PartitionedBlock partitioned_block;
+ while
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+ auto block_wrapper = partitioned_block.first;
+ local_state._shared_state->sub_mem_usage(
+ local_state._channel_id,
block_wrapper->data_block.allocated_bytes(), false);
+ block_wrapper->unref(local_state._shared_state);
+ }
+}
+
Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState& local_state)
{
PartitionedBlock partitioned_block;
@@ -185,6 +195,14 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
return Status::OK();
}
+void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
+ vectorized::Block next_block;
+ while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
+ next_block.allocated_bytes());
+ }
+}
+
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
@@ -254,6 +272,13 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
return Status::OK();
}
+void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
+ vectorized::Block next_block;
+ while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ // do nothing
+ }
+}
+
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index cc91b4f1a10..f3b210b11f3 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -45,6 +45,7 @@ public:
virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool
eos,
LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
+ virtual void close(LocalExchangeSourceLocalState& local_state) {}
protected:
friend struct LocalExchangeSharedState;
@@ -95,6 +96,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
+ void close(LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return
ExchangeType::HASH_SHUFFLE; }
protected:
@@ -137,6 +139,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH;
}
+ void close(LocalExchangeSourceLocalState& local_state) override;
private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
@@ -175,6 +178,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
+ void close(LocalExchangeSourceLocalState& local_state) override;
private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index efa35b2c2fe..97035358037 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -490,6 +490,7 @@ public:
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
+ [[nodiscard]] virtual bool is_finished() const { return false; }
[[nodiscard]] virtual std::string debug_string(int indentation_level)
const = 0;
@@ -595,6 +596,13 @@ public:
[[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
LocalSinkStateInfo& info) =
0;
+ [[nodiscard]] bool is_finished(RuntimeState* state) const {
+ auto result = state->get_sink_local_state_result();
+ if (!result) {
+ return result.error();
+ }
+ return result.value()->is_finished();
+ }
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this))
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34d00c07652..ea2212357a2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -70,6 +70,13 @@ public:
~PipelineXFragmentContext() override;
+ void clear_finished_tasks() override {
+ for (size_t j = 0; j < _tasks.size(); j++) {
+ for (size_t i = 0; i < _tasks[j].size(); i++) {
+ _tasks[j][i]->stop_if_finished();
+ }
+ }
+ };
void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
ins_ids.resize(_fragment_instance_ids.size());
for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c995b66997d..b13cd8c9c61 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -247,7 +247,10 @@ Status PipelineXTask::execute(bool* eos) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
}};
- *eos = false;
+ *eos = _sink->is_finished(_state);
+ if (*eos) {
+ return Status::OK();
+ }
if (has_dependency()) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
@@ -310,7 +313,9 @@ Status PipelineXTask::execute(bool* eos) {
return status;
});
// Pull block from operator chain
- if (!_dry_run) {
+ if (_dry_run || _sink->is_finished(_state)) {
+ *eos = true;
+ } else {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
try {
@@ -319,8 +324,6 @@ Status PipelineXTask::execute(bool* eos) {
return Status::InternalError(e.to_string() +
" task debug string: " +
debug_string());
}
- } else {
- *eos = true;
}
if (_block->rows() != 0 || *eos) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index ef55423e16c..83c07827848 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -157,6 +157,12 @@ public:
return false;
}
+ void stop_if_finished() {
+ if (_sink->is_finished(_state)) {
+ clear_blocking_state();
+ }
+ }
+
static bool should_revoke_memory(RuntimeState* state, int64_t
revocable_mem_bytes);
private:
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ba30323addf..7003f1d49f5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1122,6 +1122,8 @@ void FragmentMgr::cancel_worker() {
for (auto& ins_id : ins_ids) {
to_cancel.push_back(ins_id);
}
+ } else {
+ pipeline_itr.second->clear_finished_tasks();
}
}
for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]