This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview6 by this push:
new 115e2306cf0 adjust conjunct order by execute cost
115e2306cf0 is described below
commit 115e2306cf0405b9f67e89fcd856416c98dd4240
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Mon Jan 19 18:31:45 2026 +0800
adjust conjunct order by execute cost
thrift: add enable_adjust_conjunct_order_by_cost to TQueryOptions
fe: add session variable enable_adjust_conjunct_order_by_cost
fe: forward enable_adjust_conjunct_order_by_cost via toThrift if supported
update
update
format
update
Update be/src/vec/exprs/vexpr_context.cpp
Co-authored-by: Copilot <[email protected]>
Update be/src/vec/exprs/vcompound_pred.h
Co-authored-by: Copilot <[email protected]>
Update be/src/vec/exprs/vectorized_fn_call.cpp
Co-authored-by: Copilot <[email protected]>
update
update
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 3 +--
be/src/pipeline/exec/operator.cpp | 15 +++++++++------
be/src/pipeline/exec/scan_operator.cpp | 12 +++++++++---
be/src/pipeline/exec/streaming_aggregation_operator.cpp | 1 -
be/src/pipeline/exec/streaming_aggregation_operator.h | 1 -
be/src/runtime/runtime_state.h | 5 +++++
be/src/vec/exec/format/parquet/vparquet_group_reader.cpp | 6 ++++++
be/src/vec/exprs/vcolumn_ref.h | 2 ++
be/src/vec/exprs/vcompound_pred.h | 8 ++++++++
be/src/vec/exprs/vectorized_fn_call.cpp | 13 +++++++++++++
be/src/vec/exprs/vectorized_fn_call.h | 1 +
be/src/vec/exprs/vexpr.h | 8 ++++++++
be/src/vec/exprs/vexpr_context.cpp | 9 +++++++++
be/src/vec/exprs/vexpr_context.h | 4 +++-
be/src/vec/exprs/vliteral.h | 2 ++
be/src/vec/exprs/vruntimefilter_wrapper.h | 2 ++
be/src/vec/exprs/vslot_ref.h | 2 ++
be/src/vec/functions/function.h | 4 ++++
be/src/vec/functions/functions_comparison.h | 2 ++
.../main/java/org/apache/doris/qe/SessionVariable.java | 4 ++++
gensrc/thrift/PaloInternalService.thrift | 2 ++
21 files changed, 92 insertions(+), 14 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ddb4bbfbe6f..9239e13f7d4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -734,8 +734,7 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int
operator_id, int dest_i
_is_first_phase(tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase),
_pool(pool),
_limit(tnode.limit),
- _have_conjuncts((tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()) ||
- (tnode.__isset.conjuncts &&
!tnode.conjuncts.empty())),
+ _have_conjuncts(tnode.__isset.conjuncts && !tnode.conjuncts.empty()),
_partition_exprs(
tnode.__isset.distribute_expr_lists &&
(require_bucket_distribution ||
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index cf6327bcf0c..26b63443dd6 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -179,7 +179,7 @@ std::string OperatorXBase::debug_string(RuntimeState*
state, int indentation_lev
return
state->get_local_state(operator_id())->debug_string(indentation_level);
}
-Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
+Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) {
std::string node_name = print_plan_node_type(tnode.node_type);
_nereids_id = tnode.nereids_id;
if (!tnode.intermediate_output_tuple_id_list.empty()) {
@@ -199,11 +199,9 @@ Status OperatorXBase::init(const TPlanNode& tnode,
RuntimeState* /*state*/) {
_op_name = substr + "_OPERATOR";
if (tnode.__isset.vconjunct) {
- vectorized::VExprContextSPtr context;
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct,
context));
- _conjuncts.emplace_back(context);
+ return Status::InternalError("vconjunct is not supported yet");
} else if (tnode.__isset.conjuncts) {
- for (auto& conjunct : tnode.conjuncts) {
+ for (const auto& conjunct : tnode.conjuncts) {
vectorized::VExprContextSPtr context;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct,
context));
_conjuncts.emplace_back(context);
@@ -211,7 +209,6 @@ Status OperatorXBase::init(const TPlanNode& tnode,
RuntimeState* /*state*/) {
}
// create the projections expr
-
if (tnode.__isset.projections) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections,
_projections));
@@ -232,6 +229,12 @@ Status OperatorXBase::prepare(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
+    // Cheapest conjuncts first; stable_sort keeps the planner's order among equal-cost ones.
+    if (state->enable_adjust_conjunct_order_by_cost()) {
+        std::ranges::stable_sort(_conjuncts, [](const auto& a, const auto& b) {
+            return a->execute_cost() < b->execute_cost();
+        });
+    }
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 7b08da8c9c2..504c821ce36 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -77,8 +77,15 @@ Status
ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
int&
arrived_rf_num) {
// Lock needed because _conjuncts can be accessed concurrently by multiple
scanner threads
std::unique_lock lock(_conjuncts_lock);
- return _helper.try_append_late_arrival_runtime_filter(state,
_parent->row_descriptor(),
- arrived_rf_num,
_conjuncts);
+ RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state,
_parent->row_descriptor(),
+
arrived_rf_num, _conjuncts));
+    // Re-rank with the newly appended filters; stable_sort keeps equal-cost ones in order.
+    if (state->enable_adjust_conjunct_order_by_cost()) {
+        std::ranges::stable_sort(_conjuncts, [](const auto& a, const auto& b) {
+            return a->execute_cost() < b->execute_cost();
+        });
+    }
+    return Status::OK();
}
Status ScanLocalStateBase::clone_conjunct_ctxs(vectorized::VExprContextSPtrs&
scanner_conjuncts) {
@@ -312,7 +318,7 @@ Status
ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
message += conjunct->root()->debug_string();
}
}
- custom_profile()->add_info_string("RemainedDownPredicates", message);
+ custom_profile()->add_info_string("RemainedPredicates", message);
}
for (auto& it : _slot_id_to_value_range) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 5959ef2cf03..c8568930176 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -825,7 +825,6 @@ StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool*
pool, int operator_id,
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase),
- _have_conjuncts(tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples),
_partition_exprs(
tnode.__isset.distribute_expr_lists &&
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index ce45fe97c22..26ce294f8a9 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -265,7 +265,6 @@ private:
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;
std::vector<size_t> _make_nullable_keys;
- bool _have_conjuncts;
RowDescriptor _agg_fn_output_row_descriptor;
// For sort limit
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cb80c4f1ba9..86ccb003279 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -125,6 +125,11 @@ public:
:
_query_options.mem_limit / 20;
}
+ bool enable_adjust_conjunct_order_by_cost() const {
+ return _query_options.__isset.enable_adjust_conjunct_order_by_cost &&
+ _query_options.enable_adjust_conjunct_order_by_cost;
+ }
+
int32_t max_column_reader_num() const {
return _query_options.__isset.max_column_reader_num ?
_query_options.max_column_reader_num
: 20000;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 7532a1259f7..88d083e10d2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -188,6 +188,12 @@ Status RowGroupReader::init(
_lazy_read_ctx.missing_columns_conjuncts.end());
RETURN_IF_ERROR(_rewrite_dict_predicates());
}
+    // _state is nullptr in some unit tests; stable_sort keeps equal-cost conjuncts in order.
+    if (_state && _state->enable_adjust_conjunct_order_by_cost()) {
+        std::ranges::stable_sort(_filter_conjuncts, [](const auto& a, const auto& b) {
+            return a->execute_cost() < b->execute_cost();
+        });
+    }
return Status::OK();
}
diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h
index 83dba5e3da2..3f4412cf12c 100644
--- a/be/src/vec/exprs/vcolumn_ref.h
+++ b/be/src/vec/exprs/vcolumn_ref.h
@@ -89,6 +89,8 @@ public:
return out.str();
}
+ double execute_cost() const override { return 0.0; }
+
private:
int _column_id;
std::atomic<int> _gap = 0;
diff --git a/be/src/vec/exprs/vcompound_pred.h
b/be/src/vec/exprs/vcompound_pred.h
index c1ba88fa2af..2ba4216847b 100644
--- a/be/src/vec/exprs/vcompound_pred.h
+++ b/be/src/vec/exprs/vcompound_pred.h
@@ -347,6 +347,14 @@ public:
return Status::OK();
}
+ double execute_cost() const override {
+ double cost = 0.3;
+ for (const auto& child : _children) {
+ cost += child->execute_cost();
+ }
+ return cost;
+ }
+
private:
static inline constexpr uint8_t apply_and_null(UInt8 a, UInt8 l_null,
UInt8 b, UInt8 r_null) {
// (<> && false) is false, (true && NULL) is NULL
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp
b/be/src/vec/exprs/vectorized_fn_call.cpp
index 0d4bc252394..cf6dc8fc6db 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -26,6 +26,7 @@
#include <ostream>
#include "common/config.h"
+#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
@@ -641,5 +642,17 @@ Status VectorizedFnCall::evaluate_ann_range_search(
return Status::OK();
}
+double VectorizedFnCall::execute_cost() const {
+ if (!_function) {
+ throw Exception(
+ Status::InternalError("Function is null in expression: {}",
this->debug_string()));
+ }
+ double cost = _function->execute_cost();
+ for (const auto& child : _children) {
+ cost += child->execute_cost();
+ }
+ return cost;
+}
+
#include "common/compile_check_end.h"
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vectorized_fn_call.h
b/be/src/vec/exprs/vectorized_fn_call.h
index 2963e35931f..8da62455365 100644
--- a/be/src/vec/exprs/vectorized_fn_call.h
+++ b/be/src/vec/exprs/vectorized_fn_call.h
@@ -65,6 +65,7 @@ public:
const std::string& expr_name() const override;
std::string function_name() const;
std::string debug_string() const override;
+ double execute_cost() const override;
bool is_blockable() const override {
return _function->is_blockable() ||
std::any_of(_children.begin(), _children.end(),
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 31812816e9b..efc39fffc9b 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -277,6 +277,14 @@ public:
return expr;
}
+ virtual double execute_cost() const {
+ double cost = 1.0;
+ for (const auto& child : _children) {
+ cost += child->execute_cost();
+ }
+ return cost;
+ }
+
// If this expr is a RuntimeFilterWrapper, this method will return an
underlying rf expression
virtual VExprSPtr get_impl() const { return {}; }
diff --git a/be/src/vec/exprs/vexpr_context.cpp
b/be/src/vec/exprs/vexpr_context.cpp
index de40e9dbe87..63e8deb7aee 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -473,5 +473,14 @@ uint64_t VExprContext::get_digest(uint64_t seed) const {
return _root->get_digest(seed);
}
+double VExprContext::execute_cost() const {
+ if (_root == nullptr) {
+ // When there is no expression root, treat the cost as a base value.
+ // This avoids null dereferences while keeping a deterministic cost.
+ return 0.0;
+ }
+ return _root->execute_cost();
+}
+
#include "common/compile_check_end.h"
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index de3fe02612c..98a1491e5d6 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -201,7 +201,9 @@ public:
[[nodiscard]] Status execute_const_expr(ColumnWithTypeAndName& result);
-    VExprSPtr root() const { return _root; }
+    /// Estimated cost of evaluating the root expression; 0.0 when there is no root.
+    double execute_cost() const;
+    VExprSPtr root() const { return _root; }
void set_root(const VExprSPtr& expr) { _root = expr; }
void set_index_context(std::shared_ptr<IndexExecContext> index_context) {
_index_context = std::move(index_context);
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 72f1d90bbbe..a77ed5dafd0 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -56,6 +56,8 @@ public:
const std::string& expr_name() const override { return _expr_name; }
std::string debug_string() const override;
+ double execute_cost() const override { return 0.0; }
+
MOCK_FUNCTION std::string value(const DataTypeSerDe::FormatOptions&
options) const;
const ColumnPtr& get_column_ptr() const { return _column_ptr; }
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index f9b6203ed6c..d3c42df23f4 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -64,6 +64,8 @@ public:
const VExprSPtrs& children() const override { return _impl->children(); }
TExprNodeType::type node_type() const override { return
_impl->node_type(); }
+ double execute_cost() const override { return _impl->execute_cost(); }
+
Status execute_filter(VExprContext* context, const Block* block,
uint8_t* __restrict result_filter_data, size_t rows,
bool accept_null,
bool* can_filter_all) const override;
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index ff7e3812d76..a5feb3c999a 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -72,6 +72,8 @@ public:
uint64_t get_digest(uint64_t seed) const override;
+ double execute_cost() const override { return 0.0; }
+
private:
int _slot_id;
int _column_id;
diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h
index ca872a147a7..e5a692e473d 100644
--- a/be/src/vec/functions/function.h
+++ b/be/src/vec/functions/function.h
@@ -170,6 +170,8 @@ public:
virtual const DataTypes& get_argument_types() const = 0;
virtual const DataTypePtr& get_return_type() const = 0;
+ virtual double execute_cost() const { return 1.0; }
+
/// Do preparations and return executable.
/// sample_block should contain data types of arguments and values of
constants, if relevant.
virtual PreparedFunctionPtr prepare(FunctionContext* context, const Block&
sample_block,
@@ -451,6 +453,8 @@ public:
return function;
}
+ double execute_cost() const override { return function->execute_cost(); }
+
Status open(FunctionContext* context, FunctionContext::FunctionStateScope
scope) override {
return function->open(context, scope);
}
diff --git a/be/src/vec/functions/functions_comparison.h
b/be/src/vec/functions/functions_comparison.h
index 8c0d56e74ea..fc831997681 100644
--- a/be/src/vec/functions/functions_comparison.h
+++ b/be/src/vec/functions/functions_comparison.h
@@ -272,6 +272,8 @@ public:
FunctionComparison() = default;
+ double execute_cost() const override { return 0.5; }
+
private:
template <PrimitiveType PT>
Status execute_num_type(Block& block, uint32_t result, const ColumnPtr&
col_left_ptr,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9e3f0112508..4ef012cb20a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3164,6 +3164,11 @@ public class SessionVariable implements Serializable,
Writable {
})
public boolean enablePhraseQuerySequentialOpt = true;
+    @VariableMgr.VarAttr(name = "enable_adjust_conjunct_order_by_cost", needForward = true, description = {
+            "是否根据谓词的估算执行代价调整谓词的执行顺序,代价低的谓词先执行",
+            "Whether BE reorders conjuncts by their estimated execution cost, evaluating cheaper ones first"})
+    public boolean enableAdjustConjunctOrderByCost = true;
+
@VariableMgr.VarAttr(name = REQUIRE_SEQUENCE_IN_INSERT, needForward =
true, description = {
"该变量用于控制,使用了 sequence 列的 unique key 表,insert into 操作是否要求必须提供每一行的
sequence 列的值",
"This variable controls whether the INSERT INTO operation on
unique key tables with a sequence"
@@ -5186,6 +5189,7 @@ public class SessionVariable implements Serializable,
Writable {
} else {
tResult.setFileCacheQueryLimitPercent(Config.file_cache_query_limit_max_percent);
}
+
tResult.setEnableAdjustConjunctOrderByCost(enableAdjustConjunctOrderByCost);
// Set Iceberg write target file size
tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 43ccf013d9c..dbfc8f2de76 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -432,6 +432,9 @@ struct TQueryOptions {
188: optional bool enable_broadcast_join_force_passthrough;
+  // When true, BE reorders filter conjuncts in ascending order of estimated execute cost.
+  200: optional bool enable_adjust_conjunct_order_by_cost;
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]