hubgeter commented on code in PR #49944:
URL: https://github.com/apache/doris/pull/49944#discussion_r2046172798


##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -1864,7 +1881,7 @@ std::string OrcReader::get_field_name_lower_case(const 
orc::Type* orc_type, int
 
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof));
-    if (*eof) {
+    if (*eof && _profile != nullptr) {

Review Comment:
   In the materialization phase, `vfile_scanner` does not need `_profile`, so 
`_profile` is nullptr here. Because `_orc_profile` is not initialized when 
`_profile` is nullptr, I added this condition.



##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -1156,41 +1087,216 @@ Status VFileScanner::_get_next_reader() {
             return Status::InternalError("failed to init reader, err: {}", 
init_status.to_string());
         }
 
-        _name_to_col_type.clear();
-        _missing_cols.clear();
-        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
         _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
-        RETURN_IF_ERROR(_generate_missing_columns());
-        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, 
_missing_col_descs));
-        if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
-            fmt::memory_buffer col_buf;
-            for (auto& col : _missing_cols) {
-                fmt::format_to(col_buf, " {}", col);
-            }
-            VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", 
fmt::to_string(col_buf),
-                                       range.path);
-        }
-
-        _source_file_col_names.clear();
-        _source_file_col_types.clear();
-        _source_file_col_name_types.clear();
-        if (_state->query_options().truncate_char_or_varchar_columns && 
need_to_get_parsed_schema) {
-            Status status = 
_cur_reader->get_parsed_schema(&_source_file_col_names,
-                                                           
&_source_file_col_types);
-            if (!status.ok() && status.code() != 
TStatusCode::NOT_IMPLEMENTED_ERROR) {
-                return status;
-            }
-            DCHECK(_source_file_col_names.size() == 
_source_file_col_types.size());
-            for (int i = 0; i < _source_file_col_names.size(); ++i) {
-                _source_file_col_name_types[_source_file_col_names[i]] = 
&_source_file_col_types[i];
-            }
-        }
+        
RETURN_IF_ERROR(_set_fill_or_truncate_columns(need_to_get_parsed_schema));
         _cur_reader_eof = false;
         break;
     }
     return Status::OK();
 }
 
+Status VFileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& 
parquet_reader) {
+    const TFileRangeDesc& range = _current_range;
+    Status init_status  = Status::OK();
+
+    if (range.__isset.table_format_params &&
+        range.table_format_params.table_format_type == "iceberg") {
+        std::unique_ptr<IcebergParquetReader> iceberg_reader =
+                IcebergParquetReader::create_unique(std::move(parquet_reader), 
_profile,
+                                                    _state, *_params, range, 
_kv_cache,
+                                                    _io_ctx.get());
+        init_status = iceberg_reader->init_reader(
+                _file_col_names, _col_id_name_map, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        _cur_reader = std::move(iceberg_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "paimon") {
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _file_col_names, place_holder, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        std::unique_ptr<PaimonParquetReader> paimon_reader =
+                PaimonParquetReader::create_unique(std::move(parquet_reader), 
_profile,
+                                                   _state, *_params, range, 
_io_ctx.get());
+        RETURN_IF_ERROR(paimon_reader->init_row_filters());
+        _cur_reader = std::move(paimon_reader);
+    } else {
+        bool hive_parquet_use_column_names = true;
+
+        if (range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hive" && _state != 
nullptr)
+                [[likely]] {
+            hive_parquet_use_column_names =
+                    _state->query_options().hive_parquet_use_column_names;
+        }
+
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _file_col_names, place_holder, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts, true, 
hive_parquet_use_column_names);
+        _cur_reader = std::move(parquet_reader);
+    }
+    return init_status;
+}
+
+
+Status VFileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader) 
{
+    const TFileRangeDesc& range = _current_range;
+    Status init_status  = Status::OK();
+
+    if (range.__isset.table_format_params &&
+        range.table_format_params.table_format_type == "transactional_hive") {
+        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
+                TransactionalHiveReader::create_unique(std::move(orc_reader), 
_profile,
+                                                       _state, *_params, range,
+                                                       _io_ctx.get());
+        init_status = tran_orc_reader->init_reader(
+                _file_col_names, _colname_to_value_range, _push_down_conjuncts,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
+        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
+        _cur_reader = std::move(tran_orc_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "iceberg") {
+        std::unique_ptr<IcebergOrcReader> iceberg_reader =
+                IcebergOrcReader::create_unique(std::move(orc_reader), 
_profile, _state,
+                                                *_params, range, _kv_cache, 
_io_ctx.get());
+
+        init_status = iceberg_reader->init_reader(
+                _file_col_names, _col_id_name_map, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        _cur_reader = std::move(iceberg_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "paimon") {
+        init_status = orc_reader->init_reader(
+                &_file_col_names, _colname_to_value_range, 
_push_down_conjuncts, false,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
+        std::unique_ptr<PaimonOrcReader> paimon_reader = 
PaimonOrcReader::create_unique(
+                std::move(orc_reader), _profile, _state, *_params, range, 
_io_ctx.get());
+        RETURN_IF_ERROR(paimon_reader->init_row_filters());
+        _cur_reader = std::move(paimon_reader);
+    } else {
+        bool hive_orc_use_column_names = true;
+
+        if (range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hive" && _state != 
nullptr)
+                [[likely]] {
+            hive_orc_use_column_names = 
_state->query_options().hive_orc_use_column_names;
+        }
+        init_status = orc_reader->init_reader(
+                &_file_col_names, _colname_to_value_range, 
_push_down_conjuncts, false,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts,
+                hive_orc_use_column_names);
+        _cur_reader = std::move(orc_reader);
+    }
+
+    return init_status;
+}
+
+Status VFileScanner::_set_fill_or_truncate_columns(bool 
need_to_get_parsed_schema) {
+    _name_to_col_type.clear();
+    _missing_cols.clear();
+    RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
+    RETURN_IF_ERROR(_generate_missing_columns());
+    RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, 
_missing_col_descs));
+    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
+        fmt::memory_buffer col_buf;
+        for (auto& col : _missing_cols) {
+            fmt::format_to(col_buf, " {}", col);
+        }
+        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", 
fmt::to_string(col_buf),
+                                   _current_range.path);
+    }
+
+    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
+    return Status::OK();
+}
+
+Status VFileScanner::_generate_truncate_columns(bool 
need_to_get_parsed_schema) {
+    _source_file_col_names.clear();
+    _source_file_col_types.clear();
+    _source_file_col_name_types.clear();
+    if (_state->query_options().truncate_char_or_varchar_columns && 
need_to_get_parsed_schema) {
+        Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
+                                                       
&_source_file_col_types);
+        if (!status.ok() && status.code() != 
TStatusCode::NOT_IMPLEMENTED_ERROR) {
+            return status;
+        }
+        DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
+        for (int i = 0; i < _source_file_col_names.size(); ++i) {
+            _source_file_col_name_types[_source_file_col_names[i]] = 
&_source_file_col_types[i];
+        }
+    }
+    return Status::OK();
+}
+
+
+Status VFileScanner::read_one_line_from_current_range(segment_v2::rowid_t 
rowid,  Block* result_block,
+                                                      ExternalFileMappingInfo 
external_info) {
+    const TFileRangeDesc& range = _current_range;
+
+    RETURN_IF_ERROR(_init_io_ctx());

Review Comment:
   Fixed. `prepare_for_read_one_line` is only called once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to