morningman commented on code in PR #45966: URL: https://github.com/apache/doris/pull/45966#discussion_r1955522599
########## be/src/vec/exec/format/orc/vorc_reader.cpp: ########## @@ -2660,45 +2685,112 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column( void ORCFileInputStream::beforeReadStripe( std::unique_ptr<orc::StripeInformation> current_strip_information, - std::vector<bool> selected_columns) { + std::vector<bool> selected_columns, + std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) { if (_is_all_tiny_stripes) { return; } if (_file_reader != nullptr) { _file_reader->collect_profile_before_close(); } - // Generate prefetch ranges, build stripe file reader. + for (const auto& stripe_stream : _stripe_streams) { + if (stripe_stream != nullptr) { + stripe_stream->collect_profile_before_close(); + } + } + _stripe_streams.clear(); + uint64_t offset = current_strip_information->getOffset(); - std::vector<io::PrefetchRange> prefetch_ranges; - size_t total_io_size = 0; + std::unordered_map<orc::StreamId, io::PrefetchRange> prefetch_ranges; for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams(); ++stream_id) { std::unique_ptr<orc::StreamInformation> stream = current_strip_information->getStreamInformation(stream_id); uint32_t columnId = stream->getColumnId(); uint64_t length = stream->getLength(); if (selected_columns[columnId]) { - total_io_size += length; doris::io::PrefetchRange prefetch_range = {offset, offset + length}; - prefetch_ranges.emplace_back(std::move(prefetch_range)); + orc::StreamId streamId(stream->getColumnId(), stream->getKind()); + prefetch_ranges.emplace(std::move(streamId), std::move(prefetch_range)); } offset += length; } - size_t num_columns = std::count_if(selected_columns.begin(), selected_columns.end(), - [](bool selected) { return selected; }); - if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) { - // The underlying page reader will prefetch data in column. 
- _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges)); - } else { - _file_reader = _inner_reader; + _build_input_stripe_streams(prefetch_ranges, streams); +} + +void ORCFileInputStream::_build_input_stripe_streams( + const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges, + std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) { + if (ranges.empty()) { + return; + } + + std::unordered_map<orc::StreamId, io::PrefetchRange> small_ranges; + std::unordered_map<orc::StreamId, io::PrefetchRange> large_ranges; + + for (const auto& range : ranges) { + if (range.second.end_offset - range.second.start_offset <= _orc_once_max_read_bytes) { + small_ranges.emplace(range.first, range.second); + } else { + large_ranges.emplace(range.first, range.second); + } + } + + _build_small_ranges_input_stripe_streams(small_ranges, streams); + _build_large_ranges_input_stripe_streams(large_ranges, streams); +} + +void ORCFileInputStream::_build_small_ranges_input_stripe_streams( Review Comment: we can optimize this logic by sorting the ranges by start offset first, so there is no need to traverse all of the original ranges again for every merged range (note: the snippet below iterates over `merged_ranges` but omits how it is computed; presumably it is built from the small ranges as in the current implementation): ``` void ORCFileInputStream::_build_small_ranges_input_stripe_streams( const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges, std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) { // Sort ranges by start_offset for efficient searching std::vector<std::pair<orc::StreamId, io::PrefetchRange>> sorted_ranges(ranges.begin(), ranges.end()); std::sort(sorted_ranges.begin(), sorted_ranges.end(), [](const auto& a, const auto& b) { return a.second.start_offset < b.second.start_offset; }); for (const auto& merged_range : merged_ranges) { auto merge_range_file_reader = std::make_shared<OrcMergeRangeFileReader>(_profile, _file_reader, merged_range); // Use binary search to find the starting point in sorted_ranges auto it = std::lower_bound(sorted_ranges.begin(), sorted_ranges.end(), merged_range.start_offset, 
[](const auto& pair, uint64_t offset) { return pair.second.start_offset < offset; }); // Iterate from the found starting point for (; it != sorted_ranges.end() && it->second.start_offset < merged_range.end_offset; ++it) { if (it->second.end_offset <= merged_range.end_offset) { auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>( getName(), merge_range_file_reader, _statistics, _io_ctx, _profile); streams.emplace(it->first, stripe_stream_input_stream); _stripe_streams.emplace_back(stripe_stream_input_stream); } } } } ``` ########## be/src/vec/exec/format/orc/vorc_reader.cpp: ########## @@ -2660,45 +2685,112 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column( void ORCFileInputStream::beforeReadStripe( std::unique_ptr<orc::StripeInformation> current_strip_information, - std::vector<bool> selected_columns) { + std::vector<bool> selected_columns, Review Comment: ```suggestion const std::vector<bool>& selected_columns, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org