This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bc65aa921 [fix](load) PrefetchBufferedReader: crash caused by updating a counter with an invalid runtime profile (#22464)
4bc65aa921 is described below

commit 4bc65aa921399fba784ded9e54efe62e6a8ed69b
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Aug 2 18:19:48 2023 +0800

    [fix](load) PrefetchBufferedReader: crash caused by updating a counter with an invalid runtime profile (#22464)
---
 be/src/io/fs/buffered_reader.cpp                   | 17 ++++++++++++++---
 be/src/io/fs/buffered_reader.h                     |  2 ++
 be/src/io/fs/stream_load_pipe.h                    |  4 +++-
 be/src/vec/exec/format/csv/csv_reader.cpp          | 10 ++++++++++
 be/src/vec/exec/format/csv/csv_reader.h            |  2 ++
 be/src/vec/exec/format/generic_reader.h            |  2 ++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  6 +++++-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  3 ++-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  7 +++++++
 9 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index be64439128..726f5331c9 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -624,8 +624,11 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
 }
 
 PrefetchBufferedReader::~PrefetchBufferedReader() {
-    close();
-    _closed = true;
+    /// set `_sync_profile` to nullptr to avoid updating counter after the 
runtime profile has been released.
+    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+                  [](std::shared_ptr<PrefetchBuffer>& buffer) { 
buffer->_sync_profile = nullptr; });
+    /// Better not to call virtual functions in a destructor.
+    _close_internal();
 }
 
 Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
@@ -654,6 +657,10 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t*
 }
 
 Status PrefetchBufferedReader::close() {
+    return _close_internal();
+}
+
+Status PrefetchBufferedReader::_close_internal() {
     if (!_closed) {
         _closed = true;
         std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
@@ -669,10 +676,14 @@ InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std:
 }
 
 InMemoryFileReader::~InMemoryFileReader() {
-    close();
+    _close_internal();
 }
 
 Status InMemoryFileReader::close() {
+    return _close_internal();
+}
+
+Status InMemoryFileReader::_close_internal() {
     if (!_closed) {
         _closed = true;
         return _reader->close();
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index b0728e6af1..34e1ff34fe 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -383,6 +383,7 @@ protected:
                         const IOContext* io_ctx) override;
 
 private:
+    Status _close_internal();
     size_t get_buffer_pos(int64_t position) const {
         return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
     }
@@ -436,6 +437,7 @@ protected:
                         const IOContext* io_ctx) override;
 
 private:
+    Status _close_internal();
     io::FileReaderSPtr _reader;
     std::unique_ptr<char[]> _data = nullptr;
     size_t _size;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index a184cf9e78..508620d5af 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -60,7 +60,9 @@ public:
 
     // called when consumer finished
     Status close() override {
-        cancel("closed");
+        if (!(_finished || _cancelled)) {
+            cancel("closed");
+        }
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 55a43ed4c5..43c1dde1ce 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -753,4 +753,14 @@ Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>*
     return Status::OK();
 }
 
+void CsvReader::close() {
+    if (_line_reader) {
+        _line_reader->close();
+    }
+
+    if (_file_reader) {
+        _file_reader->close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index da3505e09b..42178846f1 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -80,6 +80,8 @@ public:
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
 
+    void close() override;
+
 private:
     // used for stream/broker load of csv file.
     Status _create_decompressor();
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index a712fb2847..7b6f3c7b9c 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -67,6 +67,8 @@ public:
         return Status::OK();
     }
 
+    virtual void close() {}
+
 protected:
     const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding)
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 9599ed70b4..ba80992a04 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -100,7 +100,7 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRang
 }
 
 ParquetReader::~ParquetReader() {
-    close();
+    _close_internal();
 }
 
 void ParquetReader::_init_profile() {
@@ -162,6 +162,10 @@ void ParquetReader::_init_profile() {
 }
 
 void ParquetReader::close() {
+    _close_internal();
+}
+
+void ParquetReader::_close_internal() {
     if (!_closed) {
         if (_profile != nullptr) {
             COUNTER_UPDATE(_parquet_profile.filtered_row_groups, 
_statistics.filtered_row_groups);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 63f760abcd..0f3996db40 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -119,7 +119,7 @@ public:
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
-    void close();
+    void close() override;
 
     RowRange get_whole_range() { return _whole_range; }
 
@@ -182,6 +182,7 @@ private:
 
     Status _open_file();
     void _init_profile();
+    void _close_internal();
     Status _next_row_group_reader();
     RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
             const tparquet::RowGroup& row_group,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index ac12172221..e30d7640ed 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -558,6 +558,9 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
 
 Status VFileScanner::_get_next_reader() {
     while (true) {
+        if (_cur_reader) {
+            _cur_reader->close();
+        }
         _cur_reader.reset(nullptr);
         _src_block_init = false;
         if (_next_range >= _ranges.size()) {
@@ -936,6 +939,10 @@ Status VFileScanner::close(RuntimeState* state) {
         cache_profile.update(_file_cache_statistics.get());
     }
 
+    if (_cur_reader) {
+        _cur_reader->close();
+    }
+
     RETURN_IF_ERROR(VScanner::close(state));
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to