This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1018657d9d [Enhancement](SparkLoad): avoid BE OOM in push task, fix #15572 (#15620) 1018657d9d is described below commit 1018657d9d7bd725386f43c60617244e44fc81eb Author: spaces-x <weixiao5...@gmail.com> AuthorDate: Thu Jan 5 10:20:32 2023 +0800 [Enhancement](SparkLoad): avoid BE OOM in push task, fix #15572 (#15620) Release memory pool held by the parquet reader when the data has been flushed by rowset writer. Co-authored-by: spaces-x <weixian...@meituan.com> --- be/src/common/config.h | 2 ++ be/src/exec/parquet_scanner.cpp | 1 + be/src/olap/push_handler.cpp | 15 +++++++++++---- be/src/olap/push_handler.h | 1 + 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 056ea2ab75..eb3cda8d28 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -495,6 +495,8 @@ CONF_mInt64(write_buffer_size, "209715200"); // max buffer size used in memtable for the aggregated table CONF_mInt64(memtable_max_buffer_size, "419430400"); +// write buffer size in push task for sparkload, default 1GB +CONF_mInt64(flush_size_for_sparkload, "1073741824"); // following 2 configs limit the memory consumption of load process on a Backend. 
// eg: memory limit to 80% of mem limit config but up to 100GB(default) diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 074f7d35a7..2714ffb785 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -66,6 +66,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); + // TODO(weixiang): check whether shallow copy is enough RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 3e8b457663..55472c6d82 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -242,13 +242,18 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur } // 3. Init Row - uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size()); - ContiguousRow row(schema.get(), tuple_buf); + std::unique_ptr<uint8_t[]> tuple_buf(new uint8_t[schema->schema_size()]); + ContiguousRow row(schema.get(), tuple_buf.get()); // 4. Read data from broker and write into cur_tablet // Convert from raw to delta VLOG_NOTICE << "start to convert etl file to delta."; while (!reader->eof()) { + if (reader->mem_pool()->mem_tracker()->consumption() > + config::flush_size_for_sparkload) { + RETURN_NOT_OK(rowset_writer->flush()); + reader->mem_pool()->free_all(); + } res = reader->next(&row); if (!res.ok()) { LOG(WARNING) << "read next row failed." 
@@ -824,7 +829,9 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc } _runtime_profile = _runtime_state->runtime_profile(); _runtime_profile->set_name("PushBrokerReader"); - _mem_pool.reset(new MemPool()); + _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get())); + _tuple_buffer_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get())); + _counter.reset(new ScannerCounter()); // init scanner @@ -856,7 +863,7 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc } int tuple_buffer_size = _tuple_desc->byte_size(); - void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size); + void* tuple_buffer = _tuple_buffer_pool->allocate(tuple_buffer_size); if (tuple_buffer == nullptr) { LOG(WARNING) << "Allocate memory for tuple failed"; return Status::Error<PUSH_INIT_ERROR>(); diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 1d8536f85e..1fe1d42823 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -206,6 +206,7 @@ private: std::unique_ptr<RuntimeState> _runtime_state; RuntimeProfile* _runtime_profile; std::unique_ptr<MemPool> _mem_pool; + std::unique_ptr<MemPool> _tuple_buffer_pool; std::unique_ptr<ScannerCounter> _counter; std::unique_ptr<BaseScanner> _scanner; // Not used, just for placeholding --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org