This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4bc65aa921 [fix](load) PrefetchBufferedReader Crashing caused updating counter with an invalid runtime profile (#22464) 4bc65aa921 is described below commit 4bc65aa921399fba784ded9e54efe62e6a8ed69b Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Aug 2 18:19:48 2023 +0800 [fix](load) PrefetchBufferedReader Crashing caused updating counter with an invalid runtime profile (#22464) --- be/src/io/fs/buffered_reader.cpp | 17 ++++++++++++++--- be/src/io/fs/buffered_reader.h | 2 ++ be/src/io/fs/stream_load_pipe.h | 4 +++- be/src/vec/exec/format/csv/csv_reader.cpp | 10 ++++++++++ be/src/vec/exec/format/csv/csv_reader.h | 2 ++ be/src/vec/exec/format/generic_reader.h | 2 ++ be/src/vec/exec/format/parquet/vparquet_reader.cpp | 6 +++++- be/src/vec/exec/format/parquet/vparquet_reader.h | 3 ++- be/src/vec/exec/scan/vfile_scanner.cpp | 7 +++++++ 9 files changed, 47 insertions(+), 6 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index be64439128..726f5331c9 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -624,8 +624,11 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File } PrefetchBufferedReader::~PrefetchBufferedReader() { - close(); - _closed = true; + /// set `_sync_profile` to nullptr to avoid updating counter after the runtime profile has been released. + std::for_each(_pre_buffers.begin(), _pre_buffers.end(), + [](std::shared_ptr<PrefetchBuffer>& buffer) { buffer->_sync_profile = nullptr; }); + /// Better not to call virtual functions in a destructor. + _close_internal(); } Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, @@ -654,6 +657,10 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* } Status PrefetchBufferedReader::close() { + return _close_internal(); +} + +Status PrefetchBufferedReader::_close_internal() { if (!_closed) { _closed = true; std::for_each(_pre_buffers.begin(), _pre_buffers.end(), @@ -669,10 +676,14 @@ InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std: } InMemoryFileReader::~InMemoryFileReader() { - close(); + _close_internal(); } Status InMemoryFileReader::close() { + return _close_internal(); +} + +Status InMemoryFileReader::_close_internal() { if (!_closed) { _closed = true; return _reader->close(); diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index b0728e6af1..34e1ff34fe 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -383,6 +383,7 @@ protected: const IOContext* io_ctx) override; private: + Status _close_internal(); size_t get_buffer_pos(int64_t position) const { return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size; } @@ -436,6 +437,7 @@ protected: const IOContext* io_ctx) override; private: + Status _close_internal(); io::FileReaderSPtr _reader; std::unique_ptr<char[]> _data = nullptr; size_t _size; diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index a184cf9e78..508620d5af 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -60,7 +60,9 @@ public: // called when consumer finished Status close() override { - cancel("closed"); + if (!(_finished || _cancelled)) { + cancel("closed"); + } return Status::OK(); } diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 55a43ed4c5..43c1dde1ce 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -753,4 +753,14 @@ Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* return Status::OK(); } +void CsvReader::close() { + if (_line_reader) { + _line_reader->close(); + } + + if (_file_reader) { + _file_reader->close(); + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index da3505e09b..42178846f1 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -80,6 +80,8 @@ public: Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; + void close() override; + private: // used for stream/broker load of csv file. Status _create_decompressor(); diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index a712fb2847..7b6f3c7b9c 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -67,6 +67,8 @@ public: return Status::OK(); } + virtual void close() {} + protected: const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding) diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 9599ed70b4..ba80992a04 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -100,7 +100,7 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRang } ParquetReader::~ParquetReader() { - close(); + _close_internal(); } void ParquetReader::_init_profile() { @@ -162,6 +162,10 @@ void ParquetReader::_init_profile() { } void ParquetReader::close() { + _close_internal(); +} + +void ParquetReader::_close_internal() { if (!_closed) { if (_profile != nullptr) { COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 63f760abcd..0f3996db40 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -119,7 +119,7 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; - void close(); + void close() override; RowRange get_whole_range() { return _whole_range; } @@ -182,6 +182,7 @@ private: Status _open_file(); void _init_profile(); + void _close_internal(); Status _next_row_group_reader(); RowGroupReader::PositionDeleteContext _get_position_delete_ctx( const tparquet::RowGroup& row_group, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index ac12172221..e30d7640ed 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -558,6 +558,9 @@ Status VFileScanner::_convert_to_output_block(Block* block) { Status VFileScanner::_get_next_reader() { while (true) { + if (_cur_reader) { + _cur_reader->close(); + } _cur_reader.reset(nullptr); _src_block_init = false; if (_next_range >= _ranges.size()) { @@ -936,6 +939,10 @@ Status VFileScanner::close(RuntimeState* state) { cache_profile.update(_file_cache_statistics.get()); } + if (_cur_reader) { + _cur_reader->close(); + } + RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org