This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 9810266ad1 [enhancement](load) enhance load from orc file (#12901)
9810266ad1 is described below

commit 9810266ad1a8af581230c8f08b280d7724ca1069
Author: wxy <dut.xian...@gmail.com>
AuthorDate: Mon Sep 26 09:28:25 2022 +0800

    [enhancement](load) enhance load from orc file (#12901)
    
    This is part of PR 11742
    Only support orc file.
    On master branch, we would like to support it in new file scan framework
---
 be/src/exec/orc_scanner.cpp | 76 ++++++++++++++++++++++++++++++++++++---------
 be/src/exec/orc_scanner.h   |  7 +++++
 2 files changed, 69 insertions(+), 14 deletions(-)

diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 6b3384f042..b8d2e91c4d 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -139,18 +139,6 @@ ORCScanner::~ORCScanner() {
 
 Status ORCScanner::open() {
     RETURN_IF_ERROR(BaseScanner::open());
-    if (!_ranges.empty()) {
-        std::list<std::string> include_cols;
-        TBrokerRangeDesc range = _ranges[0];
-        _num_of_columns_from_file = range.__isset.num_of_columns_from_file
-                                            ? range.num_of_columns_from_file
-                                            : _src_slot_descs.size();
-        for (int i = 0; i < _num_of_columns_from_file; i++) {
-            auto slot_desc = _src_slot_descs.at(i);
-            include_cols.push_back(slot_desc->col_name());
-        }
-        _row_reader_options.include(include_cols);
-    }
 
     return Status::OK();
 }
@@ -186,8 +174,13 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* 
tuple_pool, bool* eof, bool*
                     ((orc::StructVectorBatch*)_batch.get())->fields;
             for (int column_ipos = 0; column_ipos < _num_of_columns_from_file; 
++column_ipos) {
                 auto slot_desc = _src_slot_descs[column_ipos];
-                orc::ColumnVectorBatch* cvb = 
batch_vec[_position_in_orc_original[column_ipos]];
+                if (_map_column_to_id.find(slot_desc->col_name()) == 
_map_column_to_id.end()) {
+                    // if slot not exist in file, set to null
+                    _src_tuple->set_null(slot_desc->null_indicator_offset());
+                    continue;
+                }
 
+                orc::ColumnVectorBatch* cvb = 
batch_vec[_position_in_orc_original[column_ipos]];
                 if (cvb->hasNulls && !cvb->notNull[_current_line_of_group]) {
                     if (!slot_desc->is_nullable()) {
                         std::stringstream str_error;
@@ -446,6 +439,24 @@ Status ORCScanner::open_next_reader() {
         if (_reader->getNumberOfRows() == 0) {
             continue;
         }
+        // build map from column name to type id
+        build_name_id_map();
+        // set include names into read options
+        std::map<int, int> _include_cols_in_src_slots;
+        std::list<std::string> cols;
+        _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                            ? range.num_of_columns_from_file
+                                            : _src_slot_descs.size();
+        for (int i = 0; i < _num_of_columns_from_file; i++) {
+            auto slot_desc = _src_slot_descs.at(i);
+
+            // get only columns exist orc file
+            if (_map_column_to_id.find(slot_desc->col_name()) != 
_map_column_to_id.end()) {
+                _include_cols_in_src_slots[cols.size()] = i;
+                cols.push_back(slot_desc->col_name());
+            }
+        }
+        _row_reader_options.include(cols);
 
         _total_groups = _reader->getNumberOfStripes();
         _current_group = 0;
@@ -462,7 +473,9 @@ Status ORCScanner::open_next_reader() {
             //include columns must in reader field, otherwise createRowReader 
will throw exception
             auto pos = std::find(include_cols.begin(), include_cols.end(),
                                  
_row_reader->getSelectedType().getFieldName(i));
-            _position_in_orc_original.at(std::distance(include_cols.begin(), 
pos)) = orc_index++;
+            _position_in_orc_original.at(
+                    
_include_cols_in_src_slots[std::distance(include_cols.begin(), pos)]) =
+                    orc_index++;
         }
         return Status::OK();
     }
@@ -475,4 +488,39 @@ void ORCScanner::close() {
     _row_reader.reset(nullptr);
 }
 
+void ORCScanner::build_name_id_map() {
+    _map_column_to_id.clear();
+    std::vector<std::string> columns;
+    const orc::Type& type = _reader->getType();
+    build_name_id_map_impl(columns, &type);
+}
+
+void ORCScanner::build_name_id_map_impl(std::vector<std::string>& columns, 
const orc::Type* type) {
+    if (orc::STRUCT == type->getKind()) {
+        for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
+            const std::string& fieldName = type->getFieldName(i);
+            columns.push_back(fieldName);
+            _map_column_to_id[dot_column_path(columns)] = 
type->getSubtype(i)->getColumnId();
+            build_name_id_map_impl(columns, type->getSubtype(i));
+            columns.pop_back();
+        }
+    } else {
+        // other non-primitive type
+        for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
+            build_name_id_map_impl(columns, type->getSubtype(j));
+        }
+    }
+}
+
+std::string ORCScanner::dot_column_path(const std::vector<std::string>& 
columns) {
+    if (columns.empty()) {
+        return std::string();
+    }
+    std::ostringstream columnStream;
+    std::copy(columns.begin(), columns.end(),
+              std::ostream_iterator<std::string>(columnStream, "."));
+    std::string columnPath = columnStream.str();
+    return columnPath.substr(0, columnPath.length() - 1);
+}
+
 } // namespace doris
diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h
index 86b73b6b99..ee2cd3b36d 100644
--- a/be/src/exec/orc_scanner.h
+++ b/be/src/exec/orc_scanner.h
@@ -19,6 +19,7 @@
 #define ORC_SCANNER_H
 
 #include <orc/OrcFile.hh>
+#include <orc/Type.hh>
 
 #include "exec/base_scanner.h"
 
@@ -47,6 +48,11 @@ public:
 private:
     // Read next buffer from reader
     Status open_next_reader();
+    // Generate column path
+    std::string dot_column_path(const std::vector<std::string>& columns);
+    // Build map from column name to type id
+    void build_name_id_map();
+    void build_name_id_map_impl(std::vector<std::string>& columns, const 
orc::Type* type);
 
 private:
     const std::vector<TBrokerRangeDesc>& _ranges;
@@ -62,6 +68,7 @@ private:
     std::shared_ptr<orc::ColumnVectorBatch> _batch;
     std::unique_ptr<orc::Reader> _reader;
     std::unique_ptr<orc::RowReader> _row_reader;
+    std::map<std::string, int> _map_column_to_id;
     // The batch after reading from orc data is arranged in the original order,
     // so we need to record the index in the original order to correspond the 
column names to the order
     std::vector<int> _position_in_orc_original;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to