This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3676605ecaf [refine](code)Remove the column_to_keep parameter from the
filter_block function (#60899)
3676605ecaf is described below
commit 3676605ecaf0f9e1b8ca8dabad3c34aaf1896197
Author: Mryange <[email protected]>
AuthorDate: Wed Mar 4 14:59:37 2026 +0800
[refine](code)Remove the column_to_keep parameter from the filter_block
function (#60899)
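This is part of a series of PRs whose eventual goal is to delete the
erase_useless_column function, so the refactoring is being done gradually.
In this PR, every call site of filter_block filters the entire block (each
one passed the block's full column count as column_to_keep), so the
parameter is removed and filter_block now uses block->columns() internally.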
---
be/src/pipeline/exec/aggregation_source_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 3 +--
be/src/pipeline/exec/assert_num_rows_operator.cpp | 2 +-
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp | 2 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 2 +-
be/src/pipeline/exec/nested_loop_join_probe_operator.cpp | 5 ++---
be/src/pipeline/exec/operator.cpp | 4 ++--
be/src/pipeline/exec/operator.h | 2 +-
be/src/pipeline/exec/partition_sort_source_operator.cpp | 3 +--
be/src/pipeline/exec/rec_cte_scan_operator.h | 2 +-
be/src/pipeline/exec/rec_cte_source_operator.h | 3 +--
be/src/pipeline/exec/schema_scan_operator.cpp | 4 ++--
be/src/pipeline/exec/select_operator.h | 2 +-
be/src/pipeline/exec/streaming_aggregation_operator.cpp | 2 +-
14 files changed, 17 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 846bfdf1c12..c78cd3111ac 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -448,7 +448,7 @@ Status AggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* blo
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block,
block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
local_state.do_agg_limit(block, eos);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 5170711acc3..86188f45faf 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -60,8 +60,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState*
state, vectorized::Block
local_state._shared_state->blocks_buffer.pop();
output_rows = output_block->rows();
//if buffer have no data and sink not eos, block reading and wait
for signal again
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
output_block,
- output_block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
output_block));
if (local_state._shared_state->blocks_buffer.empty() &&
!local_state._shared_state->sink_eos) {
// add this mutex to check, as in some case maybe is doing
block(), and the sink is doing set eos.
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 47a97e0af64..dca10ec02b8 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -120,7 +120,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
_node_id, to_string_lambda(_assertion), _desired_num_rows,
num_rows_returned,
_subquery_string);
}
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block,
block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 972fc9ba923..75ca1c17e34 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -403,7 +403,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState*
state, vectorized::Bloc
local_state._make_nullable_output_key(block);
if (!_is_streaming_preagg) {
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block, block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block));
}
local_state.add_num_rows_returned(block->rows());
// If the limit is not reached, it is important to ensure that
_aggregated_block is empty
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index cb2c0df540c..bb54947423b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -382,7 +382,7 @@ Status
HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
}
{
SCOPED_TIMER(_join_filter_timer);
- RETURN_IF_ERROR(filter_block(_conjuncts, temp_block,
temp_block->columns()));
+ RETURN_IF_ERROR(filter_block(_conjuncts, temp_block));
}
RETURN_IF_ERROR(_build_output_block(temp_block, output_block));
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 74f0b4c5a36..5f455789894 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -654,9 +654,8 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState*
state, vectorized::Block
{
SCOPED_TIMER(local_state._join_filter_timer);
-
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
-
&local_state._join_block,
-
local_state._join_block.columns()));
+ RETURN_IF_ERROR(
+ local_state.filter_block(local_state._conjuncts,
&local_state._join_block));
}
RETURN_IF_ERROR(local_state._build_output_block(&local_state._join_block,
block));
}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 1b9c0adf230..60193253071 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -291,8 +291,8 @@ void PipelineXLocalStateBase::clear_origin_block() {
}
Status PipelineXLocalStateBase::filter_block(const
vectorized::VExprContextSPtrs& expr_contexts,
- vectorized::Block* block, size_t
column_to_keep) {
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts,
block, column_to_keep));
+ vectorized::Block* block) {
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts,
block, block->columns()));
_estimate_memory_usage +=
vectorized::VExprContext::get_memory_usage(expr_contexts);
return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 10e780c07f7..d6244f850c0 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -266,7 +266,7 @@ public:
virtual std::vector<Dependency*> execution_dependencies() { return {}; }
Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
- vectorized::Block* block, size_t column_to_keep);
+ vectorized::Block* block);
int64_t& estimate_memory_usage() { return _estimate_memory_usage; }
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 127498c18e3..795b1a223c5 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -74,8 +74,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
if (!output_block->empty()) {
//if buffer have no data and sink not eos, block reading and wait for
signal again
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
output_block,
- output_block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
output_block));
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h
b/be/src/pipeline/exec/rec_cte_scan_operator.h
index 5b03766c163..6579d72957f 100644
--- a/be/src/pipeline/exec/rec_cte_scan_operator.h
+++ b/be/src/pipeline/exec/rec_cte_scan_operator.h
@@ -77,7 +77,7 @@ public:
return Status::OK();
}
*block = std::move(local_state._blocks.back());
- RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(),
block, block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(),
block));
local_state._blocks.pop_back();
return Status::OK();
}
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h
b/be/src/pipeline/exec/rec_cte_source_operator.h
index 7f42495116e..0bd58106146 100644
--- a/be/src/pipeline/exec/rec_cte_source_operator.h
+++ b/be/src/pipeline/exec/rec_cte_source_operator.h
@@ -121,8 +121,7 @@ public:
*eos = true;
} else {
block->swap(ctx->blocks.back());
- RETURN_IF_ERROR(
- local_state.filter_block(local_state.conjuncts(),
block, block->columns()));
+
RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block));
ctx->blocks.pop_back();
}
}
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 79987c001de..362f32699a3 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -257,8 +257,8 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state,
vectorized::Block* bl
*src_block.safe_get_by_position(_slot_offsets[i]).column, 0,
src_block.rows());
}
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block,
-
_dest_tuple_desc->slots().size()));
+ DCHECK_EQ(block->columns(), _dest_tuple_desc->slots().size());
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block));
src_block.clear();
}
} while (block->rows() == 0 && !*eos);
diff --git a/be/src/pipeline/exec/select_operator.h
b/be/src/pipeline/exec/select_operator.h
index f033c7c0de8..bcdb8924068 100644
--- a/be/src/pipeline/exec/select_operator.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -47,7 +47,7 @@ public:
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block, block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block));
local_state.reached_limit(block, eos);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index fe9aef1662a..898be928121 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1024,7 +1024,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state,
vectorized::Block* block
RETURN_IF_ERROR(local_state._get_results_with_serialized_key(state,
block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block, block->columns()));
+ RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
block));
}
local_state.reached_limit(block, eos);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]