TangSiyang2001 opened a new issue, #25773: URL: https://github.com/apache/doris/issues/25773
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.

### Description

While running an S3 Load performance test (TPC-H sf100), we found that the sink throughput is obviously unbalanced.

We found that the imbalance already exists during scanning. However, the scan ranges themselves are planned correctly:

```
I20231019 20:06:06.431310 689692 vfile_scanner.cpp:112] ==>scanrange params: TFileRangeDesc {
  02: path (string) = "s3://doris-build-1308700295/performance/data/tpch_sf100/lineitem.tbl.1",
  03: start_offset (i64) = 2631899122,
  04: size (i64) = 2631899122,
  05: file_size (i64) = 7895697364,
  06: columns_from_path (list) = list<string>[0] {
  },
  09: modification_time (i64) = 1694075115000,
}
==>scanrange params: TFileRangeDesc {
  02: path (string) = "s3://doris-build-1308700295/performance/data/tpch_sf100/lineitem.tbl.1",
  03: start_offset (i64) = 0,
  04: size (i64) = 2631899122,
  05: file_size (i64) = 7895697364,
  06: columns_from_path (list) = list<string>[0] {
  },
  09: modification_time (i64) = 1694075115000,
}
==>scanrange params: TFileRangeDesc {
  02: path (string) = "s3://doris-build-1308700295/performance/data/tpch_sf100/lineitem.tbl.1",
  03: start_offset (i64) = 5263798244,
  04: size (i64) = 2631899120,
  05: file_size (i64) = 7895697364,
  06: columns_from_path (list) = list<string>[0] {
  },
  09: modification_time (i64) = 1694075115000,
}
```

According to the profile, the difference is related to `FileScannerGetBlockTime` in this scope:

```cpp
{
    SCOPED_TIMER(_get_block_timer);
    // Read next block.
    // Some of column in block may not be filled (column not exist in file)
    RETURN_IF_ERROR(
            _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
}
```

From the profile, we then noticed a difference in the underlying delegated file reader, which is `PrefetchBufferedReader`.
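The three ranges in the log look like a plain even split of the 7,895,697,364-byte file: ceil-divide the file size by the number of splits, and let the last range take the remainder. A minimal standalone sketch of that arithmetic (our own illustration, not Doris planner code; `split_file` is a hypothetical helper):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper: evenly partition a file of `file_size` bytes into
// (start_offset, size) scan ranges, the way the planner appears to have
// split lineitem.tbl.1 above.
struct Range {
    int64_t start_offset;
    int64_t size;
};

std::vector<Range> split_file(int64_t file_size, int64_t num_splits) {
    std::vector<Range> ranges;
    // Ceiling division so that every split but the last has the same size.
    int64_t split_size = (file_size + num_splits - 1) / num_splits;
    for (int64_t off = 0; off < file_size; off += split_size) {
        ranges.push_back({off, std::min(split_size, file_size - off)});
    }
    return ranges;
}
```

Applied to `file_size = 7895697364` with three splits, this reproduces exactly the offsets 0, 2631899122, and 5263798244 and the sizes 2631899122/2631899122/2631899120 from the log, so the planning itself is balanced.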
### Solution

On slow nodes, this function effectively ran only its first branch: the check `off >= _file_range.end_offset` evaluated to true, so reads fell back to the synchronous path, which indicates that `_file_range.end_offset` is incorrect.

```cpp
Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
                                   size_t* bytes_read) {
    if (UNLIKELY(off >= _file_range.end_offset)) {
        // Reader can read out of [start_offset, end_offset) by synchronous method.
        return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx);
    }
    if (_exceed) {
        reset_offset((off / _size) * _size);
        return read_buffer(off, out, buf_len, bytes_read);
    }
    {
        std::unique_lock lck {_lock};
        // buffer must be prefetched or it's closed
        if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
                return _buffer_status == BufferStatus::PREFETCHED ||
                       _buffer_status == BufferStatus::CLOSED;
            })) {
            _prefetch_status = Status::TimedOut("time out when read prefetch buffer");
            return _prefetch_status;
        }
        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
            return Status::OK();
        }
    }
    RETURN_IF_ERROR(_prefetch_status);
    // there is only parquet would do not sequence read
    // it would read the end of the file first
    if (UNLIKELY(!contains(off))) {
        reset_offset((off / _size) * _size);
        return read_buffer(off, out, buf_len, bytes_read);
    }
    if (UNLIKELY(0 == _len || _offset + _len < off)) {
        return Status::OK();
    }
    // [0]: maximum len trying to read, [1] maximum length buffer can provide,
    // [2] actual len buffer has
    size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
    {
        SCOPED_RAW_TIMER(&_statis.copy_time);
        memcpy((void*)out, _buf.get() + (off - _offset), read_len);
    }
    *bytes_read = read_len;
    _statis.request_io += 1;
    _statis.request_bytes += read_len;
    if (off + *bytes_read == _offset + _len) {
        reset_offset(_offset + _whole_buffer_size);
    }
    return Status::OK();
}
```

We found that in `CsvReader`, the end offset of the `PrefetchRange` is incorrect:
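To see why an end offset equal to the range's *size* (rather than `start_offset + size`) defeats the prefetch path, consider the bypass check in isolation. This is a standalone sketch, not Doris code; `FileRange` is a minimal model of the real type, and the "fixed" construction is our assumption of the intended semantics:

```cpp
#include <cstdint>

// Minimal model of the file range the prefetch buffer checks against
// (an assumption based on the code above, not the actual Doris types).
struct FileRange {
    int64_t start_offset;
    int64_t end_offset; // exclusive upper bound
};

// Mirrors the bypass check at the top of PrefetchBuffer::read_buffer():
// any offset at or past end_offset falls back to a synchronous read_at().
bool bypasses_prefetch(const FileRange& range, int64_t off) {
    return off >= range.end_offset;
}

// Take the first scan range from the log: start_offset = 2631899122,
// size = 2631899122.
//
// Buggy construction, as in CsvReader, stores `size` as the end offset:
// FileRange{2631899122, 2631899122}. The very first read of the range
// already satisfies off >= end_offset, so every read of this range goes
// through the slow synchronous path.
//
// Presumed fix: an absolute exclusive end offset, start_offset + size,
// i.e. FileRange{2631899122, 5263798244}, for which reads inside the
// range stay on the prefetch path.
```

Ranges that start at offset 0 are unaffected, since there `size` equals `start_offset + size`; that would explain why only some scan ranges (and hence some nodes) fell back to synchronous reads and ran slow.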
```cpp
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
        _profile, _system_properties, _file_description, reader_options, &_file_system,
        &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
        io::PrefetchRange(_range.start_offset, _range.size)));
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
