This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7434f80300 [pipelineX](refactor) Refactor pending finish dependency
(#25181)
7434f80300 is described below
commit 7434f8030069ca42642e87ae73fe22c30ec741a6
Author: Gabriel <[email protected]>
AuthorDate: Tue Oct 10 11:56:02 2023 +0800
[pipelineX](refactor) Refactor pending finish dependency (#25181)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 50 +++++++++++++++++-----
be/src/pipeline/exec/exchange_sink_buffer.h | 11 ++++-
be/src/pipeline/exec/exchange_sink_operator.cpp | 5 ++-
be/src/pipeline/exec/exchange_sink_operator.h | 2 +-
be/src/pipeline/exec/exchange_source_operator.cpp | 4 --
be/src/pipeline/exec/exchange_source_operator.h | 2 -
be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 4 +-
be/src/pipeline/exec/jdbc_table_sink_operator.h | 2 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 6 +--
be/src/pipeline/exec/result_file_sink_operator.h | 4 +-
be/src/pipeline/exec/result_sink_operator.h | 12 +++---
be/src/pipeline/exec/scan_operator.cpp | 7 +--
be/src/pipeline/exec/scan_operator.h | 10 ++---
be/src/pipeline/pipeline_x/dependency.h | 42 ++++++++++++++++++
be/src/pipeline/pipeline_x/operator.cpp | 23 +++++++---
be/src/pipeline/pipeline_x/operator.h | 21 +++------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 19 ++++----
be/src/vec/exec/scan/pip_scanner_context.h | 4 +-
be/src/vec/exec/scan/scanner_context.cpp | 31 ++++++++++++++
be/src/vec/exec/scan/scanner_context.h | 17 +++-----
be/src/vec/sink/writer/async_result_writer.cpp | 10 +++++
be/src/vec/sink/writer/async_result_writer.h | 5 ++-
22 files changed, 205 insertions(+), 86 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f055151698..22b270c3b8 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -101,7 +101,7 @@ bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
for (auto& pair : _instance_to_package_queue_mutex) {
std::unique_lock<std::mutex> lock(*(pair.second));
auto& id = pair.first;
- if (!_instance_to_sending_by_pipeline.at(id)) {
+ if (!_rpc_channel_is_idle.at(id)) {
// when pending finish, we need check whether current query is
cancelled
if (need_cancel && _instance_to_rpc_ctx.find(id) !=
_instance_to_rpc_ctx.end()) {
auto& rpc_ctx = _instance_to_rpc_ctx[id];
@@ -135,7 +135,7 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId
fragment_instance_id) {
PUniqueId finst_id;
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
- _instance_to_sending_by_pipeline[low_id] = true;
+ _rpc_channel_is_idle[low_id] = true;
_instance_to_rpc_ctx[low_id] = {};
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
@@ -152,9 +152,13 @@ Status
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
{
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
- if (_instance_to_sending_by_pipeline[ins_id.lo]) {
+ if (_rpc_channel_is_idle[ins_id.lo]) {
send_now = true;
- _instance_to_sending_by_pipeline[ins_id.lo] = false;
+ _rpc_channel_is_idle[ins_id.lo] = false;
+ _busy_channels++;
+ if (_finish_dependency) {
+ _finish_dependency->block_finishing();
+ }
}
_instance_to_package_queue[ins_id.lo].emplace(std::move(request));
_total_queue_size++;
@@ -187,9 +191,13 @@ Status
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
{
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
- if (_instance_to_sending_by_pipeline[ins_id.lo]) {
+ if (_rpc_channel_is_idle[ins_id.lo]) {
send_now = true;
- _instance_to_sending_by_pipeline[ins_id.lo] = false;
+ _rpc_channel_is_idle[ins_id.lo] = false;
+ _busy_channels++;
+ if (_finish_dependency) {
+ _finish_dependency->block_finishing();
+ }
}
_instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
}
@@ -204,7 +212,7 @@ template <typename Parent>
Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
- DCHECK(_instance_to_sending_by_pipeline[id] == false);
+ DCHECK(_rpc_channel_is_idle[id] == false);
std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>& q =
_instance_to_package_queue[id];
@@ -212,7 +220,11 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
broadcast_q = _instance_to_broadcast_package_queue[id];
if (_is_finishing) {
- _instance_to_sending_by_pipeline[id] = true;
+ _rpc_channel_is_idle[id] = true;
+ _busy_channels--;
+ if (_finish_dependency && _busy_channels == 0) {
+ _finish_dependency->set_ready_to_finish();
+ }
return Status::OK();
}
@@ -326,7 +338,11 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId
id) {
}
broadcast_q.pop();
} else {
- _instance_to_sending_by_pipeline[id] = true;
+ _rpc_channel_is_idle[id] = true;
+ _busy_channels--;
+ if (_finish_dependency && _busy_channels == 0) {
+ _finish_dependency->set_ready_to_finish();
+ }
}
return Status::OK();
@@ -346,7 +362,13 @@ void
ExchangeSinkBuffer<Parent>::_construct_request(InstanceLoId id, PUniqueId f
template <typename Parent>
void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
- _instance_to_sending_by_pipeline[id] = true;
+ if (!_rpc_channel_is_idle[id]) {
+ _busy_channels--;
+ if (_finish_dependency && _busy_channels == 0) {
+ _finish_dependency->set_ready_to_finish();
+ }
+ }
+ _rpc_channel_is_idle[id] = true;
}
template <typename Parent>
@@ -360,7 +382,13 @@ template <typename Parent>
void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
- _instance_to_sending_by_pipeline[id] = true;
+ if (!_rpc_channel_is_idle[id]) {
+ _busy_channels--;
+ if (_finish_dependency && _busy_channels == 0) {
+ _finish_dependency->set_ready_to_finish();
+ }
+ }
+ _rpc_channel_is_idle[id] = true;
}
template <typename Parent>
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index d5e530af5c..c47d6c6a14 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -45,6 +45,7 @@ using InstanceLoId = int64_t;
namespace pipeline {
class BroadcastDependency;
class ExchangeSinkQueueDependency;
+class FinishDependency;
} // namespace pipeline
namespace vectorized {
@@ -182,8 +183,10 @@ public:
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
- void set_queue_dependency(std::shared_ptr<ExchangeSinkQueueDependency>
queue_dependency) {
+ void set_dependency(std::shared_ptr<ExchangeSinkQueueDependency>
queue_dependency,
+ std::shared_ptr<FinishDependency> finish_dependency) {
_queue_dependency = queue_dependency;
+ _finish_dependency = finish_dependency;
}
private:
@@ -202,7 +205,10 @@ private:
// TODO: make all flat_hash_map to a STRUT
phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>>
_instance_to_request;
- phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
+ // One channel corresponds to one downstream instance; true means the channel has no RPC in
+ // flight, so add_block() may send on it directly.
+ phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
+ // Number of channels that currently have an RPC in flight (entries of _rpc_channel_is_idle
+ // that are false). A channel turning busy increments it and calls
+ // _finish_dependency->block_finishing(); a channel turning idle decrements it and calls
+ // _finish_dependency->set_ready_to_finish() once it reaches 0.
+ // NOTE(review): each channel is guarded by its own mutex in _instance_to_package_queue_mutex,
+ // so one channel's "decrement, then check == 0, then set_ready_to_finish()" can interleave with
+ // another channel's "increment, then block_finishing()". Confirm this cannot leave the sink
+ // marked ready to finish while a channel is still busy.
+ std::atomic<int> _busy_channels = 0;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext>
_instance_to_rpc_ctx;
@@ -230,6 +236,7 @@ private:
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
int _queue_capacity = 0;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
+ std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 46f5e99614..c778e397b8 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -179,6 +179,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_exchange_sink_dependency = AndDependency::create_shared(_parent->id());
_queue_dependency =
ExchangeSinkQueueDependency::create_shared(_parent->id());
+ _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() ==
1) &&
!only_local_exchange) {
@@ -557,9 +558,9 @@ WriteDependency*
ExchangeSinkOperatorX::wait_for_dependency(RuntimeState* state)
return local_state._exchange_sink_dependency->write_blocked_by();
}
-bool ExchangeSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+FinishDependency* ExchangeSinkOperatorX::finish_blocked_by(RuntimeState*
state) const {
auto& local_state =
state->get_sink_local_state(id())->cast<ExchangeSinkLocalState>();
- return local_state._sink_buffer->is_pending_finish();
+ return local_state._finish_dependency->finish_blocked_by();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index db83a71097..f76f24479e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -235,7 +235,7 @@ public:
Status try_close(RuntimeState* state, Status exec_status) override;
WriteDependency* wait_for_dependency(RuntimeState* state) override;
- bool is_pending_finish(RuntimeState* state) const override;
+ FinishDependency* finish_blocked_by(RuntimeState* state) const override;
private:
friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 0569670b74..66a6ecc536 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -171,10 +171,6 @@ Dependency*
ExchangeSourceOperatorX::wait_for_dependency(RuntimeState* state) {
return local_state.source_dependency->read_blocked_by();
}
-bool ExchangeSourceOperatorX::is_pending_finish(RuntimeState* /*state*/) const
{
- return false;
-}
-
Status ExchangeLocalState::close(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_close_timer);
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index a7e146b54d..b0f455cefc 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -113,8 +113,6 @@ public:
ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs,
int num_senders);
Dependency* wait_for_dependency(RuntimeState* state) override;
- bool is_pending_finish(RuntimeState* state) const override;
-
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index a551762027..32c5958b95 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -68,9 +68,9 @@ WriteDependency*
JdbcTableSinkOperatorX::wait_for_dependency(RuntimeState* state
return local_state.write_blocked_by();
}
-bool JdbcTableSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+FinishDependency* JdbcTableSinkOperatorX::finish_blocked_by(RuntimeState*
state) const {
auto& local_state =
state->get_sink_local_state(id())->cast<JdbcTableSinkLocalState>();
- return local_state.is_pending_finish();
+ return local_state._finish_dependency->finish_blocked_by();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h
b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index 6db9c38065..a37e1c4098 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -55,7 +55,7 @@ public:
SourceState source_state) override;
WriteDependency* wait_for_dependency(RuntimeState* state) override;
- bool is_pending_finish(RuntimeState* state) const override;
+ FinishDependency* finish_blocked_by(RuntimeState* state) const override;
private:
friend class JdbcTableSinkLocalState;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 4369c6b5d8..40190ded8a 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -146,7 +146,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
}
_only_local_exchange = local_size == _channels.size();
}
- _writer->set_dependency(_async_writer_dependency.get());
+ _writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
_writer->set_header_info(p._header_type, p._header);
return Status::OK();
}
@@ -268,9 +268,9 @@ Status ResultFileSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_
return local_state.sink(state, in_block, source_state);
}
-bool ResultFileSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+FinishDependency* ResultFileSinkOperatorX::finish_blocked_by(RuntimeState*
state) const {
auto& local_state =
state->get_sink_local_state(id())->cast<ResultFileSinkLocalState>();
- return local_state.is_pending_finish();
+ return local_state._finish_dependency->finish_blocked_by();
}
WriteDependency* ResultFileSinkOperatorX::wait_for_dependency(RuntimeState*
state) {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h
b/be/src/pipeline/exec/result_file_sink_operator.h
index fb77e7e8c9..5ce028e63c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -55,7 +55,7 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
- int sender_id() const { return _sender_id; }
+ [[nodiscard]] int sender_id() const { return _sender_id; }
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
@@ -94,7 +94,7 @@ public:
WriteDependency* wait_for_dependency(RuntimeState* state) override;
- bool is_pending_finish(RuntimeState* state) const override;
+ FinishDependency* finish_blocked_by(RuntimeState* state) const override;
private:
friend class ResultFileSinkLocalState;
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 799389ffe3..c6c3a1c69d 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -43,29 +43,29 @@ public:
bool can_write() override;
};
-class ResultBufferDependency : public WriteDependency {
+class ResultBufferDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ResultBufferDependency);
ResultBufferDependency(int id) : WriteDependency(id,
"ResultBufferDependency") {}
- ~ResultBufferDependency() = default;
+ ~ResultBufferDependency() override = default;
void* shared_state() override { return nullptr; }
};
-class ResultQueueDependency : public WriteDependency {
+class ResultQueueDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ResultQueueDependency);
ResultQueueDependency(int id) : WriteDependency(id,
"ResultQueueDependency") {}
- ~ResultQueueDependency() = default;
+ ~ResultQueueDependency() override = default;
void* shared_state() override { return nullptr; }
};
-class CancelDependency : public WriteDependency {
+class CancelDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(CancelDependency);
CancelDependency(int id) : WriteDependency(id, "CancelDependency") {
_ready_for_write = false; }
- ~CancelDependency() = default;
+ ~CancelDependency() override = default;
void* shared_state() override { return nullptr; }
};
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 3ac6d3d900..df45db62ed 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1189,7 +1189,8 @@ Status ScanLocalState<Derived>::_start_scanners(
_data_ready_dependency = DataReadyDependency::create_shared(p.id(),
_scanner_ctx.get());
_source_dependency->add_child(_data_ready_dependency);
- _scanner_ctx->set_dependency(_data_ready_dependency,
_scanner_done_dependency);
+ _scanner_ctx->set_dependency(_data_ready_dependency,
_scanner_done_dependency,
+ _finish_dependency);
return Status::OK();
}
@@ -1291,9 +1292,9 @@ Dependency*
ScanOperatorX<LocalStateType>::wait_for_dependency(RuntimeState* sta
}
template <typename LocalStateType>
-bool ScanOperatorX<LocalStateType>::is_pending_finish(RuntimeState* state)
const {
+FinishDependency*
ScanOperatorX<LocalStateType>::finish_blocked_by(RuntimeState* state) const {
auto& local_state = state->get_local_state(id())->template
cast<LocalStateType>();
- return local_state._scanner_ctx &&
!local_state._scanner_ctx->no_schedule();
+ return local_state._finish_dependency->finish_blocked_by();
}
template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 602c1ce4ea..5cdbac8957 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -65,14 +65,14 @@ public:
[[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
};
-struct EosDependency : public Dependency {
+class EosDependency : public Dependency {
public:
ENABLE_FACTORY_CREATOR(EosDependency);
EosDependency(int id) : Dependency(id, "EosDependency") {}
void* shared_state() override { return nullptr; }
};
-struct ScannerDoneDependency : public Dependency {
+class ScannerDoneDependency : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
ScannerDoneDependency(int id, vectorized::ScannerContext* scanner_ctx)
@@ -90,7 +90,7 @@ private:
vectorized::ScannerContext* _scanner_ctx;
};
-struct DataReadyDependency : public Dependency {
+class DataReadyDependency : public Dependency {
public:
ENABLE_FACTORY_CREATOR(DataReadyDependency);
DataReadyDependency(int id, vectorized::ScannerContext* scanner_ctx)
@@ -417,14 +417,14 @@ public:
Status try_close(RuntimeState* state) override;
Dependency* wait_for_dependency(RuntimeState* state) override;
- bool is_pending_finish(RuntimeState* state) const override;
+ FinishDependency* finish_blocked_by(RuntimeState* state) const override;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override { return
OperatorXBase::prepare(state); }
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
- bool is_source() const override { return true; }
+ [[nodiscard]] bool is_source() const override { return true; }
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
return _runtime_filter_descs;
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 575b305b01..943e58cd31 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -151,6 +151,48 @@ protected:
MonotonicStopWatch _write_dependency_watcher;
};
+// Tracks whether an operator still has asynchronous work in flight (in this change: pending
+// exchange RPCs, running scanners, or an async result-writer thread) that must complete before
+// its pipeline task may finish. It starts out ready. Owners of such work call block_finishing()
+// when the work starts and set_ready_to_finish() when it is done.
+class FinishDependency : public Dependency {
+public:
+    FinishDependency(int id, std::string name) : Dependency(id, name), _ready_to_finish(true) {}
+    ~FinishDependency() override = default;
+
+    // Starts the "blocked on finish" stopwatch on this dependency and, recursively, on its
+    // children.
+    // NOTE(review): assumes every child is itself a FinishDependency; confirm before adding
+    // children of other dependency types.
+    void start_finish_watcher() {
+        for (auto& child : _children) {
+            static_cast<FinishDependency*>(child.get())->start_finish_watcher();
+        }
+        _finish_dependency_watcher.start();
+    }
+
+    // Time accumulated by the finish stopwatch.
+    [[nodiscard]] virtual int64_t finish_watcher_elapse_time() {
+        return _finish_dependency_watcher.elapsed_time();
+    }
+
+    // Returns nullptr when the owner may finish, otherwise `this` (the dependency it is blocked
+    // on). In fuzzy mode, logs a warning once the wait exceeds SLOW_DEPENDENCY_THRESHOLD.
+    [[nodiscard]] virtual FinishDependency* finish_blocked_by() {
+        if (config::enable_fuzzy_mode && !_ready_to_finish &&
+            _finish_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
+                         << id();
+        }
+        return _ready_to_finish ? nullptr : this;
+    }
+
+    // Marks outstanding work as done and stops the stopwatch; a no-op if already ready.
+    void set_ready_to_finish() {
+        if (_ready_to_finish) {
+            return;
+        }
+        _finish_dependency_watcher.stop();
+        _ready_to_finish = true;
+    }
+
+    // Marks that new asynchronous work has started and the owner must not finish yet.
+    void block_finishing() { _ready_to_finish = false; }
+
+    void* shared_state() override { return nullptr; }
+
+protected:
+    std::atomic<bool> _ready_to_finish;
+    MonotonicStopWatch _finish_dependency_watcher;
+};
+
class AndDependency : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(AndDependency);
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index be573c4a89..d20c3c3263 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -354,6 +354,22 @@ Status
OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
return Status::OK();
}
+// Every sink local state owns a FinishDependency (initially ready). Sinks with asynchronous
+// work block it while that work is in flight.
+PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent,
+                                                         RuntimeState* state)
+        : _parent(parent),
+          _state(state),
+          _finish_dependency(
+                  std::make_shared<FinishDependency>(parent->id(), parent->get_name())) {}
+
+// Every source/operator local state owns a FinishDependency (initially ready), for example so
+// that scan operators can block finishing while scanners are still running.
+PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
+        : _num_rows_returned(0),
+          _rows_returned_counter(nullptr),
+          _rows_returned_rate(nullptr),
+          _memory_used_counter(nullptr),
+          _peak_memory_usage_counter(nullptr),
+          _parent(parent),
+          _state(state),
+          _finish_dependency(
+                  std::make_shared<FinishDependency>(parent->id(), parent->get_name())) {}
+
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state,
LocalStateInfo& info) {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
@@ -499,7 +515,7 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState*
state, LocalSinkState
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency =
AsyncWriterDependency::create_shared(_parent->id());
- _writer->set_dependency(_async_writer_dependency.get());
+ _writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" +
_async_writer_dependency->name() + "]Time");
@@ -548,11 +564,6 @@ Status AsyncWriterSink<Writer,
Parent>::try_close(RuntimeState* state, Status ex
return Status::OK();
}
-template <typename Writer, typename Parent>
-bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
- return _writer->is_pending_finish();
-}
-
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class
DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index f01119daef..d880e55feb 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -58,14 +58,7 @@ struct LocalSinkStateInfo {
class PipelineXLocalStateBase {
public:
- PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
- : _num_rows_returned(0),
- _rows_returned_counter(nullptr),
- _rows_returned_rate(nullptr),
- _memory_used_counter(nullptr),
- _peak_memory_usage_counter(nullptr),
- _parent(parent),
- _state(state) {}
+ PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent);
virtual ~PipelineXLocalStateBase() = default;
template <class TARGET>
@@ -147,6 +140,7 @@ protected:
vectorized::VExprContextSPtrs _projections;
bool _closed = false;
vectorized::Block _origin_block;
+ std::shared_ptr<FinishDependency> _finish_dependency;
};
class OperatorXBase : public OperatorBase {
@@ -224,7 +218,7 @@ public:
virtual Dependency* wait_for_dependency(RuntimeState* state) { return
nullptr; }
- virtual bool is_pending_finish(RuntimeState* state) const { return false; }
+ virtual FinishDependency* finish_blocked_by(RuntimeState* state) const {
return nullptr; }
[[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
return _row_descriptor;
@@ -338,8 +332,7 @@ class DataSinkOperatorXBase;
class PipelineXSinkLocalStateBase {
public:
- PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState*
state_)
- : _parent(parent_), _state(state_) {}
+ PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState*
state_);
virtual ~PipelineXSinkLocalStateBase() = default;
// Do initialization. This step should be executed only once and in
bthread, so we can do some
@@ -401,6 +394,8 @@ protected:
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer;
+
+ std::shared_ptr<FinishDependency> _finish_dependency;
};
class DataSinkOperatorXBase : public OperatorBase {
@@ -469,7 +464,7 @@ public:
virtual WriteDependency* wait_for_dependency(RuntimeState* state) { return
nullptr; }
- virtual bool is_pending_finish(RuntimeState* state) const { return false; }
+ virtual FinishDependency* finish_blocked_by(RuntimeState* state) const {
return nullptr; }
[[nodiscard]] std::string debug_string() const override { return ""; }
@@ -629,8 +624,6 @@ public:
Status try_close(RuntimeState* state, Status exec_status) override;
- bool is_pending_finish();
-
protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 04b647b28a..c406d7f30f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -101,18 +101,17 @@ public:
std::string debug_string() override;
bool is_pending_finish() override {
- bool source_ret = _source->is_pending_finish(_state);
- if (source_ret) {
- return true;
- } else {
- set_src_pending_finish_time();
+ for (auto& op : _operators) {
+ auto dep = op->finish_blocked_by(_state);
+ if (dep != nullptr) {
+ dep->start_finish_watcher();
+ return true;
+ }
}
-
- bool sink_ret = _sink->is_pending_finish(_state);
- if (sink_ret) {
+ auto dep = _sink->finish_blocked_by(_state);
+ if (dep != nullptr) {
+ dep->start_finish_watcher();
return true;
- } else {
- set_dst_pending_finish_time();
}
return false;
}
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index b06d84f0e1..159cf2ba65 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -51,9 +51,11 @@ public:
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
void set_dependency(std::shared_ptr<DataReadyDependency> dependency,
- std::shared_ptr<ScannerDoneDependency>
scanner_done_dependency) override {
+ std::shared_ptr<ScannerDoneDependency>
scanner_done_dependency,
+ std::shared_ptr<FinishDependency> finish_dependency)
override {
_data_dependency = dependency;
_scanner_done_dependency = scanner_done_dependency;
+ _finish_dependency = finish_dependency;
}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index d5ca622dec..9363759941 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -129,6 +129,10 @@ Status ScannerContext::init() {
// 4. This ctx will be submitted to the scanner scheduler right after init.
// So set _num_scheduling_ctx to 1 here.
_num_scheduling_ctx = 1;
+ if (_finish_dependency) {
+ std::lock_guard l(_transfer_lock);
+ _finish_dependency->block_finishing();
+ }
_num_unfinished_scanners = _scanners.size();
@@ -208,6 +212,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
+ if (_finish_dependency) {
+ _finish_dependency->block_finishing();
+ }
} else {
set_status_on_error(state, false);
}
@@ -283,6 +290,21 @@ void ScannerContext::set_should_stop() {
_blocks_queue_added_cv.notify_one();
}
+void ScannerContext::update_num_running(int32_t scanner_inc, int32_t
sched_inc) {
+ std::lock_guard l(_transfer_lock);
+ _num_running_scanners += scanner_inc;
+ _num_scheduling_ctx += sched_inc;
+ if (_finish_dependency) {
+ if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+ _finish_dependency->set_ready_to_finish();
+ } else {
+ _finish_dependency->block_finishing();
+ }
+ }
+ _blocks_queue_added_cv.notify_one();
+ _ctx_finish_cv.notify_one();
+}
+
bool ScannerContext::set_status_on_error(const Status& status, bool need_lock)
{
std::unique_lock l(_transfer_lock, std::defer_lock);
if (need_lock) {
@@ -405,6 +427,9 @@ void ScannerContext::reschedule_scanner_ctx() {
//todo(wb) rethinking is it better to mark current scan_context failed
when submit failed many times?
if (state.ok()) {
_num_scheduling_ctx++;
+ if (_finish_dependency) {
+ _finish_dependency->block_finishing();
+ }
} else {
set_status_on_error(state, false);
}
@@ -421,11 +446,17 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
// We have to decrease _num_running_scanners before schedule, otherwise
// schedule does not woring due to _num_running_scanners.
_num_running_scanners--;
+ if (_finish_dependency && _num_running_scanners == 0 &&
_num_scheduling_ctx == 0) {
+ _finish_dependency->set_ready_to_finish();
+ }
if (should_be_scheduled()) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
+ if (_finish_dependency) {
+ _finish_dependency->block_finishing();
+ }
} else {
set_status_on_error(state, false);
}
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 1b0ebef8b5..07f9f05551 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -43,8 +43,9 @@ class TupleDescriptor;
namespace pipeline {
class ScanLocalStateBase;
-struct ScannerDoneDependency;
-struct DataReadyDependency;
+class ScannerDoneDependency;
+class FinishDependency;
+class DataReadyDependency;
} // namespace pipeline
namespace taskgroup {
@@ -106,7 +107,8 @@ public:
virtual void set_dependency(
std::shared_ptr<pipeline::DataReadyDependency> dependency,
- std::shared_ptr<pipeline::ScannerDoneDependency>
scanner_done_dependency) {}
+ std::shared_ptr<pipeline::ScannerDoneDependency>
scanner_done_dependency,
+ std::shared_ptr<pipeline::FinishDependency> finish_dependency) {}
// Called by ScanNode.
// Used to notify the scheduler that this ScannerContext can stop working.
@@ -116,13 +118,7 @@ public:
virtual bool done() { return _is_finished || _should_stop; }
// Update the running num of scanners and contexts
- void update_num_running(int32_t scanner_inc, int32_t sched_inc) {
- std::lock_guard l(_transfer_lock);
- _num_running_scanners += scanner_inc;
- _num_scheduling_ctx += sched_inc;
- _blocks_queue_added_cv.notify_one();
- _ctx_finish_cv.notify_one();
- }
+ void update_num_running(int32_t scanner_inc, int32_t sched_inc);
int get_num_running_scanners() const { return _num_running_scanners; }
@@ -278,6 +274,7 @@ protected:
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
std::shared_ptr<pipeline::ScannerDoneDependency> _scanner_done_dependency
= nullptr;
+ std::shared_ptr<pipeline::FinishDependency> _finish_dependency = nullptr;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 2a7d8d988d..c142eb1ebc 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -35,6 +35,13 @@ namespace vectorized {
AsyncResultWriter::AsyncResultWriter(const
doris::vectorized::VExprContextSPtrs& output_expr_ctxs)
: _vec_output_expr_ctxs(output_expr_ctxs), _dependency(nullptr) {};
+// Wires the pipelineX dependencies into the writer. The finish dependency is blocked here and
+// released by process_block() once the writer thread has closed.
+void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
+                                       pipeline::FinishDependency* finish_dep) {
+    _dependency = dep;
+    _finish_dependency = finish_dep;
+    // process_block() treats a null finish dependency as "not tracked"; do the same here.
+    if (_finish_dependency) {
+        _finish_dependency->block_finishing();
+    }
+}
+
Status AsyncResultWriter::sink(Block* block, bool eos) {
auto rows = block->rows();
auto status = Status::OK();
@@ -132,6 +139,9 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
_need_normal_close = false;
}
_writer_thread_closed = true;
+ if (_finish_dependency) {
+ _finish_dependency->set_ready_to_finish();
+ }
}
Status AsyncResultWriter::_projection_block(doris::vectorized::Block&
input_block,
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 5d0cd26052..780f8b506e 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -35,6 +35,7 @@ class TExpr;
namespace pipeline {
class AsyncWriterDependency;
class WriteDependency;
+class FinishDependency;
} // namespace pipeline
@@ -56,7 +57,8 @@ class AsyncResultWriter : public ResultWriter {
public:
AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs);
- void set_dependency(pipeline::AsyncWriterDependency* dep) { _dependency =
dep; }
+ void set_dependency(pipeline::AsyncWriterDependency* dep,
+ pipeline::FinishDependency* finish_dep);
void force_close(Status s);
@@ -117,6 +119,7 @@ private:
// Used by pipelineX
pipeline::AsyncWriterDependency* _dependency;
+    // Only assigned by set_dependency() (pipelineX). process_block() null-checks it, so it must
+    // start as nullptr for writers that never receive a finish dependency.
+    pipeline::FinishDependency* _finish_dependency = nullptr;
moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]