This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3d0beec01d [fix](orc) fix heap-use-after-free and potential memory leak of orc reader (#17431) 3d0beec01d is described below commit 3d0beec01d7a12ea0a8a16bcfba07344be5cb19c Author: Mingyu Chen <morning...@163.com> AuthorDate: Mon Mar 6 08:42:35 2023 +0800 [fix](orc) fix heap-use-after-free and potential memory leak of orc reader (#17431) fix heap-use-after-free: The OrcReader has an internal FileInputStream. If the file is empty, the memory of the FileInputStream will leak. Besides, there is a Statistics instance in FileInputStream. The FileInputStream may be deleted if the orc reader fails to initialize, but the Statistics may still be used when the orc reader is closed, causing a heap-use-after-free error. Potential memory leak: When initializing a file scanner in the file scan node, if the file scanner fails to prepare, the memory of the file scanner will leak. --- be/src/common/signal_handler.h | 2 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 89 ++++++++++++----------------- be/src/vec/exec/format/orc/vorc_reader.h | 73 +++++++++++------------ be/src/vec/exec/scan/new_file_scan_node.cpp | 16 ++---- be/src/vec/exec/scan/new_file_scan_node.h | 2 - be/src/vec/exec/scan/vmeta_scan_node.cpp | 2 +- 6 files changed, 83 insertions(+), 101 deletions(-) diff --git a/be/src/common/signal_handler.h b/be/src/common/signal_handler.h index e88829441d..52cc797140 100644 --- a/be/src/common/signal_handler.h +++ b/be/src/common/signal_handler.h @@ -300,7 +300,7 @@ void DumpSignalInfo(int signal_number, siginfo_t* siginfo) { if (reason != nullptr) { formatter.AppendString(reason); } else { - formatter.AppendString("unkown detail explain"); + formatter.AppendString("unknown detail explain"); } formatter.AppendString(" (@0x"); formatter.AppendUint64(reinterpret_cast<uintptr_t>(siginfo->si_addr), 16); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 5ff453df06..6df70e0f16 100644 --- 
a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -48,9 +48,9 @@ namespace doris::vectorized { M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch) void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { - _statistics.read_calls++; - _statistics.read_bytes += length; - SCOPED_RAW_TIMER(&_statistics.read_time); + _statistics->fs_read_calls++; + _statistics->fs_read_bytes += length; + SCOPED_RAW_TIMER(&_statistics->fs_read_time); uint64_t has_read = 0; char* out = reinterpret_cast<char*>(buf); IOContext io_ctx; @@ -113,23 +113,28 @@ OrcReader::~OrcReader() { void OrcReader::close() { if (!_closed) { - if (_profile != nullptr) { - if (_file_reader != nullptr) { - auto& fst = _file_reader->statistics(); - COUNTER_UPDATE(_orc_profile.read_time, fst.read_time); - COUNTER_UPDATE(_orc_profile.read_calls, fst.read_calls); - COUNTER_UPDATE(_orc_profile.read_bytes, fst.read_bytes); - } - COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time); - COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time); - COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time); - COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time); - COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time); - } + _collect_profile_on_close(); _closed = true; } } +void OrcReader::_collect_profile_on_close() { + if (_profile != nullptr) { + COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time); + COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls); + COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes); + COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time); + COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time); + COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time); + COUNTER_UPDATE(_orc_profile.decode_value_time, 
_statistics.decode_value_time); + COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time); + } +} + +int64_t OrcReader::size() const { + return _file_input_stream->getLength(); +} + void OrcReader::_init_profile() { if (_profile != nullptr) { static const char* orc_profile = "OrcReader"; @@ -146,33 +151,33 @@ void OrcReader::_init_profile() { } } -Status OrcReader::init_reader( - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { - SCOPED_RAW_TIMER(&_statistics.parse_meta_time); - if (_file_reader == nullptr) { +Status OrcReader::_create_file_reader() { + if (_file_input_stream == nullptr) { io::FileReaderSPtr inner_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, &_file_system, &inner_reader, _io_ctx)); - - _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader); + _file_input_stream.reset( + new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics)); } - if (_file_reader->getLength() == 0) { - return Status::EndOfFile("init reader failed, empty orc file: " + _scan_range.path); + if (_file_input_stream->getLength() == 0) { + return Status::EndOfFile("empty orc file: " + _scan_range.path); } - // create orc reader try { orc::ReaderOptions options; - _reader = orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options); + _reader = orc::createReader( + std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options); } catch (std::exception& e) { return Status::InternalError("Init OrcReader failed. 
reason = {}", e.what()); } - if (_reader->getNumberOfRows() == 0) { - return Status::EndOfFile("init reader failed, empty orc file with row num 0: " + - _scan_range.path); - } + return Status::OK(); +} + +Status OrcReader::init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + SCOPED_RAW_TIMER(&_statistics.parse_meta_time); + RETURN_IF_ERROR(_create_file_reader()); // _init_bloom_filter(colname_to_value_range); // create orc row reader @@ -206,27 +211,7 @@ Status OrcReader::init_reader( Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) { - if (_file_reader == nullptr) { - io::FileReaderSPtr inner_reader; - - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, - _file_description, &_file_system, - &inner_reader, _io_ctx)); - - _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader); - } - if (_file_reader->getLength() == 0) { - return Status::EndOfFile("get parsed schema fail, empty orc file: " + _scan_range.path); - } - - // create orc reader - try { - orc::ReaderOptions options; - _reader = orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options); - } catch (std::exception& e) { - return Status::InternalError("Init OrcReader failed. 
reason = {}", e.what()); - } - + RETURN_IF_ERROR(_create_file_reader()); auto& root_type = _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(_get_field_name_lower_case(&root_type, i)); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 4df04357fb..741addc345 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -31,38 +31,13 @@ namespace doris::vectorized { -class ORCFileInputStream : public orc::InputStream { -public: - struct Statistics { - int64_t read_time = 0; - int64_t read_calls = 0; - int64_t read_bytes = 0; - }; - - ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader) - : _file_name(file_name), _file_reader(file_reader) {} - - ~ORCFileInputStream() override = default; - - uint64_t getLength() const override { return _file_reader->size(); } - - uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; } - - void read(void* buf, uint64_t length, uint64_t offset) override; - - const std::string& getName() const override { return _file_name; } - - Statistics& statistics() { return _statistics; } - -private: - Statistics _statistics; - const std::string& _file_name; - io::FileReaderSPtr _file_reader; -}; - +class ORCFileInputStream; class OrcReader : public GenericReader { public: struct Statistics { + int64_t fs_read_time = 0; + int64_t fs_read_calls = 0; + int64_t fs_read_bytes = 0; int64_t column_read_time = 0; int64_t get_batch_time = 0; int64_t parse_meta_time = 0; @@ -79,10 +54,6 @@ public: IOContext* io_ctx); ~OrcReader() override; - // for test - void set_file_reader(const std::string& file_name, io::FileReaderSPtr file_reader) { - _file_reader = new ORCFileInputStream(file_name, file_reader); - } Status init_reader( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); @@ -91,7 +62,7 @@ public: void close(); - 
int64_t size() const { return _file_reader->getLength(); } + int64_t size() const; std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, @@ -111,6 +82,12 @@ private: RuntimeProfile::Counter* decode_value_time; RuntimeProfile::Counter* decode_null_map_time; }; + + // Create inner orc file, + // return EOF if file is empty + // return EROOR if encounter error. + Status _create_file_reader(); + void _init_profile(); Status _init_read_columns(); TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type); @@ -261,6 +238,9 @@ private: std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos); + void _collect_profile_on_close(); + +private: RuntimeProfile* _profile; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; @@ -284,7 +264,7 @@ private: // Flag for hive engine. True if the external table engine is Hive. bool _is_hive = false; std::vector<const orc::Type*> _col_orc_type; - ORCFileInputStream* _file_reader = nullptr; + std::unique_ptr<ORCFileInputStream> _file_input_stream; Statistics _statistics; OrcProfile _orc_profile; bool _closed = false; @@ -303,4 +283,27 @@ private: DecimalScaleParams _decimal_scale_params; }; +class ORCFileInputStream : public orc::InputStream { +public: + ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader, + OrcReader::Statistics* statistics) + : _file_name(file_name), _file_reader(file_reader), _statistics(statistics) {} + + ~ORCFileInputStream() override = default; + + uint64_t getLength() const override { return _file_reader->size(); } + + uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; } + + void read(void* buf, uint64_t length, uint64_t offset) override; + + const std::string& getName() const override { return _file_name; } + +private: + const std::string& _file_name; + io::FileReaderSPtr _file_reader; + // 
Owned by OrcReader + OrcReader::Statistics* _statistics; +}; + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index a8ce6422e1..50b54f1691 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -88,20 +88,16 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) { } for (auto& scan_range : _scan_ranges) { - VScanner* scanner = - (VScanner*)_create_scanner(scan_range.scan_range.ext_scan_range.file_scan_range); + VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, + scan_range.scan_range.ext_scan_range.file_scan_range, + runtime_profile(), _kv_cache); + _scanner_pool.add(scanner); + RETURN_IF_ERROR(((VFileScanner*)scanner) + ->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range)); scanners->push_back(scanner); } return Status::OK(); } -VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { - VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, - runtime_profile(), _kv_cache); - ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range); - _scanner_pool.add(scanner); - return scanner; -} - }; // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index 8ece99a961..f45ba9ddf9 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -38,8 +38,6 @@ protected: Status _init_scanners(std::list<VScanner*>* scanners) override; private: - VScanner* _create_scanner(const TFileScanRange& scan_range); - std::vector<TScanRangeParams> _scan_ranges; KVCache<std::string> _kv_cache; }; diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp index a404d9929b..6a9e86636c 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.cpp +++ 
b/be/src/vec/exec/scan/vmeta_scan_node.cpp @@ -54,8 +54,8 @@ Status VMetaScanNode::_init_scanners(std::list<VScanner*>* scanners) { for (auto& scan_range : _scan_ranges) { VMetaScanner* scanner = new VMetaScanner(_state, this, _tuple_id, scan_range, _limit_per_scanner, runtime_profile()); - RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); _scanner_pool.add(scanner); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); scanners->push_back(static_cast<VScanner*>(scanner)); } return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org