HappenLee commented on code in PR #49944:
URL: https://github.com/apache/doris/pull/49944#discussion_r2041174660


##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -1156,41 +1087,216 @@ Status VFileScanner::_get_next_reader() {
             return Status::InternalError("failed to init reader, err: {}", 
init_status.to_string());
         }
 
-        _name_to_col_type.clear();
-        _missing_cols.clear();
-        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
         _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
-        RETURN_IF_ERROR(_generate_missing_columns());
-        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, 
_missing_col_descs));
-        if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
-            fmt::memory_buffer col_buf;
-            for (auto& col : _missing_cols) {
-                fmt::format_to(col_buf, " {}", col);
-            }
-            VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", 
fmt::to_string(col_buf),
-                                       range.path);
-        }
-
-        _source_file_col_names.clear();
-        _source_file_col_types.clear();
-        _source_file_col_name_types.clear();
-        if (_state->query_options().truncate_char_or_varchar_columns && 
need_to_get_parsed_schema) {
-            Status status = 
_cur_reader->get_parsed_schema(&_source_file_col_names,
-                                                           
&_source_file_col_types);
-            if (!status.ok() && status.code() != 
TStatusCode::NOT_IMPLEMENTED_ERROR) {
-                return status;
-            }
-            DCHECK(_source_file_col_names.size() == 
_source_file_col_types.size());
-            for (int i = 0; i < _source_file_col_names.size(); ++i) {
-                _source_file_col_name_types[_source_file_col_names[i]] = 
&_source_file_col_types[i];
-            }
-        }
+        
RETURN_IF_ERROR(_set_fill_or_truncate_columns(need_to_get_parsed_schema));
         _cur_reader_eof = false;
         break;
     }
     return Status::OK();
 }
 
