This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d0beec01d [fix](orc) fix heap-use-after-free and potential memory 
leak of orc reader (#17431)
3d0beec01d is described below

commit 3d0beec01d7a12ea0a8a16bcfba07344be5cb19c
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Mon Mar 6 08:42:35 2023 +0800

    [fix](orc) fix heap-use-after-free and potential memory leak of orc reader 
(#17431)
    
    fix heap-use-after-free
    The OrcReader has an internal FileInputStream. If the file is empty, the
    memory of the FileInputStream will leak.
    Besides, there is a Statistics instance in FileInputStream. The
    FileInputStream may be deleted if the orc reader fails to initialize, but
    the Statistics may still be used when the orc reader is closed, causing a
    heap-use-after-free error.
    
    Potential memory leak
    When initializing the file scanner in the file scan node, if the file
    scanner's prepare fails, the memory of the file scanner will leak.
---
 be/src/common/signal_handler.h              |  2 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp  | 89 ++++++++++++-----------------
 be/src/vec/exec/format/orc/vorc_reader.h    | 73 +++++++++++------------
 be/src/vec/exec/scan/new_file_scan_node.cpp | 16 ++----
 be/src/vec/exec/scan/new_file_scan_node.h   |  2 -
 be/src/vec/exec/scan/vmeta_scan_node.cpp    |  2 +-
 6 files changed, 83 insertions(+), 101 deletions(-)

diff --git a/be/src/common/signal_handler.h b/be/src/common/signal_handler.h
index e88829441d..52cc797140 100644
--- a/be/src/common/signal_handler.h
+++ b/be/src/common/signal_handler.h
@@ -300,7 +300,7 @@ void DumpSignalInfo(int signal_number, siginfo_t* siginfo) {
     if (reason != nullptr) {
         formatter.AppendString(reason);
     } else {
-        formatter.AppendString("unkown detail explain");
+        formatter.AppendString("unknown detail explain");
     }
     formatter.AppendString(" (@0x");
     formatter.AppendUint64(reinterpret_cast<uintptr_t>(siginfo->si_addr), 16);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 5ff453df06..6df70e0f16 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -48,9 +48,9 @@ namespace doris::vectorized {
     M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch)
 
 void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
-    _statistics.read_calls++;
-    _statistics.read_bytes += length;
-    SCOPED_RAW_TIMER(&_statistics.read_time);
+    _statistics->fs_read_calls++;
+    _statistics->fs_read_bytes += length;
+    SCOPED_RAW_TIMER(&_statistics->fs_read_time);
     uint64_t has_read = 0;
     char* out = reinterpret_cast<char*>(buf);
     IOContext io_ctx;
@@ -113,23 +113,28 @@ OrcReader::~OrcReader() {
 
 void OrcReader::close() {
     if (!_closed) {
-        if (_profile != nullptr) {
-            if (_file_reader != nullptr) {
-                auto& fst = _file_reader->statistics();
-                COUNTER_UPDATE(_orc_profile.read_time, fst.read_time);
-                COUNTER_UPDATE(_orc_profile.read_calls, fst.read_calls);
-                COUNTER_UPDATE(_orc_profile.read_bytes, fst.read_bytes);
-            }
-            COUNTER_UPDATE(_orc_profile.column_read_time, 
_statistics.column_read_time);
-            COUNTER_UPDATE(_orc_profile.get_batch_time, 
_statistics.get_batch_time);
-            COUNTER_UPDATE(_orc_profile.parse_meta_time, 
_statistics.parse_meta_time);
-            COUNTER_UPDATE(_orc_profile.decode_value_time, 
_statistics.decode_value_time);
-            COUNTER_UPDATE(_orc_profile.decode_null_map_time, 
_statistics.decode_null_map_time);
-        }
+        _collect_profile_on_close();
         _closed = true;
     }
 }
 
+void OrcReader::_collect_profile_on_close() {
+    if (_profile != nullptr) {
+        COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
+        COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
+        COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
+        COUNTER_UPDATE(_orc_profile.column_read_time, 
_statistics.column_read_time);
+        COUNTER_UPDATE(_orc_profile.get_batch_time, 
_statistics.get_batch_time);
+        COUNTER_UPDATE(_orc_profile.parse_meta_time, 
_statistics.parse_meta_time);
+        COUNTER_UPDATE(_orc_profile.decode_value_time, 
_statistics.decode_value_time);
+        COUNTER_UPDATE(_orc_profile.decode_null_map_time, 
_statistics.decode_null_map_time);
+    }
+}
+
+int64_t OrcReader::size() const {
+    return _file_input_stream->getLength();
+}
+
 void OrcReader::_init_profile() {
     if (_profile != nullptr) {
         static const char* orc_profile = "OrcReader";
@@ -146,33 +151,33 @@ void OrcReader::_init_profile() {
     }
 }
 
-Status OrcReader::init_reader(
-        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
-    if (_file_reader == nullptr) {
+Status OrcReader::_create_file_reader() {
+    if (_file_input_stream == nullptr) {
         io::FileReaderSPtr inner_reader;
-
         RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_system_properties,
                                                         _file_description, 
&_file_system,
                                                         &inner_reader, 
_io_ctx));
-
-        _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
+        _file_input_stream.reset(
+                new ORCFileInputStream(_scan_range.path, inner_reader, 
&_statistics));
     }
-    if (_file_reader->getLength() == 0) {
-        return Status::EndOfFile("init reader failed, empty orc file: " + 
_scan_range.path);
+    if (_file_input_stream->getLength() == 0) {
+        return Status::EndOfFile("empty orc file: " + _scan_range.path);
     }
-
     // create orc reader
     try {
         orc::ReaderOptions options;
-        _reader = 
orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
+        _reader = orc::createReader(
+                
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
     } catch (std::exception& e) {
         return Status::InternalError("Init OrcReader failed. reason = {}", 
e.what());
     }
-    if (_reader->getNumberOfRows() == 0) {
-        return Status::EndOfFile("init reader failed, empty orc file with row 
num 0: " +
-                                 _scan_range.path);
-    }
+    return Status::OK();
+}
+
+Status OrcReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
+    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    RETURN_IF_ERROR(_create_file_reader());
     // _init_bloom_filter(colname_to_value_range);
 
     // create orc row reader
@@ -206,27 +211,7 @@ Status OrcReader::init_reader(
 
 Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
-    if (_file_reader == nullptr) {
-        io::FileReaderSPtr inner_reader;
-
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_system_properties,
-                                                        _file_description, 
&_file_system,
-                                                        &inner_reader, 
_io_ctx));
-
-        _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
-    }
-    if (_file_reader->getLength() == 0) {
-        return Status::EndOfFile("get parsed schema fail, empty orc file: " + 
_scan_range.path);
-    }
-
-    // create orc reader
-    try {
-        orc::ReaderOptions options;
-        _reader = 
orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
-    } catch (std::exception& e) {
-        return Status::InternalError("Init OrcReader failed. reason = {}", 
e.what());
-    }
-
+    RETURN_IF_ERROR(_create_file_reader());
     auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 4df04357fb..741addc345 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -31,38 +31,13 @@
 
 namespace doris::vectorized {
 
-class ORCFileInputStream : public orc::InputStream {
-public:
-    struct Statistics {
-        int64_t read_time = 0;
-        int64_t read_calls = 0;
-        int64_t read_bytes = 0;
-    };
-
-    ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr 
file_reader)
-            : _file_name(file_name), _file_reader(file_reader) {}
-
-    ~ORCFileInputStream() override = default;
-
-    uint64_t getLength() const override { return _file_reader->size(); }
-
-    uint64_t getNaturalReadSize() const override { return 
config::orc_natural_read_size_mb << 20; }
-
-    void read(void* buf, uint64_t length, uint64_t offset) override;
-
-    const std::string& getName() const override { return _file_name; }
-
-    Statistics& statistics() { return _statistics; }
-
-private:
-    Statistics _statistics;
-    const std::string& _file_name;
-    io::FileReaderSPtr _file_reader;
-};
-
+class ORCFileInputStream;
 class OrcReader : public GenericReader {
 public:
     struct Statistics {
+        int64_t fs_read_time = 0;
+        int64_t fs_read_calls = 0;
+        int64_t fs_read_bytes = 0;
         int64_t column_read_time = 0;
         int64_t get_batch_time = 0;
         int64_t parse_meta_time = 0;
@@ -79,10 +54,6 @@ public:
               IOContext* io_ctx);
 
     ~OrcReader() override;
-    // for test
-    void set_file_reader(const std::string& file_name, io::FileReaderSPtr 
file_reader) {
-        _file_reader = new ORCFileInputStream(file_name, file_reader);
-    }
 
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
@@ -91,7 +62,7 @@ public:
 
     void close();
 
-    int64_t size() const { return _file_reader->getLength(); }
+    int64_t size() const;
 
     std::unordered_map<std::string, TypeDescriptor> get_name_to_type() 
override;
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
@@ -111,6 +82,12 @@ private:
         RuntimeProfile::Counter* decode_value_time;
         RuntimeProfile::Counter* decode_null_map_time;
     };
+
+    // Create inner orc file,
+    // return EOF if file is empty
+    // return ERROR if an error is encountered.
+    Status _create_file_reader();
+
     void _init_profile();
     Status _init_read_columns();
     TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -261,6 +238,9 @@ private:
 
     std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos);
 
+    void _collect_profile_on_close();
+
+private:
     RuntimeProfile* _profile;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
@@ -284,7 +264,7 @@ private:
     // Flag for hive engine. True if the external table engine is Hive.
     bool _is_hive = false;
     std::vector<const orc::Type*> _col_orc_type;
-    ORCFileInputStream* _file_reader = nullptr;
+    std::unique_ptr<ORCFileInputStream> _file_input_stream;
     Statistics _statistics;
     OrcProfile _orc_profile;
     bool _closed = false;
@@ -303,4 +283,27 @@ private:
     DecimalScaleParams _decimal_scale_params;
 };
 
+class ORCFileInputStream : public orc::InputStream {
+public:
+    ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr 
file_reader,
+                       OrcReader::Statistics* statistics)
+            : _file_name(file_name), _file_reader(file_reader), 
_statistics(statistics) {}
+
+    ~ORCFileInputStream() override = default;
+
+    uint64_t getLength() const override { return _file_reader->size(); }
+
+    uint64_t getNaturalReadSize() const override { return 
config::orc_natural_read_size_mb << 20; }
+
+    void read(void* buf, uint64_t length, uint64_t offset) override;
+
+    const std::string& getName() const override { return _file_name; }
+
+private:
+    const std::string& _file_name;
+    io::FileReaderSPtr _file_reader;
+    // Owned by OrcReader
+    OrcReader::Statistics* _statistics;
+};
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index a8ce6422e1..50b54f1691 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -88,20 +88,16 @@ Status 
NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
     }
 
     for (auto& scan_range : _scan_ranges) {
-        VScanner* scanner =
-                
(VScanner*)_create_scanner(scan_range.scan_range.ext_scan_range.file_scan_range);
+        VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner,
+                                             
scan_range.scan_range.ext_scan_range.file_scan_range,
+                                             runtime_profile(), _kv_cache);
+        _scanner_pool.add(scanner);
+        RETURN_IF_ERROR(((VFileScanner*)scanner)
+                                ->prepare(_vconjunct_ctx_ptr.get(), 
&_colname_to_value_range));
         scanners->push_back(scanner);
     }
 
     return Status::OK();
 }
 
-VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
-    VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, 
scan_range,
-                                         runtime_profile(), _kv_cache);
-    ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), 
&_colname_to_value_range);
-    _scanner_pool.add(scanner);
-    return scanner;
-}
-
 }; // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h 
b/be/src/vec/exec/scan/new_file_scan_node.h
index 8ece99a961..f45ba9ddf9 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -38,8 +38,6 @@ protected:
     Status _init_scanners(std::list<VScanner*>* scanners) override;
 
 private:
-    VScanner* _create_scanner(const TFileScanRange& scan_range);
-
     std::vector<TScanRangeParams> _scan_ranges;
     KVCache<std::string> _kv_cache;
 };
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp 
b/be/src/vec/exec/scan/vmeta_scan_node.cpp
index a404d9929b..6a9e86636c 100644
--- a/be/src/vec/exec/scan/vmeta_scan_node.cpp
+++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp
@@ -54,8 +54,8 @@ Status VMetaScanNode::_init_scanners(std::list<VScanner*>* 
scanners) {
     for (auto& scan_range : _scan_ranges) {
         VMetaScanner* scanner = new VMetaScanner(_state, this, _tuple_id, 
scan_range,
                                                  _limit_per_scanner, 
runtime_profile());
-        RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
         _scanner_pool.add(scanner);
+        RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
         scanners->push_back(static_cast<VScanner*>(scanner));
     }
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to