This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b52b572ade2 [branch-2.1](memory) When Load ends, check memory tracker 
value returns is equal to 0 (#40850)
b52b572ade2 is described below

commit b52b572ade243a21feb3fcda6618669f7c27355e
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Sun Sep 15 23:47:53 2024 +0800

    [branch-2.1](memory) When Load ends, check memory tracker value returns is 
equal to 0 (#40850)
    
    pick
    #38960
    #39908
    #40043
    #40092
    #40016
    #40439
    
    ---------
    
    Co-authored-by: hui lai <1353307...@qq.com>
    Co-authored-by: yiguolei <676222...@qq.com>
---
 be/src/common/config.cpp                           |  4 ++
 be/src/common/config.h                             |  3 +
 be/src/exec/data_sink.cpp                          |  8 +++
 be/src/exec/exec_node.cpp                          |  4 ++
 be/src/http/action/http_stream.cpp                 | 23 +++++--
 be/src/http/action/stream_load.cpp                 | 11 +++-
 be/src/io/file_factory.cpp                         |  9 +--
 be/src/io/fs/stream_load_pipe.cpp                  |  7 ++-
 be/src/pipeline/pipeline_fragment_context.cpp      |  4 ++
 be/src/runtime/exec_env.h                          |  4 ++
 be/src/runtime/exec_env_init.cpp                   |  2 +
 be/src/runtime/memory/mem_tracker_limiter.cpp      | 71 +++++++++++++---------
 be/src/runtime/memory/mem_tracker_limiter.h        |  2 +
 be/src/runtime/stream_load/stream_load_context.h   | 15 ++++-
 .../runtime/stream_load/stream_load_executor.cpp   |  4 ++
 be/src/runtime/thread_context.h                    | 47 ++++++++------
 be/src/service/internal_service.cpp                |  3 +-
 be/src/util/byte_buffer.h                          | 28 ++++++---
 be/src/util/faststring.h                           |  3 +-
 be/src/util/slice.h                                |  7 ++-
 be/src/vec/common/allocator.h                      | 11 ++++
 be/src/vec/sink/vtablet_block_convertor.cpp        | 11 ++--
 be/src/vec/sink/vtablet_block_convertor.h          | 13 +++-
 be/test/util/byte_buffer2_test.cpp                 |  3 +-
 24 files changed, 214 insertions(+), 83 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 543492b7155..473842a1886 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -140,6 +140,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, 
"2147483648");
 
 DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");
 
+// default is true. if any memory tracking in Orphan mem tracker will report 
error.
+// !! Do not modify the default value of this conf !! Otherwise memory errors 
cannot be detected in time.
+// Memory freed by the allocator does not need to be checked, because when the 
thread memory tracker label is Orphan,
+// the tracker saved in the Allocator is used instead.
 DEFINE_mBool(enable_memory_orphan_check, "true");
 
 // The maximum time a thread waits for full GC. Currently only query will wait 
for full gc.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1afd10a34a7..a3d4b35dce8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -194,6 +194,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
 DECLARE_mInt64(crash_in_alloc_large_memory_bytes);
 
 // default is true. if any memory tracking in Orphan mem tracker will report 
error.
+// !! Do not modify the default value of this conf !! Otherwise memory errors 
cannot be detected in time.
+// Memory freed by the allocator does not need to be checked, because when the 
thread memory tracker label is Orphan,
+// the tracker saved in the Allocator is used instead.
 DECLARE_mBool(enable_memory_orphan_check);
 
 // The maximum time a thread waits for a full GC. Currently only query will 
wait for full gc.
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index dc651080298..0a05fb62e9b 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -181,6 +181,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
+#ifndef NDEBUG
+        DCHECK(state->get_query_ctx() != nullptr);
+        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
         sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, 
output_exprs, &status));
         RETURN_IF_ERROR(status);
         break;
@@ -349,6 +353,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
+#ifndef NDEBUG
+        DCHECK(state->get_query_ctx() != nullptr);
+        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
         sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, 