+// Wraps `parquet_reader` in the reader selected by
+// `_current_range.table_format_params.table_format_type` (iceberg / paimon / plain
+// parquet), initializes it and installs the result as `_cur_reader`.
+//
+// Returns the init status of the installed reader so the caller can decide what to do
+// with the file; an error while loading paimon row filters is returned directly.
+Status VFileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader) {
+    const TFileRangeDesc& range = _current_range;
+    Status init_status = Status::OK();
+
+    if (range.__isset.table_format_params &&
+        range.table_format_params.table_format_type == "iceberg") {
+        std::unique_ptr<IcebergParquetReader> iceberg_reader =
+                IcebergParquetReader::create_unique(std::move(parquet_reader), _profile, _state,
+                                                    *_params, range, _kv_cache, _io_ctx.get());
+        init_status = iceberg_reader->init_reader(
+                _file_col_names, _col_id_name_map, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        _cur_reader = std::move(iceberg_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "paimon") {
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _file_col_names, place_holder, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        std::unique_ptr<PaimonParquetReader> paimon_reader =
+                PaimonParquetReader::create_unique(std::move(parquet_reader), _profile, _state,
+                                                   *_params, range, _io_ctx.get());
+        // Load row filters only for a successfully initialized reader: otherwise the work
+        // is wasted and a filter-loading error would replace the `init_status` that the
+        // caller inspects.
+        if (init_status.ok()) {
+            RETURN_IF_ERROR(paimon_reader->init_row_filters());
+        }
+        _cur_reader = std::move(paimon_reader);
+    } else {
+        // Only hive ranges consult the `hive_parquet_use_column_names` query option;
+        // every other range passes `true`.
+        bool hive_parquet_use_column_names = true;
+
+        if (range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hive" && _state != nullptr)
+                [[likely]] {
+            hive_parquet_use_column_names =
+                    _state->query_options().hive_parquet_use_column_names;
+        }
+
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _file_col_names, place_holder, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names);
+        _cur_reader = std::move(parquet_reader);
+    }
+    return init_status;
+}
+
+
+// Wraps `orc_reader` in the reader selected by
+// `_current_range.table_format_params.table_format_type` (transactional_hive /
+// iceberg / paimon / plain orc), initializes it and installs the result as `_cur_reader`.
+//
+// Returns the init status of the installed reader so the caller can decide what to do
+// with the file; an error while loading row filters (transactional_hive, paimon) is
+// returned directly.
+Status VFileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader) {
+    const TFileRangeDesc& range = _current_range;
+    Status init_status = Status::OK();
+
+    if (range.__isset.table_format_params &&
+        range.table_format_params.table_format_type == "transactional_hive") {
+        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
+                TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state,
+                                                       *_params, range, _io_ctx.get());
+        init_status = tran_orc_reader->init_reader(
+                _file_col_names, _colname_to_value_range, _push_down_conjuncts,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
+        // Load row filters only for a successfully initialized reader so a filter-loading
+        // error cannot replace the `init_status` that the caller inspects.
+        if (init_status.ok()) {
+            RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
+        }
+        _cur_reader = std::move(tran_orc_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "iceberg") {
+        std::unique_ptr<IcebergOrcReader> iceberg_reader =
+                IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state,
+                                                *_params, range, _kv_cache, _io_ctx.get());
+
+        init_status = iceberg_reader->init_reader(
+                _file_col_names, _col_id_name_map, _colname_to_value_range,
+                _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
+                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                &_slot_id_to_filter_conjuncts);
+        _cur_reader = std::move(iceberg_reader);
+    } else if (range.__isset.table_format_params &&
+               range.table_format_params.table_format_type == "paimon") {
+        init_status = orc_reader->init_reader(
+                &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
+        std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
+                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());
+        // Same reasoning as the transactional_hive branch: skip filter loading on failure.
+        if (init_status.ok()) {
+            RETURN_IF_ERROR(paimon_reader->init_row_filters());
+        }
+        _cur_reader = std::move(paimon_reader);
+    } else {
+        // Only hive ranges consult the `hive_orc_use_column_names` query option;
+        // every other range passes `true`.
+        bool hive_orc_use_column_names = true;
+
+        if (range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hive" && _state != nullptr)
+                [[likely]] {
+            hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
+        }
+        init_status = orc_reader->init_reader(
+                &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                _real_tuple_desc, _default_val_row_desc.get(),
+                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
+                hive_orc_use_column_names);
+        _cur_reader = std::move(orc_reader);
+    }
+
+    return init_status;
+}
+
+// Refreshes per-file column bookkeeping once `_cur_reader` is initialized:
+// - asks the reader which columns it has and which are missing;
+// - builds the missing-column descriptors;
+// - hands partition and missing-column descriptors back to the reader;
+// - finally rebuilds the CHAR/VARCHAR truncation schema.
+// The first failing step's status is returned.
+Status VFileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
+    _name_to_col_type.clear();
+    _missing_cols.clear();
+    RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
+    RETURN_IF_ERROR(_generate_missing_columns());
+    RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
+
+    // Missing columns are only reported for loads, and only at notice verbosity.
+    const bool report_missing = VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load;
+    if (report_missing) {
+        fmt::memory_buffer names;
+        for (const auto& missing : _missing_cols) {
+            fmt::format_to(names, " {}", missing);
+        }
+        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(names),
+                                   _current_range.path);
+    }
+
+    return _generate_truncate_columns(need_to_get_parsed_schema);
+}
+
+// Rebuilds the source-file schema used to truncate CHAR/VARCHAR values.
+// The three schema containers are always cleared. They are repopulated from
+// `_cur_reader->get_parsed_schema()` only when the `truncate_char_or_varchar_columns`
+// query option is set and the caller asks for it. A NOT_IMPLEMENTED_ERROR from the
+// reader is tolerated; any other error is returned.
+Status VFileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
+    _source_file_col_names.clear();
+    _source_file_col_types.clear();
+    _source_file_col_name_types.clear();
+    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
+        Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
+                                                       &_source_file_col_types);
+        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
+            return status;
+        }
+        DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
+        // `size_t` index avoids a signed/unsigned comparison against size().
+        // The map stores pointers to elements of `_source_file_col_types`, so that
+        // container must not reallocate while the map is in use.
+        for (size_t i = 0; i < _source_file_col_names.size(); ++i) {
+            _source_file_col_name_types[_source_file_col_names[i]] = &_source_file_col_types[i];
+        }
+    }
+    return Status::OK();
+}
+
+
+Status VFileScanner::read_one_line_from_current_range(segment_v2::rowid_t 
rowid,  Block* result_block,
+                                                      ExternalFileMappingInfo 
external_info) {
+    const TFileRangeDesc& range = _current_range;
+
+    RETURN_IF_ERROR(_init_io_ctx());

Review Comment:
   Some of this initialization work may only need to be done once, rather than on every call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to