This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d661bd867e [chore](scanner) check columns' nullable with schema (#24724) (#24811)
d661bd867e is described below

commit d661bd867e0ac9eb5df84203141d4d722f5504d2
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Sep 23 22:49:19 2023 +0800

    [chore](scanner) check columns' nullable with schema (#24724) (#24811)
    
    Add a validation to prevent potential schema inconsistency issues.
---
 be/src/vec/exec/scan/new_olap_scanner.cpp  |  6 ++++++
 be/src/vec/exec/scan/new_olap_scanner.h    |  2 +-
 be/src/vec/exec/scan/pip_scanner_context.h |  3 +++
 be/src/vec/exec/scan/scanner_context.cpp   | 29 +++++++++++++++++++++++++++++
 be/src/vec/exec/scan/scanner_context.h     |  2 ++
 5 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 852d9a3ce6..232493b66f 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -434,6 +434,13 @@ Status NewOlapScanner::_init_return_columns() {
         _return_columns.push_back(index);
         if (slot->is_nullable() && !_tablet_schema->column(index).is_nullable()) {
             _tablet_columns_convert_to_null_set.emplace(index);
+        } else if (!slot->is_nullable() && _tablet_schema->column(index).is_nullable()) {
+            // Non-nullable column -> nullable slot is handled above; the reverse is rejected.
+            return Status::Error<ErrorCode::INVALID_SCHEMA>(
+                    "slot(id: {}, name: {})'s nullable does not match "
+                    "column(table id: {}, index: {}, name: {}) ",
+                    slot->id(), slot->col_name(), _tablet_schema->table_id(), index,
+                    _tablet_schema->column(index).name());
         }
     }
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index a411056097..41dd4e416c 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -87,7 +87,7 @@ private:
                                       const FilterPredicates& filter_predicates,
                                       const std::vector<FunctionFilter>& function_filters);
 
-    Status _init_return_columns();
+    [[nodiscard]] Status _init_return_columns();
 
     bool _aggregation;
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index a6bfe54aa0..3d3f80b467 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -57,6 +57,9 @@ public:
                 return Status::OK();
             }
         }
+
+        RETURN_IF_ERROR(validate_block_schema((*block).get()));
+
         _current_used_bytes -= (*block)->allocated_bytes();
         return Status::OK();
     }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 1c1cde4647..a5f575d175 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -200,6 +200,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     if (!_blocks_queue.empty()) {
         *block = std::move(_blocks_queue.front());
         _blocks_queue.pop_front();
+
+        RETURN_IF_ERROR(validate_block_schema((*block).get()));
+
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
         _queued_blocks_memory_usage->add(-block_bytes);
@@ -210,6 +213,43 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     return Status::OK();
 }
 
+// Validates `block` against the output tuple descriptor: the i-th materialized slot is paired
+// with the i-th column of the block, and that column's nullability must agree with both its
+// data type and the slot. Returns INVALID_SCHEMA on the first mismatch.
+Status ScannerContext::validate_block_schema(Block* block) {
+    size_t index = 0;
+    for (auto& slot : _output_tuple_desc->slots()) {
+        if (!slot->need_materialize()) {
+            continue;
+        }
+        // Reject a block with fewer columns than materialized slots instead of indexing
+        // past its end (NOTE(review): assumes get_by_position() is not range-checked).
+        if (index >= block->columns()) {
+            return Status::Error<ErrorCode::INVALID_SCHEMA>(
+                    "block has {} columns, but slot(id: {}, name: {}) expects a column at "
+                    "position {}",
+                    block->columns(), slot->id(), slot->col_name(), index);
+        }
+        auto& data = block->get_by_position(index++);
+        if (data.column->is_nullable() != data.type->is_nullable()) {
+            return Status::Error<ErrorCode::INVALID_SCHEMA>(
+                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: {}, "
+                    "name:{})",
+                    data.name, data.column->is_nullable(), data.type->is_nullable(), slot->id(),
+                    slot->col_name());
+        }
+
+        if (data.column->is_nullable() != slot->is_nullable()) {
+            return Status::Error<ErrorCode::INVALID_SCHEMA>(
+                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
+                    "nullable({})",
+                    data.name, data.column->is_nullable(), slot->id(), slot->col_name(),
+                    slot->is_nullable());
+        }
+    }
+    return Status::OK();
+}
+
 bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
     std::unique_lock l(_transfer_lock, std::defer_lock);
     if (need_lock) {
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index a87ef5c603..3aad0d6263 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -78,6 +78,10 @@ public:
     virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
                                         bool* eos, int id, bool wait = true);
 
+    // Matches materialized output-tuple slots positionally to the columns of `block`; returns
+    // INVALID_SCHEMA if a column's nullability disagrees with its data type or its slot.
+    [[nodiscard]] Status validate_block_schema(Block* block);
+
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
     void push_back_scanner_and_reschedule(VScannerSPtr scanner);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to