This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1018657d9d [Enhancement](SparkLoad): avoid BE OOM in push task, fix 
#15572 (#15620)
1018657d9d is described below

commit 1018657d9d7bd725386f43c60617244e44fc81eb
Author: spaces-x <weixiao5...@gmail.com>
AuthorDate: Thu Jan 5 10:20:32 2023 +0800

    [Enhancement](SparkLoad): avoid BE OOM in push task, fix #15572 (#15620)
    
    Release the memory pool held by the parquet reader once the data has been 
flushed by the rowset writer.
    Co-authored-by: spaces-x <weixian...@meituan.com>
---
 be/src/common/config.h          |  2 ++
 be/src/exec/parquet_scanner.cpp |  1 +
 be/src/olap/push_handler.cpp    | 15 +++++++++++----
 be/src/olap/push_handler.h      |  1 +
 4 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 056ea2ab75..eb3cda8d28 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -495,6 +495,8 @@ CONF_mInt64(write_buffer_size, "209715200");
 
 // max buffer size used in memtable for the aggregated table
 CONF_mInt64(memtable_max_buffer_size, "419430400");
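+// memory threshold (bytes) in a sparkload push task: once the reader's tracked memory exceeds it, the rowset writer is flushed and the reader's mem pool is freed; default 1GB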
+CONF_mInt64(flush_size_for_sparkload, "1073741824");
 
 // following 2 configs limit the memory consumption of load process on a 
Backend.
 // eg: memory limit to 80% of mem limit config but up to 100GB(default)
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 074f7d35a7..2714ffb785 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -66,6 +66,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* 
tuple_pool, bool* eof, bo
 
         COUNTER_UPDATE(_rows_read_counter, 1);
         SCOPED_TIMER(_materialize_timer);
+        // TODO(weixiang): check whether shallow copy is enough
         RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
         break; // break always
     }
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 3e8b457663..55472c6d82 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -242,13 +242,18 @@ Status PushHandler::_convert_v2(TabletSharedPtr 
cur_tablet, RowsetSharedPtr* cur
             }
 
             // 3. Init Row
-            uint8_t* tuple_buf = 
reader->mem_pool()->allocate(schema->schema_size());
-            ContiguousRow row(schema.get(), tuple_buf);
+            std::unique_ptr<uint8_t[]> tuple_buf(new 
uint8_t[schema->schema_size()]);
+            ContiguousRow row(schema.get(), tuple_buf.get());
 
             // 4. Read data from broker and write into cur_tablet
             // Convert from raw to delta
             VLOG_NOTICE << "start to convert etl file to delta.";
             while (!reader->eof()) {
+                if (reader->mem_pool()->mem_tracker()->consumption() >
+                    config::flush_size_for_sparkload) {
+                    RETURN_NOT_OK(rowset_writer->flush());
+                    reader->mem_pool()->free_all();
+                }
                 res = reader->next(&row);
                 if (!res.ok()) {
                     LOG(WARNING) << "read next row failed."
@@ -824,7 +829,9 @@ Status PushBrokerReader::init(const Schema* schema, const 
TBrokerScanRange& t_sc
     }
     _runtime_profile = _runtime_state->runtime_profile();
     _runtime_profile->set_name("PushBrokerReader");
-    _mem_pool.reset(new MemPool());
+    _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get()));
+    _tuple_buffer_pool.reset(new 
MemPool(_runtime_state->scanner_mem_tracker().get()));
+
     _counter.reset(new ScannerCounter());
 
     // init scanner
@@ -856,7 +863,7 @@ Status PushBrokerReader::init(const Schema* schema, const 
TBrokerScanRange& t_sc
     }
 
     int tuple_buffer_size = _tuple_desc->byte_size();
-    void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size);
+    void* tuple_buffer = _tuple_buffer_pool->allocate(tuple_buffer_size);
     if (tuple_buffer == nullptr) {
         LOG(WARNING) << "Allocate memory for tuple failed";
         return Status::Error<PUSH_INIT_ERROR>();
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 1d8536f85e..1fe1d42823 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -206,6 +206,7 @@ private:
     std::unique_ptr<RuntimeState> _runtime_state;
     RuntimeProfile* _runtime_profile;
     std::unique_ptr<MemPool> _mem_pool;
+    std::unique_ptr<MemPool> _tuple_buffer_pool;
     std::unique_ptr<ScannerCounter> _counter;
     std::unique_ptr<BaseScanner> _scanner;
     // Not used, just for placeholding


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to