This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 5ad0921fe02 [fix](be) Cast localized filter slots for file schema
types (#63754)
5ad0921fe02 is described below
commit 5ad0921fe021d5c939824b95f44b882fe678470d
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 19:12:12 2026 +0800
[fix](be) Cast localized filter slots for file schema types (#63754)
---
be/src/format/reader/column_mapper.cpp | 53 +++++++++++++------
be/src/format/reader/file_reader.h | 2 +-
be/test/format/reader/expr/cast_test.cpp | 89 ++++++++++++++++++++++++++++++++
3 files changed, 127 insertions(+), 17 deletions(-)
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index 1a33781b965..80a81f6c76d 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -36,18 +36,33 @@
namespace doris::reader {
+struct FileSlotRewriteInfo {
+ size_t block_position = 0;
+ DataTypePtr file_type;
+ DataTypePtr table_type;
+ std::string file_column_name;
+};
+
static VExprSPtr rewrite_table_expr_to_file_expr(
- const VExprSPtr& expr, const std::map<int32_t, size_t>&
table_column_to_file_position) {
+ const VExprSPtr& expr,
+ const std::map<int32_t, FileSlotRewriteInfo>&
table_column_to_file_slot) {
if (expr == nullptr) {
return nullptr;
}
if (expr->is_slot_ref()) {
const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
- const auto position_it =
table_column_to_file_position.find(slot_ref->slot_id());
- if (position_it != table_column_to_file_position.end()) {
- return TableSlotRef::create_shared(slot_ref->slot_id(),
-
cast_set<int>(position_it->second), -1,
- slot_ref->data_type(),
slot_ref->expr_name());
+ const auto rewrite_it =
table_column_to_file_slot.find(slot_ref->slot_id());
+ if (rewrite_it != table_column_to_file_slot.end()) {
+ const auto& rewrite_info = rewrite_it->second;
+ auto file_slot = TableSlotRef::create_shared(
+ slot_ref->slot_id(),
cast_set<int>(rewrite_info.block_position), -1,
+ rewrite_info.file_type, rewrite_info.file_column_name);
+ if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
+ return file_slot;
+ }
+ auto cast_expr = Cast::create_shared(rewrite_info.table_type);
+ cast_expr->add_child(std::move(file_slot));
+ return cast_expr;
}
return expr;
}
@@ -59,7 +74,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
rewritten_children.reserve(expr->children().size());
for (const auto& child : expr->children()) {
rewritten_children.push_back(
- rewrite_table_expr_to_file_expr(child,
table_column_to_file_position));
+ rewrite_table_expr_to_file_expr(child,
table_column_to_file_slot));
}
expr->set_children(std::move(rewritten_children));
return expr;
@@ -95,19 +110,25 @@ static void rebuild_projection(ColumnMapping* mapping,
size_t block_position) {
mapping->projection = VExprContext::create_shared(expr);
}
-static std::map<int32_t, size_t> build_file_position_map(const
std::vector<ColumnMapping>& mappings,
- const
FileScanRequest& file_request) {
- std::map<int32_t, size_t> table_column_to_file_position;
+// Build the map from table column id to FileSlotRewriteInfo. A mapping is
included only if it has a file column id and that id has a block position in
the file request.
+static std::map<int32_t, FileSlotRewriteInfo> build_file_slot_rewrite_map(
+ const std::vector<ColumnMapping>& mappings, const FileScanRequest&
file_request) {
+ std::map<int32_t, FileSlotRewriteInfo> table_column_to_file_slot;
for (const auto& mapping : mappings) {
if (!mapping.file_column_id.has_value()) {
continue;
}
const auto position_it =
file_request.column_positions.find(*mapping.file_column_id);
if (position_it != file_request.column_positions.end()) {
- table_column_to_file_position.emplace(mapping.table_column_id,
position_it->second);
+ table_column_to_file_slot.emplace(
+ mapping.table_column_id,
+ FileSlotRewriteInfo {.block_position = position_it->second,
+ .file_type = mapping.file_type,
+ .table_type = mapping.table_type,
+ .file_column_name =
mapping.file_column_name});
}
}
- return table_column_to_file_position;
+ return table_column_to_file_slot;
}
static bool is_complex_type(const DataTypePtr& type) {
@@ -348,9 +369,9 @@ Status TableColumnMapper::localize_filters(const
std::vector<TableFilter>& table
add_scan_column(file_request, *mapping->file_column_id,
&file_request->predicate_columns);
}
- // Build the complete table-slot to file-block position map after all
predicate columns have
- // been assigned. This keeps expression localization independent from
filter iteration order.
- const auto table_column_to_file_position =
build_file_position_map(_mappings, *file_request);
+ // Build the complete table-slot rewrite map after all predicate columns
have been assigned.
+ // This keeps expression localization independent from filter iteration
order.
+ const auto table_column_to_file_slot =
build_file_slot_rewrite_map(_mappings, *file_request);
for (const auto& table_filter : table_filters) {
if (!table_filter.can_be_localized()) {
continue;
@@ -359,7 +380,7 @@ Status TableColumnMapper::localize_filters(const
std::vector<TableFilter>& table
FileExpressionFilter expression_filter;
expression_filter.conjunct =
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
- table_filter.conjunct->root(),
table_column_to_file_position));
+ table_filter.conjunct->root(),
table_column_to_file_slot));
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
for (const auto table_column_id : table_filter.slot_ids) {
const auto* mapping = _find_mapping(table_column_id);
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index 69720bc8f9a..28de8f068b0 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -106,7 +106,7 @@ struct FileScanRequest {
std::vector<ColumnId> predicate_columns;
std::vector<ColumnId> non_predicate_columns;
- std::map<ColumnId, size_t> column_positions;
+ std::map<ColumnId, size_t> column_positions; // file_column_id ->
file-local block position
std::map<ColumnId, FieldProjection> complex_projections;
std::vector<FileExpressionFilter> expression_filters;
std::vector<FileColumnPredicateFilter> column_predicate_filters;
diff --git a/be/test/format/reader/expr/cast_test.cpp
b/be/test/format/reader/expr/cast_test.cpp
index 4f215418953..cab4e6c5b0d 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -64,6 +64,33 @@ protected:
MockRuntimeState state;
};
+class Int64ChildGreaterThanExpr final : public VExpr {
+public:
+ explicit Int64ChildGreaterThanExpr(int64_t value)
+ : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {}
+
+ Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
+ size_t count, ColumnPtr& result_column) const
override {
+ ColumnPtr child_column;
+ RETURN_IF_ERROR(get_child(0)->execute_column(context, block, selector,
count, child_column));
+ const auto& input = assert_cast<const ColumnInt64&>(*child_column);
+ auto result = ColumnUInt8::create();
+ auto& result_data = result->get_data();
+ result_data.resize(count);
+ for (size_t row = 0; row < count; ++row) {
+ result_data[row] = input.get_element(row) > _value;
+ }
+ result_column = std::move(result);
+ return Status::OK();
+ }
+
+ const std::string& expr_name() const override { return _expr_name; }
+
+private:
+ const int64_t _value;
+ const std::string _expr_name = "Int64ChildGreaterThanExpr";
+};
+
TEST_F(CastTest, CastIntSlotToBigInt) {
auto source_type = std::make_shared<DataTypeInt32>();
auto return_type = std::make_shared<DataTypeInt64>();
@@ -189,6 +216,9 @@ TEST_F(CastTest,
ColumnMapperBuildsCastProjectionForTypeMismatch) {
auto status = mapper.create_mapping(projected_columns, {}, file_schema);
ASSERT_TRUE(status.ok()) << status;
ASSERT_EQ(mapper.mappings().size(), 1);
+ reader::FileScanRequest file_request;
+ status = mapper.create_scan_request({}, {}, projected_columns,
&file_request);
+ ASSERT_TRUE(status.ok()) << status;
const auto& mapping = mapper.mappings()[0];
EXPECT_FALSE(mapping.is_trivial);
ASSERT_NE(mapping.projection, nullptr);
@@ -207,4 +237,63 @@ TEST_F(CastTest,
ColumnMapperBuildsCastProjectionForTypeMismatch) {
mapping.projection->close();
}
+TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
+ reader::TableColumnMapper mapper;
+ reader::TableColumn table_column;
+ table_column.id = 7;
+ table_column.name = "value";
+ table_column.type = std::make_shared<DataTypeInt64>();
+ std::vector<reader::TableColumn> projected_columns {table_column};
+
+ reader::SchemaField file_field;
+ file_field.id = 0;
+ file_field.name = "value";
+ file_field.type = std::make_shared<DataTypeInt32>();
+ std::vector<reader::SchemaField> file_schema {file_field};
+
+ auto status = mapper.create_mapping(projected_columns, {}, file_schema);
+ ASSERT_TRUE(status.ok()) << status;
+
+ auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+ predicate->add_child(TableSlotRef::create_shared(7, 7, -1,
table_column.type, "value"));
+ reader::TableFilter table_filter;
+ table_filter.conjunct = VExprContext::create_shared(predicate);
+ table_filter.slot_ids = {7};
+
+ reader::FileScanRequest file_request;
+ ASSERT_TRUE(mapper.create_scan_request({table_filter}, {},
projected_columns, &file_request)
+ .ok());
+ ASSERT_EQ(file_request.expression_filters.size(), 1);
+ ASSERT_EQ(file_request.predicate_columns,
std::vector<reader::ColumnId>({0}));
+ const auto& localized_expr =
file_request.expression_filters[0].conjunct->root();
+ ASSERT_EQ(localized_expr->get_num_children(), 1);
+ const auto& localized_child = localized_expr->children()[0];
+ ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
+ ASSERT_EQ(localized_child->get_num_children(), 1);
+ const auto* localized_slot =
+ assert_cast<const
TableSlotRef*>(localized_child->children()[0].get());
+ EXPECT_EQ(localized_slot->column_id(), 0);
+ EXPECT_TRUE(localized_slot->data_type()->equals(*file_field.type));
+ EXPECT_TRUE(localized_child->data_type()->equals(*table_column.type));
+
+ Block block;
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt32>({11,
22}));
+ auto* conjunct = file_request.expression_filters[0].conjunct.get();
+ status = conjunct->prepare(&state, RowDescriptor());
+ ASSERT_TRUE(status.ok()) << status;
+ status = conjunct->open(&state);
+ ASSERT_TRUE(status.ok()) << status;
+ IColumn::Filter filter(block.rows(), 1);
+ bool can_filter_all = false;
+ status = conjunct->execute_filter(&block, filter.data(), block.rows(),
false,
+ &can_filter_all);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_FALSE(can_filter_all);
+ ASSERT_EQ(filter.size(), 2);
+ EXPECT_EQ(filter[0], 0);
+ EXPECT_EQ(filter[1], 1);
+
+ file_request.expression_filters[0].conjunct->close();
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]