output_exprs, &status));
         RETURN_IF_ERROR(status);
         break;
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 247ec2323d5..d0124a7141e 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -463,6 +463,10 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
         return Status::OK();
 
     case TPlanNodeType::GROUP_COMMIT_SCAN_NODE:
+#ifndef NDEBUG
+        DCHECK(state->get_query_ctx() != nullptr);
+        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
         *node = pool->add(new vectorized::GroupCommitScanNode(pool, tnode, 
descs));
         return Status::OK();
 
diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index 7dd85653002..83ce0ce82cc 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
     struct evhttp_request* ev_req = req->get_evhttp_request();
     auto evbuf = evhttp_request_get_input_buffer(ev_req);
 
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
     int64_t start_read_data_time = MonotonicNanos();
+    Status st = ctx->allocate_schema_buffer();
+    if (!st.ok()) {
+        ctx->status = st;
+        return;
+    }
     while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(128 * 1024);
+        ByteBufferPtr bb;
+        st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
         auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
         bb->pos = remove_bytes;
         bb->flip();
-        auto st = ctx->body_sink->append(bb);
+        st = ctx->body_sink->append(bb);
         // schema_buffer stores 1M of data for parsing column information
         // need to determine whether to cache for the first time
         if (ctx->is_read_schema) {
-            if (ctx->schema_buffer->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
-                ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
+            if (ctx->schema_buffer()->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
+                ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
             } else {
                 LOG(INFO) << "use a portion of data to request fe to obtain 
column information";
                 ctx->is_read_schema = false;
                 ctx->status = process_put(req, ctx);
             }
         }
-
-        if (!st.ok() && !ctx->status.ok()) {
+        if (!st.ok()) {
             LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
             ctx->status = st;
             return;
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 3f32655cf14..2036043b4d4 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
     struct evhttp_request* ev_req = req->get_evhttp_request();
     auto evbuf = evhttp_request_get_input_buffer(ev_req);
 
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
     int64_t start_read_data_time = MonotonicNanos();
     while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(128 * 1024);
+        ByteBufferPtr bb;
+        Status st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
         auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
         bb->pos = remove_bytes;
         bb->flip();
-        auto st = ctx->body_sink->append(bb);
+        st = ctx->body_sink->append(bb);
         if (!st.ok()) {
             LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
             ctx->status = st;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 4d6158f8f7e..95d53732088 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& 
load_id, io::FileReaderS
     if (!stream_load_ctx) {
         return Status::InternalError("unknown stream load id: {}", 
UniqueId(load_id).to_string());
     }
-    if (need_schema == true) {
+    if (need_schema) {
+        RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
         // Here, a portion of the data is processed to parse column information
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
-                stream_load_ctx->schema_buffer->pos /* total_length */);
-        stream_load_ctx->schema_buffer->flip();
-        RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
+                stream_load_ctx->schema_buffer()->pos /* total_length */);
+        stream_load_ctx->schema_buffer()->flip();
+        RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
         RETURN_IF_ERROR(pipe->finish());
         *file_reader = std::move(pipe);
     } else {
diff --git a/be/src/io/fs/stream_load_pipe.cpp 
b/be/src/io/fs/stream_load_pipe.cpp
index ecce306bdf1..392125e6fc0 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -111,7 +111,9 @@ Status 
StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
 }
 
 Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t 
proto_byte_size) {
-    ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size 
+ 1));
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+    ByteBufferPtr buf;
+    RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
     buf->put_bytes(data, size);
     buf->flip();
     return _append(buf, proto_byte_size);
@@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t 
size) {
     // need to allocate a new chunk, min chunk is 64k
     size_t chunk_size = std::max(_min_chunk_size, size - pos);
     chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
-    _write_buf = ByteBuffer::allocate(chunk_size);
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+    RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
     _write_buf->put_bytes(data + pos, size - pos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index dab359ed040..550e60c210c 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -846,6 +846,10 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
         break;
     }
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+#ifndef NDEBUG
+        DCHECK(state->get_query_ctx() != nullptr);
+        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
         sink_ = 
std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(),
                                                                       
_sink.get());
         break;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index eea44d1ba8d..18061e04528 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -185,6 +185,9 @@ public:
     std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
         return _segcompaction_mem_tracker;
     }
