This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0b25376cf8 [feature](torc) support insert only transactional hive table on be side (#19518) 0b25376cf8 is described below commit 0b25376cf8c87381bf6ddc7f795e2cf11db5ee9a Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Thu May 11 14:15:09 2023 +0800 [feature](torc) support insert only transactional hive table on be side (#19518) --- be/src/vec/exec/format/orc/vorc_reader.cpp | 105 +++++++++++++++++++++++++---- be/src/vec/exec/format/orc/vorc_reader.h | 9 +++ 2 files changed, 100 insertions(+), 14 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 6a88360b49..eb944bb84c 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -86,6 +86,14 @@ enum class FileCachePolicy : uint8_t; namespace doris::vectorized { /* NOTE(review): this patch lets OrcReader read Hive transactional ORC files. When the file root matches the ACID event struct (operation, originalTransaction, bucket, rowId, currentTransaction, row), table columns are read from the nested row struct at index ACID_ROW_OFFSET. The patch adds no filtering on the operation field, which is consistent with the insert-only scope in the commit title. */ +static const char* ACID_EVENT_FIELD_NAMES[] = {"operation", "originalTransaction", "bucket", + "rowId", "currentTransaction", "row"}; + +static const char* ACID_EVENT_FIELD_NAMES_LOWER_CASE[] = { + "operation", "originaltransaction", "bucket", "rowid", "currenttransaction", "row"}; + +static const int ACID_ROW_OFFSET = 5; + #define FOR_FLAT_ORC_COLUMNS(M) \ M(TypeIndex::Int8, Int8, orc::LongVectorBatch) \ M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch) \ @@ -245,7 +253,7 @@ Status OrcReader::init_reader( Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) { RETURN_IF_ERROR(_create_file_reader()); - auto& root_type = _reader->getType(); + auto& root_type = _remove_acid(_reader->getType()); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(_get_field_name_lower_case(&root_type, i)); col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i))); @@ -257,23 +265,32 @@ Status OrcReader::_init_read_columns() { auto& root_type = _reader->getType(); std::vector<std::string> 
orc_cols; std::vector<std::string> orc_cols_lower_case; - for (int i = 0; i < root_type.getSubtypeCount(); ++i) { - orc_cols.emplace_back(root_type.getFieldName(i)); - orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i)); - } + _init_orc_cols(root_type, orc_cols, orc_cols_lower_case); /* NOTE(review): _init_orc_cols appends the children of every STRUCT right after the struct itself. The positional writes below (orc_cols_lower_case[pos], and [ACID_ROW_OFFSET + 1 + pos] for ACID files) therefore only line up when no earlier column is itself a STRUCT. std::find also scans the whole flattened list, so a nested field or an ACID meta column such as bucket whose lower-case name equals col_name can be matched before the intended column. */ + + bool is_acid = _check_acid_schema(root_type); for (auto& col_name : _column_names) { if (_is_hive) { auto iter = _scan_params.slot_name_to_schema_pos.find(col_name); DCHECK(iter != _scan_params.slot_name_to_schema_pos.end()); int pos = iter->second; - orc_cols_lower_case[pos] = iter->first; + if (is_acid) { + orc_cols_lower_case[ACID_ROW_OFFSET + 1 + pos] = iter->first; + } else { + orc_cols_lower_case[pos] = iter->first; + } } auto iter = std::find(orc_cols_lower_case.begin(), orc_cols_lower_case.end(), col_name); if (iter == orc_cols_lower_case.end()) { _missing_cols.emplace_back(col_name); } else { int pos = std::distance(orc_cols_lower_case.begin(), iter); - _read_cols.emplace_back(orc_cols[pos]); + if (is_acid) { + auto read_col = fmt::format("{}.{}", ACID_EVENT_FIELD_NAMES[ACID_ROW_OFFSET], + orc_cols[pos]); + _read_cols.emplace_back(read_col); + } else { + _read_cols.emplace_back(orc_cols[pos]); + } _read_cols_lower_case.emplace_back(col_name); // For hive engine, store the orc column name to schema column name map. // This is for Hive 1.x orc file with internal column name _col0, _col1... 
@@ -286,6 +303,44 @@ Status OrcReader::_init_read_columns() { return Status::OK(); } /* NOTE(review): _init_orc_cols recurses into every STRUCT field, not only the ACID row struct. Top-level and nested field names therefore end up interleaved in one list, which _init_read_columns indexes by schema position. */ +void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols, + std::vector<std::string>& orc_cols_lower_case) { + for (int i = 0; i < type.getSubtypeCount(); ++i) { + orc_cols.emplace_back(type.getFieldName(i)); + orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&type, i)); + const orc::Type* sub_type = type.getSubtype(i); + if (sub_type->getKind() == orc::TypeKind::STRUCT) { + _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case); + } + } +} + +bool OrcReader::_check_acid_schema(const orc::Type& type) { + if (orc::TypeKind::STRUCT == type.getKind()) { + if (type.getSubtypeCount() != std::size(ACID_EVENT_FIELD_NAMES)) { + return false; + } + for (uint64_t i = 0; i < type.getSubtypeCount(); ++i) { + const std::string& field_name = type.getFieldName(i); + std::string field_name_lower_case = field_name; + std::transform(field_name.begin(), field_name.end(), field_name_lower_case.begin(), + [](unsigned char c) { return std::tolower(c); }); + if (field_name_lower_case != ACID_EVENT_FIELD_NAMES_LOWER_CASE[i]) { + return false; + } + } + } /* NOTE(review): this return is also reached when type is not a STRUCT, so a non-struct type is reported as an ACID schema; presumably it should return false in that case. The callers in this patch pass the file root type from _reader->getType(), which is normally a STRUCT, so the bug is latent. */ + return true; +} + +const orc::Type& OrcReader::_remove_acid(const orc::Type& type) { + if (_check_acid_schema(type)) { + return *(type.getSubtype(ACID_ROW_OFFSET)); + } else { + return type; + } +} + // orc only support LONG, FLOAT, STRING, DATE, DECIMAL, TIMESTAMP, BOOLEAN to push down predicates static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PREDICATE_TYPE = { {orc::TypeKind::BYTE, orc::PredicateDataType::LONG}, @@ -678,18 +733,27 @@ Status OrcReader::set_fill_columns( return Status::InternalError("Failed to create orc row reader. 
reason = {}", e.what()); } auto& selected_type = _row_reader->getSelectedType(); - _col_orc_type.resize(selected_type.getSubtypeCount()); - for (int i = 0; i < selected_type.getSubtypeCount(); ++i) { + int idx = 0; + _init_select_types(selected_type, idx); + return Status::OK(); +} + +Status OrcReader::_init_select_types(const orc::Type& type, int idx) { + for (int i = 0; i < type.getSubtypeCount(); ++i) { std::string name; // For hive engine, translate the column name in orc file to schema column name. // This is for Hive 1.x which use internal column name _col0, _col1... if (_is_hive) { - name = _file_col_to_schema_col[selected_type.getFieldName(i)]; + name = _file_col_to_schema_col[type.getFieldName(i)]; } else { - name = _get_field_name_lower_case(&selected_type, i); + name = _get_field_name_lower_case(&type, i); + } + _colname_to_idx[name] = idx++; + const orc::Type* sub_type = type.getSubtype(i); + _col_orc_type.push_back(sub_type); + if (sub_type->getKind() == orc::TypeKind::STRUCT) { + _init_select_types(*sub_type, idx); } - _colname_to_idx[name] = i; - _col_orc_type[i] = selected_type.getSubtype(i); } return Status::OK(); } @@ -1151,7 +1215,10 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } } - const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields; + + std::vector<orc::ColumnVectorBatch*> batch_vec; + _fill_batch_vec(batch_vec, _batch.get(), 0); + for (auto& col_name : _lazy_read_ctx.all_read_columns) { auto& column_with_type_and_name = block->get_by_name(col_name); auto& column_ptr = column_with_type_and_name.column; @@ -1186,6 +1253,16 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } +void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result, + orc::ColumnVectorBatch* batch, int idx) { + for (auto* field : down_cast<orc::StructVectorBatch*>(batch)->fields) { + result.push_back(field); + if 
(_col_orc_type[idx++]->getKind() == orc::TypeKind::STRUCT) { + _fill_batch_vec(result, field, idx); /* NOTE(review): idx is passed by value, so this recursive call does not advance the caller's counter past the children it consumes. When a STRUCT field is followed by a sibling, the sibling's _col_orc_type[idx++] check reads the struct's first child instead of its own type, and a non-struct batch could then be down_cast to StructVectorBatch. A shared int& counter fixes this, but the call site in get_next_block would then need a local variable instead of the literal 0. */ + } + } +} + Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) { Block* block = (Block*)arg; diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 690a8932d0..45e2034b07 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -149,6 +149,8 @@ public: partition_columns, const std::unordered_map<std::string, VExprContext*>& missing_columns) override; + Status _init_select_types(const orc::Type& type, int idx); /* NOTE(review): _init_select_types has the same by-value idx problem as _fill_batch_vec. In the .cpp hunk, the recursive call advances only its own copy, so a column after a selected STRUCT reuses an index already assigned to that struct's first child in _colname_to_idx, while _col_orc_type keeps growing in pre-order. Both the recursive call and the call in set_fill_columns discard the returned Status. set_fill_columns also no longer resizes _col_orc_type before appending, which presumably assumes the vector starts empty. Switching to int& idx would work here, since set_fill_columns already passes an lvalue. */ + Status _fill_partition_columns( Block* block, size_t rows, const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& @@ -159,6 +161,9 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result, + orc::ColumnVectorBatch* batch, int idx); /* NOTE(review): _init_select_types and _fill_batch_vec are underscore-prefixed internal helpers but are declared under public:; they would fit in the private: section next to _init_orc_cols. */ + void close(); int64_t size() const; @@ -204,6 +209,10 @@ private: void _init_profile(); Status _init_read_columns(); + void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols, + std::vector<std::string>& orc_cols_lower_case); + static bool _check_acid_schema(const orc::Type& type); + static const orc::Type& _remove_acid(const orc::Type& type); TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type); bool _init_search_argument( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org