This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3b0b440965c [refactor](pipeline) Delete pipeline option (#35943)
3b0b440965c is described below
commit 3b0b440965cfd331df1f8c6b02aaa85b02dd73a4
Author: Gabriel <[email protected]>
AuthorDate: Thu Jun 6 15:39:47 2024 +0800
[refactor](pipeline) Delete pipeline option (#35943)
---
be/src/exprs/runtime_filter.cpp | 59 +++++++---------------
be/src/exprs/runtime_filter.h | 26 +++-------
be/src/pipeline/exec/aggregation_sink_operator.cpp | 3 +-
.../exec/streaming_aggregation_operator.cpp | 3 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 1 -
be/src/pipeline/pipeline_fragment_context.cpp | 4 --
be/src/runtime/fragment_mgr.cpp | 13 +----
be/src/runtime/query_context.cpp | 28 ++++------
be/src/runtime/query_context.h | 7 ---
be/src/runtime/runtime_filter_mgr.cpp | 2 -
be/src/runtime/runtime_filter_mgr.h | 1 -
be/src/runtime/runtime_state.h | 6 ---
be/src/vec/runtime/vdata_stream_recvr.cpp | 24 +++------
be/src/vec/runtime/vdata_stream_recvr.h | 1 -
be/src/vec/runtime/vsorted_run_merger.cpp | 27 ++++------
be/src/vec/runtime/vsorted_run_merger.h | 4 --
16 files changed, 55 insertions(+), 154 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index dc999ae137e..2a98be965f6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1117,7 +1117,7 @@ Status IRuntimeFilter::push_to_remote(const
TNetworkAddress* addr) {
pfragment_instance_id->set_lo((int64_t)this);
merge_filter_request->set_filter_id(_filter_id);
- merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
+ merge_filter_request->set_is_pipeline(true);
auto column_type = _wrapper->column_type();
merge_filter_request->set_column_type(to_proto(column_type));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
@@ -1170,35 +1170,21 @@ bool IRuntimeFilter::await() {
int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
? execution_timeout
: runtime_filter_wait_time_ms;
- if (_enable_pipeline_exec) {
- auto expected = _rf_state_atomic.load(std::memory_order_acquire);
- if (expected == RuntimeFilterState::NOT_READY) {
- if (!_rf_state_atomic.compare_exchange_strong(
- expected,
- MonotonicMillis() - registration_time_ >= wait_times_ms
- ? RuntimeFilterState::TIME_OUT
- : RuntimeFilterState::NOT_READY,
- std::memory_order_acq_rel)) {
- DCHECK(expected == RuntimeFilterState::READY ||
- expected == RuntimeFilterState::TIME_OUT);
- return (expected == RuntimeFilterState::READY);
- }
- return false;
- } else if (expected == RuntimeFilterState::TIME_OUT) {
- return false;
- }
- } else {
- std::unique_lock lock(_inner_mutex);
- if (_rf_state != RuntimeFilterState::READY) {
- int64_t ms_since_registration = MonotonicMillis() -
registration_time_;
- int64_t ms_remaining = wait_times_ms - ms_since_registration;
- _rf_state = RuntimeFilterState::TIME_OUT;
- if (ms_remaining <= 0) {
- return false;
- }
- return _inner_cv.wait_for(lock,
std::chrono::milliseconds(ms_remaining),
- [this] { return _rf_state ==
RuntimeFilterState::READY; });
+ auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+ if (expected == RuntimeFilterState::NOT_READY) {
+ if (!_rf_state_atomic.compare_exchange_strong(
+ expected,
+ MonotonicMillis() - registration_time_ >= wait_times_ms
+ ? RuntimeFilterState::TIME_OUT
+ : RuntimeFilterState::NOT_READY,
+ std::memory_order_acq_rel)) {
+ DCHECK(expected == RuntimeFilterState::READY ||
+ expected == RuntimeFilterState::TIME_OUT);
+ return (expected == RuntimeFilterState::READY);
}
+ return false;
+ } else if (expected == RuntimeFilterState::TIME_OUT) {
+ return false;
}
return true;
}
@@ -1212,7 +1198,6 @@ void IRuntimeFilter::update_state() {
? execution_timeout
: runtime_filter_wait_time_ms;
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
- DCHECK(_enable_pipeline_exec);
// In pipelineX, runtime filters will be ready or timeout before open
phase.
if (expected == RuntimeFilterState::NOT_READY) {
DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
@@ -1234,17 +1219,11 @@ PrimitiveType IRuntimeFilter::column_type() const {
void IRuntimeFilter::signal() {
DCHECK(is_consumer());
- if (_enable_pipeline_exec) {
- _rf_state_atomic.store(RuntimeFilterState::READY);
- if (!_filter_timer.empty()) {
- for (auto& timer : _filter_timer) {
- timer->call_ready();
- }
+ _rf_state_atomic.store(RuntimeFilterState::READY);
+ if (!_filter_timer.empty()) {
+ for (auto& timer : _filter_timer) {
+ timer->call_ready();
}
- } else {
- std::unique_lock lock(_inner_mutex);
- _rf_state = RuntimeFilterState::READY;
- _inner_cv.notify_all();
}
if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 982a6f6eb8b..a78d732b687 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -207,7 +207,6 @@ public:
registration_time_(MonotonicMillis()),
_wait_infinitely(_state->runtime_filter_wait_infinitely),
_rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
- _enable_pipeline_exec(_state->enable_pipeline_exec),
_runtime_filter_type(get_runtime_filter_type(desc)),
_profile(
new RuntimeProfile(fmt::format("RuntimeFilter: (id = {},
type = {})",
@@ -247,12 +246,10 @@ public:
bool has_local_target() const { return _has_local_target; }
bool is_ready() const {
- return (!_enable_pipeline_exec && _rf_state ==
RuntimeFilterState::READY) ||
- (_enable_pipeline_exec &&
- _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY);
+ return _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY;
}
RuntimeFilterState current_state() const {
- return _enable_pipeline_exec ?
_rf_state_atomic.load(std::memory_order_acquire) : _rf_state;
+ return _rf_state_atomic.load(std::memory_order_acquire);
}
bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
@@ -390,18 +387,11 @@ protected:
void _set_push_down(bool push_down) { _is_push_down = push_down; }
std::string _get_explain_state_string() const {
- if (_enable_pipeline_exec) {
- return _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY
- ? "READY"
- : _rf_state_atomic.load(std::memory_order_acquire) ==
- RuntimeFilterState::TIME_OUT
- ? "TIME_OUT"
- : "NOT_READY";
- } else {
- return _rf_state == RuntimeFilterState::READY ? "READY"
- : _rf_state == RuntimeFilterState::TIME_OUT ? "TIME_OUT"
- : "NOT_READY";
- }
+ return _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY
+ ? "READY"
+ : _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::TIME_OUT
+ ? "TIME_OUT"
+ : "NOT_READY";
}
RuntimeFilterParamsContext* _state = nullptr;
@@ -436,8 +426,6 @@ protected:
const bool _wait_infinitely;
const int32_t _rf_wait_time_ms;
- const bool _enable_pipeline_exec;
-
std::atomic<bool> _profile_init = false;
// runtime filter type
RuntimeFilterType _runtime_filter_type;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 41498fd94fa..7a4a9d9c951 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -743,8 +743,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
_aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
// In case of : `select * from (select GoodEvent from hits union select
CounterID from hits) as h limit 10;`
// only union with limit: we can short circuit query the pipeline exec
engine.
- _can_short_circuit =
- tnode.agg_node.aggregate_functions.empty() &&
state->enable_pipeline_x_exec();
+ _can_short_circuit = tnode.agg_node.aggregate_functions.empty();
TSortInfo dummy;
for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 837a33dc437..40b63783c12 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1148,8 +1148,7 @@ Status StreamingAggOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
_aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
// In case of : `select * from (select GoodEvent from hits union select
CounterID from hits) as h limit 10;`
// only union with limit: we can short circuit query the pipeline exec
engine.
- _can_short_circuit =
- tnode.agg_node.aggregate_functions.empty() &&
state->enable_pipeline_x_exec();
+ _can_short_circuit = tnode.agg_node.aggregate_functions.empty();
TSortInfo dummy;
for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 980078b8fe8..a7c6446be43 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -311,7 +311,6 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState*
state,
child_block_suppliers.push_back(block_supplier);
}
RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
- _merger->set_pipeline_engine_enabled(true);
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index a85b64c9154..72c06721d89 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -405,7 +405,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
_runtime_state->runtime_filter_wait_infinitely();
filterparams->runtime_filter_wait_time_ms =
_runtime_state->runtime_filter_wait_time_ms();
- filterparams->enable_pipeline_exec =
_runtime_state->enable_pipeline_x_exec();
filterparams->execution_timeout =
_runtime_state->execution_timeout();
filterparams->exec_env = ExecEnv::GetInstance();
@@ -1648,9 +1647,6 @@ std::string PipelineFragmentContext::debug_string() {
std::vector<std::shared_ptr<TRuntimeProfileTree>>
PipelineFragmentContext::collect_realtime_profile_x() const {
std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
- DCHECK(_query_ctx->enable_pipeline_x_exec() == true)
- << fmt::format("Query {} calling a pipeline X function, but its
pipeline X is disabled",
- print_id(this->_query_id));
// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this funciton is invoked after fragment context
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 41ba68e8cbb..8222b75cfc3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -710,10 +710,6 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
- DCHECK((params.query_options.__isset.enable_pipeline_x_engine &&
- params.query_options.enable_pipeline_x_engine) ||
- (params.query_options.__isset.enable_pipeline_engine &&
- params.query_options.enable_pipeline_engine));
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
@@ -866,13 +862,10 @@ void FragmentMgr::cancel_worker() {
}
for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
if (auto q_ctx = it->second.lock()) {
- if (q_ctx->is_timeout(now) &&
q_ctx->enable_pipeline_x_exec()) {
+ if (q_ctx->is_timeout(now)) {
LOG_WARNING("Query {} is timeout",
print_id(it->first));
queries_timeout.push_back(it->first);
++it;
- } else if (q_ctx->is_timeout(now)) {
- LOG_WARNING("Query {} is timeout",
print_id(it->first));
- it = _query_ctx_map.erase(it);
} else {
++it;
}
@@ -1240,9 +1233,7 @@ Status FragmentMgr::get_realtime_exec_status(const
TUniqueId& query_id,
return Status::NotFound("Query {} not found", print_id(query_id));
}
- if (query_context->enable_pipeline_x_exec()) {
- *exec_status = query_context->get_realtime_exec_status();
- }
+ *exec_status = query_context->get_realtime_exec_status();
return Status::OK();
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 44bdaa5971a..2dafb8dd3ec 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -393,10 +393,6 @@ std::unordered_map<int,
std::vector<std::shared_ptr<TRuntimeProfileTree>>>
QueryContext::_collect_realtime_query_profile() const {
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
res;
- if (!enable_pipeline_x_exec()) {
- return res;
- }
-
for (auto& [fragment_id, fragment_ctx_wptr] :
_fragment_id_to_pipeline_ctx) {
if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
if (fragment_ctx == nullptr) {
@@ -429,25 +425,19 @@ QueryContext::_collect_realtime_query_profile() const {
TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
TReportExecStatusParams exec_status;
- if (enable_pipeline_x_exec()) {
- auto realtime_query_profile = _collect_realtime_query_profile();
- std::vector<std::shared_ptr<TRuntimeProfileTree>>
load_channel_profiles;
+ auto realtime_query_profile = _collect_realtime_query_profile();
+ std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
- for (auto load_channel_profile : _load_channel_profile_map) {
- if (load_channel_profile.second != nullptr) {
- load_channel_profiles.push_back(load_channel_profile.second);
- }
+ for (auto load_channel_profile : _load_channel_profile_map) {
+ if (load_channel_profile.second != nullptr) {
+ load_channel_profiles.push_back(load_channel_profile.second);
}
-
- exec_status =
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
- this->_query_id, std::move(realtime_query_profile),
- std::move(load_channel_profiles), /*is_done=*/false);
- } else {
- auto msg = fmt::format("Query {} is not pipelineX query",
print_id(_query_id));
- LOG_ERROR(msg);
- DCHECK(false) << msg;
}
+ exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+ this->_query_id, std::move(realtime_query_profile),
std::move(load_channel_profiles),
+ /*is_done=*/false);
+
return exec_status;
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ee744a89466..4c1ee2cf574 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -167,13 +167,6 @@ public:
_query_options.runtime_filter_wait_infinitely;
}
- bool enable_pipeline_x_exec() const {
- return (_query_options.__isset.enable_pipeline_x_engine &&
- _query_options.enable_pipeline_x_engine) ||
- (_query_options.__isset.enable_pipeline_engine &&
- _query_options.enable_pipeline_engine);
- }
-
int be_exec_version() const {
if (!_query_options.__isset.be_exec_version) {
return 0;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 104d0e342f7..c9812508446 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -500,7 +500,6 @@ RuntimeFilterParamsContext*
RuntimeFilterParamsContext::create(RuntimeState* sta
state->get_query_ctx()->obj_pool.add(new
RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely =
state->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
- params->enable_pipeline_exec = state->enable_pipeline_x_exec();
params->execution_timeout = state->execution_timeout();
params->runtime_filter_mgr = state->local_runtime_filter_mgr();
params->exec_env = state->exec_env();
@@ -516,7 +515,6 @@ RuntimeFilterParamsContext*
RuntimeFilterParamsContext::create(QueryContext* que
RuntimeFilterParamsContext* params = query_ctx->obj_pool.add(new
RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely =
query_ctx->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms =
query_ctx->runtime_filter_wait_time_ms();
- params->enable_pipeline_exec = query_ctx->enable_pipeline_x_exec();
params->execution_timeout = query_ctx->execution_timeout();
params->runtime_filter_mgr = query_ctx->runtime_filter_mgr();
params->exec_env = query_ctx->exec_env();
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 706e5ae5e31..fb0970a541d 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -282,7 +282,6 @@ struct RuntimeFilterParamsContext {
bool runtime_filter_wait_infinitely;
int32_t runtime_filter_wait_time_ms;
- bool enable_pipeline_exec;
int32_t execution_timeout;
RuntimeFilterMgr* runtime_filter_mgr;
ExecEnv* exec_env;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 2b303603e7a..4df2b0a45a5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -355,12 +355,6 @@ public:
}
return _query_options.be_exec_version;
}
- bool enable_pipeline_x_exec() const {
- return (_query_options.__isset.enable_pipeline_x_engine &&
- _query_options.enable_pipeline_x_engine) ||
- (_query_options.__isset.enable_pipeline_engine &&
- _query_options.enable_pipeline_engine);
- }
bool enable_local_shuffle() const {
return _query_options.__isset.enable_local_shuffle &&
_query_options.enable_local_shuffle;
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 7d0e131f5b7..1eaed2a62e4 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -342,8 +342,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, RuntimeState* sta
_row_desc(row_desc),
_is_merging(is_merging),
_is_closed(false),
- _profile(profile),
- _enable_pipeline(state->enable_pipeline_x_exec()) {
+ _profile(profile) {
// DataStreamRecvr may be destructed after the instance execution thread
ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" +
print_id(_fragment_instance_id));
@@ -351,26 +350,18 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, RuntimeState* sta
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
- if (state->enable_pipeline_x_exec()) {
- _sender_to_local_channel_dependency.resize(num_queues);
- for (size_t i = 0; i < num_queues; i++) {
- _sender_to_local_channel_dependency[i] =
pipeline::Dependency::create_shared(
- _dest_node_id, _dest_node_id,
"LocalExchangeChannelDependency", true);
- }
+ _sender_to_local_channel_dependency.resize(num_queues);
+ for (size_t i = 0; i < num_queues; i++) {
+ _sender_to_local_channel_dependency[i] =
pipeline::Dependency::create_shared(
+ _dest_node_id, _dest_node_id,
"LocalExchangeChannelDependency", true);
}
_sender_queues.reserve(num_queues);
int num_sender_per_queue = is_merging ? 1 : num_senders;
_sender_queue_mem_limit = std::max(20480,
config::exchg_node_buffer_size_bytes / num_queues);
for (int i = 0; i < num_queues; ++i) {
SenderQueue* queue = nullptr;
- if (_enable_pipeline) {
- queue = _sender_queue_pool.add(new PipSenderQueue(this,
num_sender_per_queue, profile));
- if (state->enable_pipeline_x_exec()) {
-
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
- }
- } else {
- queue = _sender_queue_pool.add(new SenderQueue(this,
num_sender_per_queue, profile));
- }
+ queue = _sender_queue_pool.add(new PipSenderQueue(this,
num_sender_per_queue, profile));
+
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
_sender_queues.push_back(queue);
}
@@ -411,7 +402,6 @@ Status VDataStreamRecvr::create_merger(const
VExprContextSPtrs& ordering_expr,
_sender_queues[i],
std::placeholders::_1,
std::placeholders::_2));
}
- _merger->set_pipeline_engine_enabled(_enable_pipeline);
RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
return Status::OK();
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 310b9ced5dc..e89eb7ba824 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -167,7 +167,6 @@ private:
// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
- bool _enable_pipeline;
std::vector<std::shared_ptr<pipeline::Dependency>>
_sender_to_local_channel_dependency;
};
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 3b17f957deb..ef054190a3b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -119,12 +119,9 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
while (_offset != 0 && current->block_ptr() != nullptr) {
if (_offset >= current->rows - current->pos) {
_offset -= (current->rows - current->pos);
- if (_pipeline_engine_enabled) {
- _pending_cursor = current.impl;
- _priority_queue.pop();
- return Status::OK();
- }
- has_next_block(current);
+ _pending_cursor = current.impl;
+ _priority_queue.pop();
+ return Status::OK();
} else {
current->pos += _offset;
_offset = 0;
@@ -134,12 +131,9 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
if (current->is_first()) {
if (current->block_ptr() != nullptr) {
current->block_ptr()->swap(*output_block);
- if (_pipeline_engine_enabled) {
- _pending_cursor = current.impl;
- _priority_queue.pop();
- return Status::OK();
- }
- *eos = !has_next_block(current);
+ _pending_cursor = current.impl;
+ _priority_queue.pop();
+ return Status::OK();
} else {
*eos = true;
}
@@ -151,12 +145,9 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
current->pos, current->rows - current->pos);
}
current->block_ptr()->swap(*output_block);
- if (_pipeline_engine_enabled) {
- _pending_cursor = current.impl;
- _priority_queue.pop();
- return Status::OK();
- }
- *eos = !has_next_block(current);
+ _pending_cursor = current.impl;
+ _priority_queue.pop();
+ return Status::OK();
} else {
*eos = true;
}
diff --git a/be/src/vec/runtime/vsorted_run_merger.h
b/be/src/vec/runtime/vsorted_run_merger.h
index 943956d8c38..8dd706cad16 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -62,8 +62,6 @@ public:
// Return the next block of sorted rows from this merger.
Status get_next(Block* output_block, bool* eos);
- void set_pipeline_engine_enabled(bool value) { _pipeline_engine_enabled =
value; }
-
protected:
const VExprContextSPtrs _ordering_expr;
SortDescription _desc;
@@ -76,8 +74,6 @@ protected:
int64_t _limit = -1;
size_t _offset = 0;
- bool _pipeline_engine_enabled = false;
-
std::vector<BlockSupplierSortCursorImpl> _cursors;
std::priority_queue<MergeSortCursor> _priority_queue;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]