+    std::shared_ptr<MemTrackerLimiter> stream_load_pipe_tracker() {
+        return _stream_load_pipe_tracker;
+    }
     std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
         return _point_query_executor_mem_tracker;
     }
@@ -368,6 +371,7 @@ private:
     std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _stream_load_pipe_tracker;
 
     // Tracking memory may be shared between multiple queries.
     std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8fb7bc96c0c..5d2ab598b33 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -539,6 +539,8 @@ void ExecEnv::init_mem_tracker() {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"SubcolumnsTree");
     _s3_file_buffer_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"S3FileBuffer");
+    _stream_load_pipe_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"StreamLoadPipe");
 }
 
 void ExecEnv::_register_metrics() {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index c1b5879dd11..bfe3a5441da 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() {
             "mem tracker not equal to 0 when mem tracker destruct, this 
usually means that "
             "memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
             "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
+            "If the log is truncated, search for `Address Sanitizer` in the 
be.INFO log to see "
+            "more information."
             "1. For query and load, memory leaks may have occurred, it is 
expected that the query "
             "mem tracker will be bound to the thread context using 
SCOPED_ATTACH_TASK and "
             "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc 
and free. "
@@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
     if (_consumption->current_value() != 0) {
         // TODO, expect mem tracker equal to 0 at the load/compaction/etc. 
task end.
 #ifndef NDEBUG
-        if (_type == Type::QUERY) {
+        if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
             std::string err_msg =
                     fmt::format("mem tracker label: {}, consumption: {}, peak 
consumption: {}, {}.",
                                 label(), _consumption->current_value(), 
_consumption->peak_value(),
@@ -140,11 +142,11 @@ MemTrackerLimiter::~MemTrackerLimiter() {
         }
         _consumption->set(0);
 #ifndef NDEBUG
-    } else if (!_address_sanitizers.empty()) {
-        LOG(INFO) << "[Address Sanitizer] consumption is 0, but address 
sanitizers not empty. "
-                  << ", mem tracker label: " << _label
-                  << ", peak consumption: " << _consumption->peak_value()
-                  << print_address_sanitizers();
+    } else if (!_address_sanitizers.empty() && !is_group_commit_load) {
+        LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address 
sanitizers not empty. "
+                   << ", mem tracker label: " << _label
+                   << ", peak consumption: " << _consumption->peak_value()
+                   << print_address_sanitizers();
 #endif
     }
     memory_memtrackerlimiter_cnt << -1;
@@ -152,17 +154,17 @@ MemTrackerLimiter::~MemTrackerLimiter() {
 
 #ifndef NDEBUG
 void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
-    if (_type == Type::QUERY) {
+    if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
         std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
         auto it = _address_sanitizers.find(buf);
         if (it != _address_sanitizers.end()) {
-            LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem 
tracker label: " << _label
-                      << ", consumption: " << _consumption->current_value()
-                      << ", peak consumption: " << _consumption->peak_value() 
<< ", buf: " << buf
-                      << ", size: " << size << ", old buf: " << it->first
-                      << ", old size: " << it->second.size
-                      << ", new stack_trace: " << get_stack_trace(1, 
"DISABLED")
-                      << ", old stack_trace: " << it->second.stack_trace;
+            _error_address_sanitizers.emplace_back(
+                    fmt::format("[Address Sanitizer] memory buf repeat add, 
mem tracker label: {}, "
+                                "consumption: {}, peak consumption: {}, buf: 
{}, size: {}, old "
+                                "buf: {}, old size: {}, new stack_trace: {}, 
old stack_trace: {}.",
+                                _label, _consumption->current_value(), 
_consumption->peak_value(),
+                                buf, size, it->first, it->second.size,
+                                get_stack_trace(1, "FULL_WITH_INLINE"), 
it->second.stack_trace));
         }
 
         // if alignment not equal to 0, maybe usable_size > size.
@@ -174,26 +176,26 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, 
size_t size) {
 }
 
 void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
-    if (_type == Type::QUERY) {
+    if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
         std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
         auto it = _address_sanitizers.find(buf);
         if (it != _address_sanitizers.end()) {
             if (it->second.size != size) {
-                LOG(INFO) << "[Address Sanitizer] free memory buf size 
inaccurate, mem tracker "
-                             "label: "
-                          << _label << ", consumption: " << 
_consumption->current_value()
-                          << ", peak consumption: " << 
_consumption->peak_value()
-                          << ", buf: " << buf << ", size: " << size << ", old 
buf: " << it->first
-                          << ", old size: " << it->second.size
-                          << ", new stack_trace: " << get_stack_trace(1, 
"DISABLED")
-                          << ", old stack_trace: " << it->second.stack_trace;
+                _error_address_sanitizers.emplace_back(fmt::format(
+                        "[Address Sanitizer] free memory buf size inaccurate, 
mem tracker label: "
+                        "{}, consumption: {}, peak consumption: {}, buf: {}, 
size: {}, old buf: "
+                        "{}, old size: {}, new stack_trace: {}, old 
stack_trace: {}.",
+                        _label, _consumption->current_value(), 
_consumption->peak_value(), buf,
+                        size, it->first, it->second.size, get_stack_trace(1, 
"FULL_WITH_INLINE"),
+                        it->second.stack_trace));
             }
             _address_sanitizers.erase(buf);
         } else {
-            LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem 
tracker label: " << _label
-                      << ", consumption: " << _consumption->current_value()
-                      << ", peak consumption: " << _consumption->peak_value() 
<< ", buf: " << buf
-                      << ", size: " << size << ", stack_trace: " << 
get_stack_trace(1, "DISABLED");
+            _error_address_sanitizers.emplace_back(fmt::format(
+                    "[Address Sanitizer] memory buf not exist, mem tracker 
label: {}, consumption: "
+                    "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: 
{}.",
+                    _label, _consumption->current_value(), 
_consumption->peak_value(), buf, size,
+                    get_stack_trace(1, "FULL_WITH_INLINE")));
         }
     }
 }
@@ -201,9 +203,20 @@ void MemTrackerLimiter::remove_address_sanitizers(void* 
buf, size_t size) {
 std::string MemTrackerLimiter::print_address_sanitizers() {
     std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
     std::string detail = "[Address Sanitizer]:";
+    detail += "\n memory not be freed:";
     for (const auto& it : _address_sanitizers) {
-        detail += fmt::format("\n    {}, size {}, strack trace: {}", it.first, 
it.second.size,
-                              it.second.stack_trace);
+        auto msg = fmt::format(
+                "\n    [Address Sanitizer] buf not be freed, mem tracker 
label: {}, consumption: "
+                "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
+                _label, _consumption->current_value(), 
_consumption->peak_value(), it.first,
+                it.second.size, it.second.stack_trace);
+        LOG(INFO) << msg;
+        detail += msg;
+    }
+    detail += "\n incorrect memory alloc and free:";
+    for (const auto& err_msg : _error_address_sanitizers) {
+        LOG(INFO) << err_msg;
+        detail += fmt::format("\n    {}", err_msg);
     }
     return detail;
 }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index e5c5cb1bc03..344f3dc92b6 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -209,6 +209,7 @@ public:
     void add_address_sanitizers(void* buf, size_t size);
     void remove_address_sanitizers(void* buf, size_t size);
     std::string print_address_sanitizers();
+    bool is_group_commit_load {false};
 #endif
 
     std::string debug_string() override {
@@ -260,6 +261,7 @@ private:
 
     std::mutex _address_sanitizers_mtx;
     std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
+    std::vector<std::string> _error_address_sanitizers;
 #endif
 };
 
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index 1dc7ccf73ba..2ccf8ce5014 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -37,6 +37,7 @@
 #include "common/utils.h"
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/thread_context.h"
 #include "util/byte_buffer.h"
 #include "util/time.h"
 #include "util/uid_util.h"
@@ -118,6 +119,17 @@ public:
     // also print the load source info if detail is set to true
     std::string brief(bool detail = false) const;
 
+    Status allocate_schema_buffer() {
+        if (_schema_buffer == nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->stream_load_pipe_tracker());
+            return ByteBuffer::allocate(config::stream_tvf_buffer_size, 
&_schema_buffer);
+        }
+        return Status::OK();
+    }
+
+    ByteBufferPtr schema_buffer() { return _schema_buffer; }
+
 public:
     static const int default_txn_id = -1;
     // load type, eg: ROUTINE LOAD/MANUAL LOAD
@@ -182,8 +194,6 @@ public:
     std::shared_ptr<MessageBodySink> body_sink;
     std::shared_ptr<io::StreamLoadPipe> pipe;
 
-    ByteBufferPtr schema_buffer = 
ByteBuffer::allocate(config::stream_tvf_buffer_size);
-
     TStreamLoadPutResult put_result;
     TStreamLoadMultiTablePutResult multi_table_put_result;
 
@@ -241,6 +251,7 @@ public:
 
 private:
     ExecEnv* _exec_env = nullptr;
+    ByteBufferPtr _schema_buffer;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 0761b445bee..0616c6474aa 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -142,6 +142,10 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
                   << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 
1000000;
     };
 
+    // Reset the thread memory tracker; otherwise SCOPED_ATTACH_TASK would be 
called in a nested manner, which is
+    // not allowed: the first time in on_chunk_data, the second time in 
StreamLoadExecutor::execute_plan_fragment.
+    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+
     if (ctx->put_result.__isset.params) {
         st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
                                                            
QuerySource::STREAM_LOAD, exec_fragment);
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 3c0139a02d7..885d616eb06 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -50,7 +50,7 @@
 // Used after SCOPED_ATTACH_TASK, in order to count the memory into another
 // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task.
 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \
-    auto VARNAME_LINENUM(switch_mem_tracker) = 
SwitchThreadMemTrackerLimiter(arg1)
+    auto VARNAME_LINENUM(switch_mem_tracker) = 
doris::SwitchThreadMemTrackerLimiter(arg1)
 
 // Looking forward to tracking memory during thread execution into MemTracker.
 // Usually used to record query more detailed memory, including ExecNode 
operators.
@@ -167,8 +167,7 @@ static std::string memory_orphan_check_msg =
         "each thread is expected to use SCOPED_ATTACH_TASK to bind a 
MemTrackerLimiter belonging "
         "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using 
Doris Allocator in the "
         "thread will crash. If you want to switch MemTrackerLimiter during 
thread execution, "
-        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat 
Attach. Of course, you "
-        "can modify enable_memory_orphan_check=false in be.conf to avoid this 
crash.";
+        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat 
Attach.";
 
 // The thread context saves some info about a working thread.
 // 2 required info:
@@ -219,9 +218,9 @@ public:
         ss << std::this_thread::get_id();
         return ss.str();
     }
-    // After thread_mem_tracker_mgr is initialized, the current thread Hook 
starts to
-    // consume/release mem_tracker.
-    // Note that the use of shared_ptr will cause a crash. The guess is that 
there is an
+    // Note that if a global Memory Hook is set, then after thread_mem_tracker_mgr is 
initialized,
+    // the current thread Hook starts to consume/release mem_tracker.
+    // The use of shared_ptr will cause a crash. The guess is that there is an
     // intermediate state during the copy construction of shared_ptr. 
Shared_ptr is not equal
     // to nullptr, but the object it points to is not initialized. At this 
time, when the memory
     // is released somewhere, the hook is triggered to cause the crash.
@@ -315,7 +314,7 @@ public:
                 // The brpc server should respond as quickly as possible.
                 bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
                 // set the data so that next time bthread_getspecific in the 
thread returns the data.
-                CHECK(0 == bthread_setspecific(btls_key, bthread_context) || 
k_doris_exit);
+                CHECK(0 == bthread_setspecific(btls_key, bthread_context) || 
doris::k_doris_exit);
             }
             DCHECK(bthread_context != nullptr);
             bthread_context->thread_local_handle_count++;
@@ -357,7 +356,7 @@ static ThreadContext* thread_context(bool allow_return_null 
= false) {
         // in bthread
         // bthread switching pthread may be very frequent, remember not to use 
lock or other time-consuming operations.
         auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-        DCHECK(bthread_context != nullptr);
+        DCHECK(bthread_context != nullptr && 
bthread_context->thread_local_handle_count > 0);
         return bthread_context;
     }
     if (allow_return_null) {
@@ -443,30 +442,38 @@ public:
 
 class SwitchThreadMemTrackerLimiter {
 public:
-    explicit SwitchThreadMemTrackerLimiter(const 
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+    explicit SwitchThreadMemTrackerLimiter(
+            const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
         DCHECK(mem_tracker);
-        ThreadLocalHandle::create_thread_local_if_not_exits();
-        _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
-        
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
+        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
+        if (mem_tracker != 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
+            _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+            
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
+        }
     }
 
-    explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& 
query_thread_context) {
-        ThreadLocalHandle::create_thread_local_if_not_exits();
+    explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& 
query_thread_context) {
+        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
         DCHECK(thread_context()->task_id() ==
                query_thread_context.query_id); // workload group alse not 
change
         DCHECK(query_thread_context.query_mem_tracker);
-        _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
-        thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
-                query_thread_context.query_mem_tracker);
+        if (query_thread_context.query_mem_tracker !=
+            thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
+            _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+            thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
+                    query_thread_context.query_mem_tracker);
+        }
     }
 
     ~SwitchThreadMemTrackerLimiter() {
-        
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
-        ThreadLocalHandle::del_thread_local_if_count_is_zero();
+        if (_old_mem_tracker != nullptr) {
+            
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
+        }
+        doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
     }
 
 private:
-    std::shared_ptr<MemTrackerLimiter> _old_mem_tracker;
+    std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr};
 };
 
 class AddThreadMemTrackerConsumer {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index b2b5841618d..04e616b53b7 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -785,7 +785,8 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
 
         std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
                 MemTrackerLimiter::Type::OTHER,
-                fmt::format("{}#{}", params.format_type, params.file_type));
+                fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type,
+                            params.file_type));
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
 
         // make sure profile is desctructed after reader cause PrefetchBufferedReader
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index aab8fd42db6..17764b9e4f6 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -23,19 +23,26 @@
 #include <memory>
 
 #include "common/logging.h"
