This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b25376cf8 [feature](torc) support insert only transactional hive table on be side (#19518)
0b25376cf8 is described below

commit 0b25376cf8c87381bf6ddc7f795e2cf11db5ee9a
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu May 11 14:15:09 2023 +0800

    [feature](torc) support insert only transactional hive table on be side (#19518)
---
 be/src/vec/exec/format/orc/vorc_reader.cpp | 105 +++++++++++++++++++++++++----
 be/src/vec/exec/format/orc/vorc_reader.h   |   9 +++
 2 files changed, 100 insertions(+), 14 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 6a88360b49..eb944bb84c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -86,6 +86,14 @@ enum class FileCachePolicy : uint8_t;
 
 namespace doris::vectorized {
 
+static const char* ACID_EVENT_FIELD_NAMES[] = {"operation", 
"originalTransaction", "bucket",
+                                               "rowId",     
"currentTransaction",  "row"};  // ACID event field names, in the order _check_acid_schema expects
+
+static const char* ACID_EVENT_FIELD_NAMES_LOWER_CASE[] = {
+        "operation", "originaltransaction", "bucket", "rowid", 
"currenttransaction", "row"};  // same names lower-cased, for case-insensitive comparison
+
+static const int ACID_ROW_OFFSET = 5;  // index of "row" in the two arrays above
+
 #define FOR_FLAT_ORC_COLUMNS(M)                            \
     M(TypeIndex::Int8, Int8, orc::LongVectorBatch)         \
     M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch)       \
@@ -245,7 +253,7 @@ Status OrcReader::init_reader(
 Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
     RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _reader->getType();
+    auto& root_type = _remove_acid(_reader->getType());
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
         
col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
@@ -257,23 +265,32 @@ Status OrcReader::_init_read_columns() {
     auto& root_type = _reader->getType();
     std::vector<std::string> orc_cols;
     std::vector<std::string> orc_cols_lower_case;
-    for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        orc_cols.emplace_back(root_type.getFieldName(i));
-        
orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i));
-    }
+    _init_orc_cols(root_type, orc_cols, orc_cols_lower_case);
+
+    bool is_acid = _check_acid_schema(root_type);
     for (auto& col_name : _column_names) {
         if (_is_hive) {
             auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
             DCHECK(iter != _scan_params.slot_name_to_schema_pos.end());
             int pos = iter->second;
-            orc_cols_lower_case[pos] = iter->first;
+            if (is_acid) {
+                orc_cols_lower_case[ACID_ROW_OFFSET + 1 + pos] = iter->first;
+            } else {
+                orc_cols_lower_case[pos] = iter->first;
+            }
         }
         auto iter = std::find(orc_cols_lower_case.begin(), 
orc_cols_lower_case.end(), col_name);
         if (iter == orc_cols_lower_case.end()) {
             _missing_cols.emplace_back(col_name);
         } else {
             int pos = std::distance(orc_cols_lower_case.begin(), iter);
-            _read_cols.emplace_back(orc_cols[pos]);
+            if (is_acid) {
+                auto read_col = fmt::format("{}.{}", 
ACID_EVENT_FIELD_NAMES[ACID_ROW_OFFSET],
+                                            orc_cols[pos]);
+                _read_cols.emplace_back(read_col);
+            } else {
+                _read_cols.emplace_back(orc_cols[pos]);
+            }
             _read_cols_lower_case.emplace_back(col_name);
             // For hive engine, store the orc column name to schema column 
name map.
             // This is for Hive 1.x orc file with internal column name _col0, 
_col1...
@@ -286,6 +303,44 @@ Status OrcReader::_init_read_columns() {
     return Status::OK();
 }
 
+// Flattens the field names under `type` into `orc_cols` (names exactly as returned by
+// getFieldName) and `orc_cols_lower_case` (names from _get_field_name_lower_case).
+// Both vectors are appended to in lock-step and in depth-first order: the name of a STRUCT
+// field is followed immediately by the names of that struct's own fields.
+void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
+                               std::vector<std::string>& orc_cols_lower_case) {
+    for (int field_pos = 0; field_pos < type.getSubtypeCount(); ++field_pos) {
+        orc_cols.emplace_back(type.getFieldName(field_pos));
+        orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&type, field_pos));
+        const orc::Type& child = *type.getSubtype(field_pos);
+        if (child.getKind() != orc::TypeKind::STRUCT) {
+            continue;
+        }
+        // Nested struct: its members go right after the struct's own entry.
+        _init_orc_cols(child, orc_cols, orc_cols_lower_case);
+    }
+}
+
+// Returns true only when `type` is a STRUCT whose fields are exactly the ACID event fields
+// listed in ACID_EVENT_FIELD_NAMES_LOWER_CASE, in that order, compared case-insensitively.
+bool OrcReader::_check_acid_schema(const orc::Type& type) {
+    // A non-struct type can never be the ACID wrapper. It must be rejected here: callers
+    // such as _remove_acid go on to fetch the subtype at ACID_ROW_OFFSET when this is true.
+    if (type.getKind() != orc::TypeKind::STRUCT) {
+        return false;
+    }
+    if (type.getSubtypeCount() != std::size(ACID_EVENT_FIELD_NAMES_LOWER_CASE)) {
+        return false;
+    }
+    for (uint64_t i = 0; i < type.getSubtypeCount(); ++i) {
+        std::string field_name_lower_case = type.getFieldName(i);
+        // tolower is fed an unsigned char so bytes >= 0x80 are not passed as negative ints.
+        std::transform(field_name_lower_case.begin(), field_name_lower_case.end(),
+                       field_name_lower_case.begin(),
+                       [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
+        if (field_name_lower_case != ACID_EVENT_FIELD_NAMES_LOWER_CASE[i]) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Strips the ACID wrapper from a schema: when _check_acid_schema accepts `type`, the
+// subtype at ACID_ROW_OFFSET (the "row" field) is returned; otherwise `type` itself is.
+const orc::Type& OrcReader::_remove_acid(const orc::Type& type) {
+    if (!_check_acid_schema(type)) {
+        return type;
+    }
+    return *type.getSubtype(ACID_ROW_OFFSET);
+}
+
 //  orc only support LONG, FLOAT, STRING, DATE, DECIMAL, TIMESTAMP, BOOLEAN to 
push down predicates
 static std::unordered_map<orc::TypeKind, orc::PredicateDataType> 
TYPEKIND_TO_PREDICATE_TYPE = {
         {orc::TypeKind::BYTE, orc::PredicateDataType::LONG},
@@ -678,18 +733,27 @@ Status OrcReader::set_fill_columns(
         return Status::InternalError("Failed to create orc row reader. reason 
= {}", e.what());
     }
     auto& selected_type = _row_reader->getSelectedType();
-    _col_orc_type.resize(selected_type.getSubtypeCount());
-    for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
+    int idx = 0;
+    _init_select_types(selected_type, idx);
+    return Status::OK();
+}
+
+Status OrcReader::_init_select_types(const orc::Type& type, int idx) {  // registers every field of `type`, descending into STRUCT fields; always returns OK
+    for (int i = 0; i < type.getSubtypeCount(); ++i) {
         std::string name;
         // For hive engine, translate the column name in orc file to schema 
column name.
         // This is for Hive 1.x which use internal column name _col0, _col1...
         if (_is_hive) {
-            name = _file_col_to_schema_col[selected_type.getFieldName(i)];
+            name = _file_col_to_schema_col[type.getFieldName(i)];
         } else {
-            name = _get_field_name_lower_case(&selected_type, i);
+            name = _get_field_name_lower_case(&type, i);
+        }
+        _colname_to_idx[name] = idx++;  // idx is the position assigned to this field
+        const orc::Type* sub_type = type.getSubtype(i);
+        _col_orc_type.push_back(sub_type);  // types are appended in depth-first visiting order
+        if (sub_type->getKind() == orc::TypeKind::STRUCT) {
+            _init_select_types(*sub_type, idx);  // NOTE(review): idx is passed by value, so positions handed out inside the nested struct are reused by the next sibling while _col_orc_type keeps growing - confirm this is intended; the returned Status is ignored
         }
-        _colname_to_idx[name] = i;
-        _col_orc_type[i] = selected_type.getSubtype(i);
     }
     return Status::OK();
 }
@@ -1151,7 +1215,10 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 return Status::OK();
             }
         }
-        const auto& batch_vec = 
down_cast<orc::StructVectorBatch*>(_batch.get())->fields;
+
+        std::vector<orc::ColumnVectorBatch*> batch_vec;
+        _fill_batch_vec(batch_vec, _batch.get(), 0);
+
         for (auto& col_name : _lazy_read_ctx.all_read_columns) {
             auto& column_with_type_and_name = block->get_by_name(col_name);
             auto& column_ptr = column_with_type_and_name.column;
@@ -1186,6 +1253,16 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
     return Status::OK();
 }
 
+// Appends to `result` the column batch of every field under `batch`, depth-first: each
+// field is pushed, and the fields of a STRUCT are pushed right after the struct itself.
+//
+// result: output vector; batches are appended, nothing is cleared.
+// batch:  must be an orc::StructVectorBatch (it is down_cast unconditionally).
+// idx:    position in _col_orc_type of the first field of `batch`.
+void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
+                                orc::ColumnVectorBatch* batch, int idx) {
+    // _init_select_types appends to _col_orc_type in this same depth-first order, so a single
+    // cursor has to run across the whole traversal. If the recursion received its own copy,
+    // the cursor would rewind after each nested struct and the following sibling would be
+    // tested against the type of that struct's first child.
+    int type_pos = idx;
+    auto collect = [&](auto&& self, orc::ColumnVectorBatch* node) -> void {
+        for (auto* field : down_cast<orc::StructVectorBatch*>(node)->fields) {
+            result.push_back(field);
+            if (_col_orc_type[type_pos++]->getKind() == orc::TypeKind::STRUCT) {
+                self(self, field);
+            }
+        }
+    };
+    collect(collect, batch);
+}
+
 Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t 
size, void* arg) {
     Block* block = (Block*)arg;
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 690a8932d0..45e2034b07 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -149,6 +149,8 @@ public:
                     partition_columns,
             const std::unordered_map<std::string, VExprContext*>& 
missing_columns) override;
 
+    Status _init_select_types(const orc::Type& type, int idx);
+
     Status _fill_partition_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, std::tuple<std::string, 
const SlotDescriptor*>>&
@@ -159,6 +161,9 @@ public:
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
+    void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
+                         orc::ColumnVectorBatch* batch, int idx);
+
     void close();
 
     int64_t size() const;
@@ -204,6 +209,10 @@ private:
 
     void _init_profile();
     Status _init_read_columns();
+    void _init_orc_cols(const orc::Type& type, std::vector<std::string>& 
orc_cols,
+                        std::vector<std::string>& orc_cols_lower_case);
+    static bool _check_acid_schema(const orc::Type& type);
+    static const orc::Type& _remove_acid(const orc::Type& type);
     TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
     bool _init_search_argument(
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to