HappenLee commented on code in PR #49944: URL: https://github.com/apache/doris/pull/49944#discussion_r2041174660
########## be/src/vec/exec/scan/vfile_scanner.cpp: ########## @@ -1156,41 +1087,216 @@ Status VFileScanner::_get_next_reader() { return Status::InternalError("failed to init reader, err: {}", init_status.to_string()); } - _name_to_col_type.clear(); - _missing_cols.clear(); - RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols)); _cur_reader->set_push_down_agg_type(_get_push_down_agg_type()); - RETURN_IF_ERROR(_generate_missing_columns()); - RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs)); - if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { - fmt::memory_buffer col_buf; - for (auto& col : _missing_cols) { - fmt::format_to(col_buf, " {}", col); - } - VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf), - range.path); - } - - _source_file_col_names.clear(); - _source_file_col_types.clear(); - _source_file_col_name_types.clear(); - if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) { - Status status = _cur_reader->get_parsed_schema(&_source_file_col_names, - &_source_file_col_types); - if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) { - return status; - } - DCHECK(_source_file_col_names.size() == _source_file_col_types.size()); - for (int i = 0; i < _source_file_col_names.size(); ++i) { - _source_file_col_name_types[_source_file_col_names[i]] = &_source_file_col_types[i]; - } - } + RETURN_IF_ERROR(_set_fill_or_truncate_columns(need_to_get_parsed_schema)); _cur_reader_eof = false; break; } return Status::OK(); } +Status VFileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader) { + const TFileRangeDesc& range = _current_range; + Status init_status = Status::OK(); + + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "iceberg") { + std::unique_ptr<IcebergParquetReader> iceberg_reader = + 
IcebergParquetReader::create_unique(std::move(parquet_reader), _profile, + _state, *_params, range, _kv_cache, + _io_ctx.get()); + init_status = iceberg_reader->init_reader( + _file_col_names, _col_id_name_map, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); + _cur_reader = std::move(iceberg_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "paimon") { + std::vector<std::string> place_holder; + init_status = parquet_reader->init_reader( + _file_col_names, place_holder, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); + std::unique_ptr<PaimonParquetReader> paimon_reader = + PaimonParquetReader::create_unique(std::move(parquet_reader), _profile, + _state, *_params, range, _io_ctx.get()); + RETURN_IF_ERROR(paimon_reader->init_row_filters()); + _cur_reader = std::move(paimon_reader); + } else { + bool hive_parquet_use_column_names = true; + + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "hive" && _state != nullptr) + [[likely]] { + hive_parquet_use_column_names = + _state->query_options().hive_parquet_use_column_names; + } + + std::vector<std::string> place_holder; + init_status = parquet_reader->init_reader( + _file_col_names, place_holder, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names); + _cur_reader = std::move(parquet_reader); + } + return init_status; +} + + +Status VFileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader) { + const TFileRangeDesc& range = _current_range; + Status init_status = 
Status::OK(); + + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "transactional_hive") { + std::unique_ptr<TransactionalHiveReader> tran_orc_reader = + TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, + _state, *_params, range, + _io_ctx.get()); + init_status = tran_orc_reader->init_reader( + _file_col_names, _colname_to_value_range, _push_down_conjuncts, + _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); + RETURN_IF_ERROR(tran_orc_reader->init_row_filters()); + _cur_reader = std::move(tran_orc_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "iceberg") { + std::unique_ptr<IcebergOrcReader> iceberg_reader = + IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state, + *_params, range, _kv_cache, _io_ctx.get()); + + init_status = iceberg_reader->init_reader( + _file_col_names, _col_id_name_map, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); + _cur_reader = std::move(iceberg_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "paimon") { + init_status = orc_reader->init_reader( + &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); + std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique( + std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get()); + RETURN_IF_ERROR(paimon_reader->init_row_filters()); + _cur_reader = std::move(paimon_reader); + } else { + bool hive_orc_use_column_names = true; + + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "hive" 
&& _state != nullptr) + [[likely]] { + hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names; + } + init_status = orc_reader->init_reader( + &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, + hive_orc_use_column_names); + _cur_reader = std::move(orc_reader); + } + + return init_status; +} + +Status VFileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) { + _name_to_col_type.clear(); + _missing_cols.clear(); + RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols)); + RETURN_IF_ERROR(_generate_missing_columns()); + RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs)); + if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { + fmt::memory_buffer col_buf; + for (auto& col : _missing_cols) { + fmt::format_to(col_buf, " {}", col); + } + VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf), + _current_range.path); + } + + RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema)); + return Status::OK(); +} + +Status VFileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) { + _source_file_col_names.clear(); + _source_file_col_types.clear(); + _source_file_col_name_types.clear(); + if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) { + Status status = _cur_reader->get_parsed_schema(&_source_file_col_names, + &_source_file_col_types); + if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) { + return status; + } + DCHECK(_source_file_col_names.size() == _source_file_col_types.size()); + for (int i = 0; i < _source_file_col_names.size(); ++i) { + _source_file_col_name_types[_source_file_col_names[i]] = &_source_file_col_types[i]; + } + } + return Status::OK(); +} + + +Status 
VFileScanner::read_one_line_from_current_range(segment_v2::rowid_t rowid, Block* result_block, + ExternalFileMappingInfo external_info) { + const TFileRangeDesc& range = _current_range; + + RETURN_IF_ERROR(_init_io_ctx()); Review Comment: Some of this initialization work (for example, `_init_io_ctx()`) may only need to be done once, rather than every time `read_one_line_from_current_range` is called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org