This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5d80e7dc2fe [Improvement](pipelineX) Improve local exchange on
pipelineX engine (#26464)
5d80e7dc2fe is described below
commit 5d80e7dc2fe7a9cc80cc26244146d5dda8070908
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 7 22:11:44 2023 +0800
[Improvement](pipelineX) Improve local exchange on pipelineX engine (#26464)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 16 ++++-----
be/src/pipeline/exec/exchange_sink_operator.h | 29 +++++++++++++---
be/src/runtime/runtime_state.h | 4 +++
be/src/vec/runtime/vdata_stream_recvr.cpp | 46 +++++++++++++++++--------
be/src/vec/runtime/vdata_stream_recvr.h | 22 +++++++-----
be/src/vec/sink/vdata_stream_sender.h | 12 +++----
6 files changed, 85 insertions(+), 44 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b5cceb5a291..e6bcccbabb2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -192,15 +192,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer",
timer_name);
} else if (local_size > 0) {
size_t dep_id = 0;
- _channels_dependency.resize(local_size);
+ _local_channels_dependency.resize(local_size);
_wait_channel_timer.resize(local_size);
auto deps_for_channels =
AndDependency::create_shared(_parent->operator_id());
for (auto channel : channels) {
if (channel->is_local()) {
- _channels_dependency[dep_id] =
-
ChannelDependency::create_shared(_parent->operator_id());
- channel->set_dependency(_channels_dependency[dep_id]);
- deps_for_channels->add_child(_channels_dependency[dep_id]);
+ _local_channels_dependency[dep_id] =
channel->get_local_channel_dependency();
+ DCHECK(_local_channels_dependency[dep_id] != nullptr);
+
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] =
ADD_CHILD_TIMER(_profile,
"WaitForLocalExchangeBuffer", timer_name);
dep_id++;
@@ -428,7 +427,8 @@ Status
ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
{
SCOPED_TIMER(state.serialize_batch_timer());
dest->Clear();
- size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ size_t uncompressed_bytes = 0;
+ size_t compressed_bytes = 0;
RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest,
&uncompressed_bytes,
&compressed_bytes, _compression_type,
_transfer_large_data_by_brpc));
@@ -521,9 +521,9 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
COUNTER_UPDATE(_wait_broadcast_buffer_timer,
_broadcast_dependency->write_watcher_elapse_time());
}
- for (size_t i = 0; i < _channels_dependency.size(); i++) {
+ for (size_t i = 0; i < _local_channels_dependency.size(); i++) {
COUNTER_UPDATE(_wait_channel_timer[i],
- _channels_dependency[i]->write_watcher_elapse_time());
+
_local_channels_dependency[i]->write_watcher_elapse_time());
}
_sink_buffer->update_profile(profile());
_sink_buffer->close();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 815d3930577..2b42d28958a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -113,11 +113,30 @@ private:
std::atomic<int> _available_block;
};
-class ChannelDependency final : public WriteDependency {
+/**
+ * We use this to control the execution for local exchange.
+ *              +---------------+                                    +---------------+                               +---------------+
+ *              | ExchangeSink1 |                                    | ExchangeSink2 |                               | ExchangeSink3 |
+ *              +---------------+                                    +---------------+                               +---------------+
+ *                     |                                                    |                                               |
+ *                     |                       +----------------------------+----------------------------------+            |
+ *                     +----+------------------|------------------------------------------+                    |            |
+ *                          |                  |                 +------------------------|--------------------|------------+-----+
+ *          Dependency 1-1  |   Dependency 2-1 |  Dependency 3-1 |         Dependency 1-2 |    Dependency 2-2  |  Dependency 3-2  |
+ *                    +----------------------------------------------+               +----------------------------------------------+
+ *                    |  queue1              queue2          queue3  |               |  queue1              queue2          queue3  |
+ *                    |                   LocalRecvr                 |               |                   LocalRecvr                 |
+ *                    +----------------------------------------------+               +----------------------------------------------+
+ *                         +-----------------+                                                        +------------------+
+ *                         | ExchangeSource1 |                                                        | ExchangeSource2  |
+ *                         +-----------------+                                                        +------------------+
+ */
+class LocalExchangeChannelDependency final : public WriteDependency {
public:
- ENABLE_FACTORY_CREATOR(ChannelDependency);
- ChannelDependency(int id) : WriteDependency(id, "ChannelDependency") {}
- ~ChannelDependency() override = default;
+ ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
+ LocalExchangeChannelDependency(int id)
+ : WriteDependency(id, "LocalExchangeChannelDependency") {}
+ ~LocalExchangeChannelDependency() override = default;
void* shared_state() override { return nullptr; }
};
@@ -209,7 +228,7 @@ private:
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
std::shared_ptr<AndDependency> _exchange_sink_dependency = nullptr;
std::shared_ptr<BroadcastDependency> _broadcast_dependency = nullptr;
- std::vector<std::shared_ptr<ChannelDependency>> _channels_dependency;
+ std::vector<std::shared_ptr<LocalExchangeChannelDependency>>
_local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;
};
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 1c20725cd33..805951e34e3 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -329,6 +329,10 @@ public:
return _query_options.__isset.enable_pipeline_engine &&
_query_options.enable_pipeline_engine;
}
+ bool enable_pipeline_x_exec() const {
+ return _query_options.__isset.enable_pipeline_x_engine &&
+ _query_options.enable_pipeline_x_engine;
+ }
bool enable_local_shuffle() const {
return _query_options.__isset.enable_local_shuffle &&
_query_options.enable_local_shuffle;
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index b842cbeea48..9384d4abbfc 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -100,7 +100,12 @@ Status
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
_recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();
if (_block_queue.size() == 0 && _dependency) {
- _dependency->block_reading();
+ if (!_is_cancelled && _num_remaining_senders > 0) {
+ _dependency->block_reading();
+ }
+ for (auto& it : _local_channel_dependency) {
+ it->set_ready_for_write();
+ }
}
if (!_pending_closures.empty()) {
@@ -350,6 +355,14 @@ VDataStreamRecvr::VDataStreamRecvr(
std::make_unique<MemTracker>("VDataStreamRecvr:" +
print_id(_fragment_instance_id));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ if (state->enable_pipeline_x_exec()) {
+ _sender_to_local_channel_dependency.resize(num_senders);
+ for (size_t i = 0; i < num_senders; i++) {
+ _sender_to_local_channel_dependency[i] =
+
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id);
+ }
+ }
+
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
_sender_queues.reserve(num_queues);
@@ -358,6 +371,16 @@ VDataStreamRecvr::VDataStreamRecvr(
SenderQueue* queue = nullptr;
if (_enable_pipeline) {
queue = _sender_queue_pool.add(new PipSenderQueue(this,
num_sender_per_queue, profile));
+ if (state->enable_pipeline_x_exec()) {
+ auto dependencies =
+ is_merging
+ ? std::vector<std::shared_ptr<
+ pipeline::
+
LocalExchangeChannelDependency>> {_sender_to_local_channel_dependency
+
[i]}
+ : _sender_to_local_channel_dependency;
+ queue->set_local_channel_dependency(dependencies);
+ }
} else {
queue = _sender_queue_pool.add(new SenderQueue(this,
num_sender_per_queue, profile));
}
@@ -424,11 +447,11 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
return _sender_queues[use_sender_id]->queue_empty();
}
-void
VDataStreamRecvr::set_dependency(std::shared_ptr<pipeline::ChannelDependency>
dependency) {
- _dependency = dependency;
- for (auto& queue : _sender_queues) {
- queue->set_channel_dependency(dependency);
- }
+std::shared_ptr<pipeline::LocalExchangeChannelDependency>
+VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
+ DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id);
+ DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr);
+ return _sender_to_local_channel_dependency[sender_id];
}
bool VDataStreamRecvr::ready_to_read() {
@@ -482,13 +505,6 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
_blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
- if (_dependency && size > 0 &&
- _blocks_memory_usage_current_value >
config::exchg_node_buffer_size_bytes && !_is_closed) {
- _dependency->block_writing();
- } else if (_dependency && size < 0 &&
- _blocks_memory_usage_current_value <=
config::exchg_node_buffer_size_bytes) {
- _dependency->set_ready_for_write();
- }
}
void VDataStreamRecvr::close() {
@@ -496,8 +512,8 @@ void VDataStreamRecvr::close() {
return;
}
_is_closed = true;
- if (_dependency) {
- _dependency->set_ready_for_write();
+ for (auto& it : _sender_to_local_channel_dependency) {
+ it->set_ready_for_write();
}
for (int i = 0; i < _sender_queues.size(); ++i) {
_sender_queues[i]->close();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index b2fd6afdc7f..2f5c88301e2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -59,7 +59,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
-class ChannelDependency;
+class LocalExchangeChannelDependency;
} // namespace pipeline
namespace vectorized {
@@ -125,7 +125,8 @@ public:
bool is_closed() const { return _is_closed; }
- void set_dependency(std::shared_ptr<pipeline::ChannelDependency>
dependency);
+ std::shared_ptr<pipeline::LocalExchangeChannelDependency>
get_local_channel_dependency(
+ int sender_id);
private:
void update_blocks_memory_usage(int64_t size);
@@ -183,7 +184,8 @@ private:
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
bool _enable_pipeline;
- std::shared_ptr<pipeline::ChannelDependency> _dependency;
+ std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
+ _sender_to_local_channel_dependency;
};
class ThreadClosure : public google::protobuf::Closure {
@@ -201,6 +203,12 @@ public:
virtual ~SenderQueue();
+ void set_local_channel_dependency(
+
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>&
+ local_channel_dependency) {
+ _local_channel_dependency = local_channel_dependency;
+ }
+
virtual bool should_wait();
virtual Status get_batch(Block* next_block, bool* eos);
@@ -225,10 +233,6 @@ public:
_dependency = dependency;
}
- void set_channel_dependency(std::shared_ptr<pipeline::ChannelDependency>
channel_dependency) {
- _channel_dependency = channel_dependency;
- }
-
protected:
Status _inner_get_batch_without_lock(Block* block, bool* eos);
@@ -251,7 +255,8 @@ protected:
std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>>
_local_closure;
std::shared_ptr<pipeline::ExchangeDataDependency> _dependency = nullptr;
- std::shared_ptr<pipeline::ChannelDependency> _channel_dependency = nullptr;
+ std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
+ _local_channel_dependency;
};
class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
@@ -270,5 +275,6 @@ public:
void add_block(Block* block, bool use_move) override;
};
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index a09cb4b7d47..30a55f13296 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -65,7 +65,7 @@ enum CompressionTypePB : int;
namespace pipeline {
class ExchangeSinkOperator;
class ExchangeSinkOperatorX;
-class ChannelDependency;
+class LocalExchangeChannelDependency;
} // namespace pipeline
namespace vectorized {
@@ -310,11 +310,6 @@ public:
bool is_local() const { return _is_local; }
- VDataStreamRecvr* local_recvr() {
- DCHECK(_is_local && _local_recvr != nullptr);
- return _local_recvr.get();
- }
-
virtual void ch_roll_pb_block();
bool can_write() {
@@ -557,11 +552,12 @@ public:
return _closure.get();
}
- void set_dependency(std::shared_ptr<pipeline::ChannelDependency>
dependency) {
+ std::shared_ptr<pipeline::LocalExchangeChannelDependency>
get_local_channel_dependency() {
if (!Channel<Parent>::_local_recvr) {
throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
}
- Channel<Parent>::_local_recvr->set_dependency(dependency);
+ return Channel<Parent>::_local_recvr->get_local_channel_dependency(
+ Channel<Parent>::_parent->sender_id());
}
private:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]