This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new d661bd867e [chore](scanner) check columns' nullable with schema (#24724) (#24811) d661bd867e is described below commit d661bd867e0ac9eb5df84203141d4d722f5504d2 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Sat Sep 23 22:49:19 2023 +0800 [chore](scanner) check columns' nullable with schema (#24724) (#24811) Add a validation to prevent potential schema inconsistency issues. --- be/src/vec/exec/scan/new_olap_scanner.cpp | 6 ++++++ be/src/vec/exec/scan/new_olap_scanner.h | 2 +- be/src/vec/exec/scan/pip_scanner_context.h | 3 +++ be/src/vec/exec/scan/scanner_context.cpp | 29 +++++++++++++++++++++++++++++ be/src/vec/exec/scan/scanner_context.h | 2 ++ 5 files changed, 41 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 852d9a3ce6..232493b66f 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -434,6 +434,12 @@ Status NewOlapScanner::_init_return_columns() { _return_columns.push_back(index); if (slot->is_nullable() && !_tablet_schema->column(index).is_nullable()) { _tablet_columns_convert_to_null_set.emplace(index); + } else if (!slot->is_nullable() && _tablet_schema->column(index).is_nullable()) { + return Status::Error<ErrorCode::INVALID_SCHEMA>( + "slot(id: {}, name: {})'s nullable does not match " + "column(tablet id: {}, index: {}, name: {}) ", + slot->id(), slot->col_name(), _tablet_schema->table_id(), index, + _tablet_schema->column(index).name()); } } diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index a411056097..41dd4e416c 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -87,7 +87,7 @@ private: const FilterPredicates& filter_predicates, const std::vector<FunctionFilter>& function_filters); - Status _init_return_columns(); + [[nodiscard]] Status _init_return_columns(); bool _aggregation; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index a6bfe54aa0..3d3f80b467 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -57,6 +57,9 @@ public: return Status::OK(); } } + + RETURN_IF_ERROR(validate_block_schema((*block).get())); + _current_used_bytes -= (*block)->allocated_bytes(); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 1c1cde4647..a5f575d175 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -200,6 +200,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo if (!_blocks_queue.empty()) { *block = std::move(_blocks_queue.front()); _blocks_queue.pop_front(); + + RETURN_IF_ERROR(validate_block_schema((*block).get())); + auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes; _queued_blocks_memory_usage->add(-block_bytes); @@ -210,6 +213,32 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo return Status::OK(); } +Status ScannerContext::validate_block_schema(Block* block) { + size_t index = 0; + for (auto& slot : _output_tuple_desc->slots()) { + if (!slot->need_materialize()) { + continue; + } + auto& data = block->get_by_position(index++); + if (data.column->is_nullable() != data.type->is_nullable()) { + return Status::Error<ErrorCode::INVALID_SCHEMA>( + "column(name: {}) nullable({}) does not match type nullable({}), slot(id: {}, " + "name:{})", + data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(), + slot->col_name()); + } + + if (data.column->is_nullable() != slot->is_nullable()) { + return Status::Error<ErrorCode::INVALID_SCHEMA>( + "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) " + "nullable({})", + data.name, data.column->is_nullable(), slot->id(), slot->col_name(), + slot->is_nullable()); + } + } + return Status::OK(); +} + bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { std::unique_lock l(_transfer_lock, std::defer_lock); if (need_lock) { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index a87ef5c603..3aad0d6263 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -78,6 +78,8 @@ public: virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait = true); + [[nodiscard]] Status validate_block_schema(Block* block); + // When a scanner complete a scan, this method will be called // to return the scanner to the list for next scheduling. void push_back_scanner_and_reschedule(VScannerSPtr scanner); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org