This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.1-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push: new 9810266ad1 [enhancement](load) enhance load from orc file (#12901) 9810266ad1 is described below commit 9810266ad1a8af581230c8f08b280d7724ca1069 Author: wxy <dut.xian...@gmail.com> AuthorDate: Mon Sep 26 09:28:25 2022 +0800 [enhancement](load) enhance load from orc file (#12901) This is part of PR 11742. Only ORC files are supported for now. On the master branch, we would like to support this in the new file scan framework. --- be/src/exec/orc_scanner.cpp | 76 ++++++++++++++++++++++++++++++++++++--------- be/src/exec/orc_scanner.h | 7 +++++ 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 6b3384f042..b8d2e91c4d 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -139,18 +139,6 @@ ORCScanner::~ORCScanner() { Status ORCScanner::open() { RETURN_IF_ERROR(BaseScanner::open()); - if (!_ranges.empty()) { - std::list<std::string> include_cols; - TBrokerRangeDesc range = _ranges[0]; - _num_of_columns_from_file = range.__isset.num_of_columns_from_file - ?
range.num_of_columns_from_file - : _src_slot_descs.size(); - for (int i = 0; i < _num_of_columns_from_file; i++) { - auto slot_desc = _src_slot_descs.at(i); - include_cols.push_back(slot_desc->col_name()); - } - _row_reader_options.include(include_cols); - } return Status::OK(); } @@ -186,8 +174,13 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* ((orc::StructVectorBatch*)_batch.get())->fields; for (int column_ipos = 0; column_ipos < _num_of_columns_from_file; ++column_ipos) { auto slot_desc = _src_slot_descs[column_ipos]; - orc::ColumnVectorBatch* cvb = batch_vec[_position_in_orc_original[column_ipos]]; + if (_map_column_to_id.find(slot_desc->col_name()) == _map_column_to_id.end()) { + // if slot not exist in file, set to null + _src_tuple->set_null(slot_desc->null_indicator_offset()); + continue; + } + orc::ColumnVectorBatch* cvb = batch_vec[_position_in_orc_original[column_ipos]]; if (cvb->hasNulls && !cvb->notNull[_current_line_of_group]) { if (!slot_desc->is_nullable()) { std::stringstream str_error; @@ -446,6 +439,24 @@ Status ORCScanner::open_next_reader() { if (_reader->getNumberOfRows() == 0) { continue; } + // build map from column name to type id + build_name_id_map(); + // set include names into read options + std::map<int, int> _include_cols_in_src_slots; + std::list<std::string> cols; + _num_of_columns_from_file = range.__isset.num_of_columns_from_file + ? 
range.num_of_columns_from_file + : _src_slot_descs.size(); + for (int i = 0; i < _num_of_columns_from_file; i++) { + auto slot_desc = _src_slot_descs.at(i); + + // get only columns exist orc file + if (_map_column_to_id.find(slot_desc->col_name()) != _map_column_to_id.end()) { + _include_cols_in_src_slots[cols.size()] = i; + cols.push_back(slot_desc->col_name()); + } + } + _row_reader_options.include(cols); _total_groups = _reader->getNumberOfStripes(); _current_group = 0; @@ -462,7 +473,9 @@ Status ORCScanner::open_next_reader() { //include columns must in reader field, otherwise createRowReader will throw exception auto pos = std::find(include_cols.begin(), include_cols.end(), _row_reader->getSelectedType().getFieldName(i)); - _position_in_orc_original.at(std::distance(include_cols.begin(), pos)) = orc_index++; + _position_in_orc_original.at( + _include_cols_in_src_slots[std::distance(include_cols.begin(), pos)]) = + orc_index++; } return Status::OK(); } @@ -475,4 +488,39 @@ void ORCScanner::close() { _row_reader.reset(nullptr); } +void ORCScanner::build_name_id_map() { + _map_column_to_id.clear(); + std::vector<std::string> columns; + const orc::Type& type = _reader->getType(); + build_name_id_map_impl(columns, &type); +} + +void ORCScanner::build_name_id_map_impl(std::vector<std::string>& columns, const orc::Type* type) { + if (orc::STRUCT == type->getKind()) { + for (size_t i = 0; i < type->getSubtypeCount(); ++i) { + const std::string& fieldName = type->getFieldName(i); + columns.push_back(fieldName); + _map_column_to_id[dot_column_path(columns)] = type->getSubtype(i)->getColumnId(); + build_name_id_map_impl(columns, type->getSubtype(i)); + columns.pop_back(); + } + } else { + // other non-primitive type + for (size_t j = 0; j < type->getSubtypeCount(); ++j) { + build_name_id_map_impl(columns, type->getSubtype(j)); + } + } +} + +std::string ORCScanner::dot_column_path(const std::vector<std::string>& columns) { + if (columns.empty()) { + return std::string(); 
+ } + std::ostringstream columnStream; + std::copy(columns.begin(), columns.end(), + std::ostream_iterator<std::string>(columnStream, ".")); + std::string columnPath = columnStream.str(); + return columnPath.substr(0, columnPath.length() - 1); +} + } // namespace doris diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h index 86b73b6b99..ee2cd3b36d 100644 --- a/be/src/exec/orc_scanner.h +++ b/be/src/exec/orc_scanner.h @@ -19,6 +19,7 @@ #define ORC_SCANNER_H #include <orc/OrcFile.hh> +#include <orc/Type.hh> #include "exec/base_scanner.h" @@ -47,6 +48,11 @@ public: private: // Read next buffer from reader Status open_next_reader(); + // Generate column path + std::string dot_column_path(const std::vector<std::string>& columns); + // Build map from column name to type id + void build_name_id_map(); + void build_name_id_map_impl(std::vector<std::string>& columns, const orc::Type* type); private: const std::vector<TBrokerRangeDesc>& _ranges; @@ -62,6 +68,7 @@ private: std::shared_ptr<orc::ColumnVectorBatch> _batch; std::unique_ptr<orc::Reader> _reader; std::unique_ptr<orc::RowReader> _row_reader; + std::map<std::string, int> _map_column_to_id; // The batch after reading from orc data is arranged in the original order, // so we need to record the index in the original order to correspond the column names to the order std::vector<int> _position_in_orc_original; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org