HappenLee commented on code in PR #49944:
URL: https://github.com/apache/doris/pull/49944#discussion_r2041174660


##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -1156,41 +1087,216 @@ Status VFileScanner::_get_next_reader() {
             return Status::InternalError("failed to init reader, err: {}", 
init_status.to_string());
         }
 
-        _name_to_col_type.clear();
-        _missing_cols.clear();
-        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
         _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
-        RETURN_IF_ERROR(_generate_missing_columns());
-        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, 
_missing_col_descs));
-        if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
-            fmt::memory_buffer col_buf;
-            for (auto& col : _missing_cols) {
-                fmt::format_to(col_buf, " {}", col);
-            }
-            VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", 
fmt::to_string(col_buf),
-                                       range.path);
-        }
-
-        _source_file_col_names.clear();
-        _source_file_col_types.clear();
-        _source_file_col_name_types.clear();
-        if (_state->query_options().truncate_char_or_varchar_columns && 
need_to_get_parsed_schema) {
-            Status status = 
_cur_reader->get_parsed_schema(&_source_file_col_names,
-                                                           
&_source_file_col_types);
-            if (!status.ok() && status.code() != 
TStatusCode::NOT_IMPLEMENTED_ERROR) {
-                return status;
-            }
-            DCHECK(_source_file_col_names.size() == 
_source_file_col_types.size());
-            for (int i = 0; i < _source_file_col_names.size(); ++i) {
-                _source_file_col_name_types[_source_file_col_names[i]] = 
&_source_file_col_types[i];
-            }
-        }
+        
RETURN_IF_ERROR(_set_fill_or_truncate_columns(need_to_get_parsed_schema));
         _cur_reader_eof = false;
         break;
     }
     return Status::OK();
 }
 
+Status VFileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& 
parquet_reader) {
+    const TFileRangeDesc& range = _current_range;
+    Status init_status  = Status::OK();
+
+    if (range.__isset.table_format_params &&
+        range.table_format_params.table_format_type == "iceberg") {
+        std::unique_ptr<IcebergParquetReader> iceberg_reader =
+                IcebergParquetReader::create_unique(std::move(parquet_reader), 
_profile,
+                                                    _state, *_params, range, 
_kv_cache,
+                                                    _io_ctx.get());
+        init_status = iceberg_reader->init_reader(
+                _file_col_names, _col_id_name_map, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        _cur_reader = std::move(iceberg_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "paimon") {
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _file_col_names, place_holder, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        std::unique_ptr<PaimonParquetReader> paimon_reader =
+                PaimonParquetReader::create_unique(std::move(parquet_reader), 
_profile,
+                                                   _state, *_params, range, 
_io_ctx.get());
+        RETURN_IF_ERROR(paimon_reader->init_row_filters());
+        _cur_reader = std::move(paimon_reader);
+    } else {
+        bool hive_parquet_use_column_names = true;
+
+        if (range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hive" && _state != 
nullptr)
+                [[likely]] {
+            hive_parquet_use_column_names =
+                    _state->query_options().hive_parquet_use_column_names;
+        }
+
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _file_col_names, place_holder, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts, true, 
hive_parquet_use_column_names);
+        _cur_reader = std::move(parquet_reader);
+    }
+    return init_status;
+}
+
+
+Status VFileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader) 
{
+    const TFileRangeDesc& range = _current_range;
+    Status init_status  = Status::OK();
+
+    if (range.__isset.table_format_params &&
+        range.table_format_params.table_format_type == "transactional_hive") {
+        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
+                TransactionalHiveReader::create_unique(std::move(orc_reader), 
_profile,
+                                                       _state, *_params, range,
+                                                       _io_ctx.get());
+        init_status = tran_orc_reader->init_reader(
+                _file_col_names, _colname_to_value_range, _push_down_conjuncts,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
+        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
+        _cur_reader = std::move(tran_orc_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "iceberg") {
+        std::unique_ptr<IcebergOrcReader> iceberg_reader =
+                IcebergOrcReader::create_unique(std::move(orc_reader), 
_profile, _state,
+                                                *_params, range, _kv_cache, 
_io_ctx.get());
+
+        init_status = iceberg_reader->init_reader(
+                _file_col_names, _col_id_name_map, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        _cur_reader = std::move(iceberg_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "paimon") {
+        init_status = orc_reader->init_reader(
+                &_file_col_names, _colname_to_value_range, 
_push_down_conjuncts, false,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
+        std::unique_ptr<PaimonOrcReader> paimon_reader = 
PaimonOrcReader::create_unique(
+                std::move(orc_reader), _profile, _state, *_params, range, 
_io_ctx.get());
+        RETURN_IF_ERROR(paimon_reader->init_row_filters());
+        _cur_reader = std::move(paimon_reader);
+    } else {
+        bool hive_orc_use_column_names = true;
+
+        if (range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hive" && _state != 
nullptr)
+                [[likely]] {
+            hive_orc_use_column_names = 
_state->query_options().hive_orc_use_column_names;
+        }
+        init_status = orc_reader->init_reader(
+                &_file_col_names, _colname_to_value_range, 
_push_down_conjuncts, false,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts,
+                hive_orc_use_column_names);
+        _cur_reader = std::move(orc_reader);
+    }
+
+    return init_status;
+}
+
+Status VFileScanner::_set_fill_or_truncate_columns(bool 
need_to_get_parsed_schema) {
+    _name_to_col_type.clear();
+    _missing_cols.clear();
+    RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
+    RETURN_IF_ERROR(_generate_missing_columns());
+    RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, 
_missing_col_descs));
+    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
+        fmt::memory_buffer col_buf;
+        for (auto& col : _missing_cols) {
+            fmt::format_to(col_buf, " {}", col);
+        }
+        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", 
fmt::to_string(col_buf),
+                                   _current_range.path);
+    }
+
+    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
+    return Status::OK();
+}
+
+Status VFileScanner::_generate_truncate_columns(bool 
need_to_get_parsed_schema) {
+    _source_file_col_names.clear();
+    _source_file_col_types.clear();
+    _source_file_col_name_types.clear();
+    if (_state->query_options().truncate_char_or_varchar_columns && 
need_to_get_parsed_schema) {
+        Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
+                                                       
&_source_file_col_types);
+        if (!status.ok() && status.code() != 
TStatusCode::NOT_IMPLEMENTED_ERROR) {
+            return status;
+        }
+        DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
+        for (int i = 0; i < _source_file_col_names.size(); ++i) {
+            _source_file_col_name_types[_source_file_col_names[i]] = 
&_source_file_col_types[i];
+        }
+    }
+    return Status::OK();
+}
+
+
+Status VFileScanner::read_one_line_from_current_range(segment_v2::rowid_t 
rowid,  Block* result_block,
+                                                      ExternalFileMappingInfo 
external_info) {
+    const TFileRangeDesc& range = _current_range;
+
+    RETURN_IF_ERROR(_init_io_ctx());

Review Comment:
   some init work maybe only do once time, we better reuse the vfile_scanner 
the struct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to