+#include "common/status.h"
+#include "runtime/thread_context.h"
+#include "vec/common/allocator.h"
+#include "vec/common/allocator_fwd.h"
 
 namespace doris {
 
 struct ByteBuffer;
 using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
 
-struct ByteBuffer {
-    static ByteBufferPtr allocate(size_t size) {
-        ByteBufferPtr ptr(new ByteBuffer(size));
-        return ptr;
+struct ByteBuffer : private Allocator<false> {
+    static Status allocate(const size_t size, ByteBufferPtr* ptr) {
+        RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); });
+        return Status::OK();
     }
 
-    ~ByteBuffer() { delete[] ptr; }
+    ~ByteBuffer() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
+        Allocator<false>::free(ptr, capacity);
+    }
 
     void put_bytes(const char* data, size_t size) {
         memcpy(ptr + pos, data, size);
@@ -56,14 +63,21 @@ struct ByteBuffer {
     size_t remaining() const { return limit - pos; }
     bool has_remaining() const { return limit > pos; }
 
-    char* const ptr;
+    char* ptr;
     size_t pos;
     size_t limit;
     size_t capacity;
 
 private:
     ByteBuffer(size_t capacity_)
-            : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {}
+            : pos(0),
+              limit(capacity_),
+              capacity(capacity_),
+              mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
+        ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
+    }
+
+    std::shared_ptr<MemTrackerLimiter> mem_tracker_;
 };
 
 } // namespace doris
diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h
index 9308a4d20bb..eae7db53625 100644
--- a/be/src/util/faststring.h
+++ b/be/src/util/faststring.h
@@ -85,7 +85,8 @@ public:
     OwnedSlice build() {
         uint8_t* ret = data_;
         if (ret == initial_data_) {
-            ret = reinterpret_cast<uint8_t*>(Allocator::alloc(len_));
+            ret = reinterpret_cast<uint8_t*>(Allocator::alloc(capacity_));
+            DCHECK(len_ <= capacity_);
             memcpy(ret, data_, len_);
         }
         OwnedSlice result(ret, len_, capacity_);
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index bae33d4ee75..c1a006a3f9f 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -358,7 +358,12 @@ public:
         return *this;
     }
 
-    ~OwnedSlice() { Allocator::free(_slice.data, _capacity); }
+    ~OwnedSlice() {
+        if (_slice.data != nullptr) {
+            DCHECK(_capacity != 0);
+            Allocator::free(_slice.data, _capacity);
+        }
+    }
 
     const Slice& slice() const { return _slice; }
 
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 576ce86d928..39197c75f92 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
 // is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html)
 static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
 
+namespace doris {
+class MemTrackerLimiter;
+}
+
 class DefaultMemoryAllocator {
 public:
     static void* malloc(size_t size) __THROW { return std::malloc(size); }
@@ -228,6 +232,13 @@ public:
     // alloc will continue to execute, so the consume memtracker is forced.
     void memory_check(size_t size) const;
     // Increases consumption of this tracker by 'bytes'.
+    // some special cases:
+    // 1. objects that inherit Allocator will not be shared by multiple queries.
+    //  non-compliant: page cache, ORC ByteBuffer.
+    // 2. objects that inherit Allocator will only free memory allocated by themselves.
+    //  non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free.
+    // 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator
+    //  and during subsequent memory allocation.
     void consume_memory(size_t size) const;
     void release_memory(size_t size) const;
     void throw_bad_alloc(const std::string& err) const;
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp
index 96de68f5976..086c9a3ddd0 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
     return DecimalType(value);
 }
 
-Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type,
-                                                 bool is_nullable, vectorized::ColumnPtr column,
-                                                 size_t slot_index, bool* stop_processing,
-                                                 fmt::memory_buffer& error_prefix,
-                                                 const uint32_t row_count,
-                                                 vectorized::IColumn::Permutation* rows) {
+Status OlapTableBlockConvertor::_internal_validate_column(
+        RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
+        vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
+        fmt::memory_buffer& error_prefix, const uint32_t row_count,
+        vectorized::IColumn::Permutation* rows) {
     DCHECK((rows == nullptr) || (rows->size() == row_count));
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h
index 0db340ce6c2..7f866c38032 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -69,7 +69,18 @@ private:
     Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
                             vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
                             fmt::memory_buffer& error_prefix, const uint32_t row_count,
-                            vectorized::IColumn::Permutation* rows = nullptr);
+                            vectorized::IColumn::Permutation* rows = nullptr) {
+        RETURN_IF_CATCH_EXCEPTION({
+            return _internal_validate_column(state, type, is_nullable, column, slot_index,
+                                             stop_processing, error_prefix, row_count, rows);
+        });
+    }
+
+    Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type,
+                                     bool is_nullable, vectorized::ColumnPtr column,
+                                     size_t slot_index, bool* stop_processing,
+                                     fmt::memory_buffer& error_prefix, const uint32_t row_count,
+                                     vectorized::IColumn::Permutation* rows = nullptr);
 
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.
diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp
index 04b62cd5fe8..73c38c9e404 100644
--- a/be/test/util/byte_buffer2_test.cpp
+++ b/be/test/util/byte_buffer2_test.cpp
@@ -32,7 +32,8 @@ public:
 };
 
 TEST_F(ByteBufferTest, normal) {
-    auto buf = ByteBuffer::allocate(4);
+    ByteBufferPtr buf;
+    Status st = ByteBuffer::allocate(4, &buf);
     EXPECT_EQ(0, buf->pos);
     EXPECT_EQ(4, buf->limit);
     EXPECT_EQ(4, buf->capacity);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to