This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 86246f78bd0 [opt](bytebuffer) allocate ByteBuffer memory by Allocator and make it exception safe (#38960)
86246f78bd0 is described below

commit 86246f78bd0973943fd7a2ff1159d4cd730a5d24
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Aug 12 11:17:43 2024 +0800

    [opt](bytebuffer) allocate ByteBuffer memory by Allocator and make it exception safe (#38960)
    
    Previously, `ByteBuffer` memory was allocated directly with
    `new char[capacity_]`. It is now allocated uniformly through `Allocator`,
    for the following purposes:
    1. Better memory statistics
    2. Better support for memory limit check
---
 be/src/http/action/http_stream.cpp                 | 51 +++++++++++++---------
 be/src/http/action/stream_load.cpp                 | 32 +++++++++-----
 be/src/io/fs/stream_load_pipe.cpp                  |  7 ++-
 be/src/runtime/exec_env.h                          |  4 ++
 be/src/runtime/exec_env_init.cpp                   |  2 +
 be/src/runtime/stream_load/stream_load_context.h   |  8 +++-
 .../runtime/stream_load/stream_load_executor.cpp   |  4 ++
 be/src/util/byte_buffer.h                          | 19 +++++---
 8 files changed, 87 insertions(+), 40 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index e7bfa839111..afeb251ca41 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -234,31 +234,40 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
     struct evhttp_request* ev_req = req->get_evhttp_request();
     auto evbuf = evhttp_request_get_input_buffer(ev_req);
 
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
     int64_t start_read_data_time = MonotonicNanos();
     while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(128 * 1024);
-        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
-        bb->pos = remove_bytes;
-        bb->flip();
-        auto st = ctx->body_sink->append(bb);
-        // schema_buffer stores 1M of data for parsing column information
-        // need to determine whether to cache for the first time
-        if (ctx->is_read_schema) {
-            if (ctx->schema_buffer->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
-                ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
-            } else {
-                LOG(INFO) << "use a portion of data to request fe to obtain 
column information";
-                ctx->is_read_schema = false;
-                ctx->status = process_put(req, ctx);
+        try {
+            auto bb = ByteBuffer::allocate(128 * 1024);
+            auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+            bb->pos = remove_bytes;
+            bb->flip();
+            auto st = ctx->body_sink->append(bb);
+            // schema_buffer stores 1M of data for parsing column information
+            // need to determine whether to cache for the first time
+            if (ctx->is_read_schema) {
+                if (ctx->schema_buffer->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
+                    ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
+                } else {
+                    LOG(INFO) << "use a portion of data to request fe to 
obtain column information";
+                    ctx->is_read_schema = false;
+                    ctx->status = process_put(req, ctx);
+                }
             }
+            if (!st.ok() && !ctx->status.ok()) {
+                LOG(WARNING) << "append body content failed. errmsg=" << st << 
", " << ctx->brief();
+                ctx->status = st;
+                return;
+            }
+            ctx->receive_bytes += remove_bytes;
+        } catch (const doris::Exception& e) {
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
+                ctx->status = Status::MemoryLimitExceeded(
+                        fmt::format("PreCatch error code:{}, {}, ", e.code(), 
e.to_string()));
+            }
+            ctx->status = Status::Error<false>(e.code(), e.to_string());
         }
-
-        if (!st.ok() && !ctx->status.ok()) {
-            LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
-            ctx->status = st;
-            return;
-        }
-        ctx->receive_bytes += remove_bytes;
     }
     // after all the data has been read and it has not reached 1M, it will 
execute here
     if (ctx->is_read_schema) {
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 75d6943d3c6..d0c5dff2075 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -340,19 +340,29 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
     struct evhttp_request* ev_req = req->get_evhttp_request();
     auto evbuf = evhttp_request_get_input_buffer(ev_req);
 
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
     int64_t start_read_data_time = MonotonicNanos();
     while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(128 * 1024);
-        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
-        bb->pos = remove_bytes;
-        bb->flip();
-        auto st = ctx->body_sink->append(bb);
-        if (!st.ok()) {
-            LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
-            ctx->status = st;
-            return;
-        }
-        ctx->receive_bytes += remove_bytes;
+        try {
+            auto bb = ByteBuffer::allocate(128 * 1024);
+            auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+            bb->pos = remove_bytes;
+            bb->flip();
+            auto st = ctx->body_sink->append(bb);
+            if (!st.ok()) {
+                LOG(WARNING) << "append body content failed. errmsg=" << st << 
", " << ctx->brief();
+                ctx->status = st;
+                return;
+            }
+            ctx->receive_bytes += remove_bytes;
+        } catch (const doris::Exception& e) {
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
+                ctx->status = Status::MemoryLimitExceeded(
+                        fmt::format("PreCatch error code:{}, {}, ", e.code(), 
e.to_string()));
+            }
+            ctx->status = Status::Error<false>(e.code(), e.to_string());
+        }
     }
     int64_t read_data_time = MonotonicNanos() - start_read_data_time;
     int64_t last_receive_and_read_data_cost_nanos = 
ctx->receive_and_read_data_cost_nanos;
diff --git a/be/src/io/fs/stream_load_pipe.cpp 
b/be/src/io/fs/stream_load_pipe.cpp
index 21c3856a815..ce91a2e8391 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -111,7 +111,9 @@ Status 
StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
 }
 
 Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t 
proto_byte_size) {
-    ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size 
+ 1));
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+    ByteBufferPtr buf;
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 
128 * 1024));
     buf->put_bytes(data, size);
     buf->flip();
     return _append(buf, proto_byte_size);
@@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t 
size) {
     // need to allocate a new chunk, min chunk is 64k
     size_t chunk_size = std::max(_min_chunk_size, size - pos);
     chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
-    _write_buf = ByteBuffer::allocate(chunk_size);
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+    
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, 
chunk_size));
     _write_buf->put_bytes(data + pos, size - pos);
     return Status::OK();
 }
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 04bd5aa672a..e686df2dfd6 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -180,6 +180,9 @@ public:
     std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
         return _segcompaction_mem_tracker;
     }
+    std::shared_ptr<MemTrackerLimiter> stream_load_pipe_tracker() {
+        return _stream_load_pipe_tracker;
+    }
     std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
         return _point_query_executor_mem_tracker;
     }
@@ -358,6 +361,7 @@ private:
     std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _stream_load_pipe_tracker;
 
     // Tracking memory may be shared between multiple queries.
     std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8c308f4b4d8..d160e9abdc2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -616,6 +616,8 @@ void ExecEnv::init_mem_tracker() {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"SubcolumnsTree");
     _s3_file_buffer_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"S3FileBuffer");
+    _stream_load_pipe_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"StreamLoadPipe");
 }
 
 void ExecEnv::_register_metrics() {
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index 633c3af428b..f7c4a0d474f 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -37,6 +37,7 @@
 #include "common/utils.h"
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/thread_context.h"
 #include "util/byte_buffer.h"
 #include "util/time.h"
 #include "util/uid_util.h"
@@ -95,9 +96,14 @@ class StreamLoadContext {
 public:
     StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), 
_exec_env(exec_env) {
         start_millis = UnixMillis();
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+        schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
     }
 
     ~StreamLoadContext() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->stream_load_pipe_tracker());
+        schema_buffer.reset();
         if (need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(this);
             need_rollback = false;
@@ -184,7 +190,7 @@ public:
     std::shared_ptr<MessageBodySink> body_sink;
     std::shared_ptr<io::StreamLoadPipe> pipe;
 
-    ByteBufferPtr schema_buffer = 
ByteBuffer::allocate(config::stream_tvf_buffer_size);
+    ByteBufferPtr schema_buffer;
 
     TStreamLoadPutResult put_result;
     TStreamLoadMultiTablePutResult multi_table_put_result;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 28b0556aafd..4ddd29ac9c3 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -142,6 +142,10 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
                   << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 
1000000;
     };
 
+    // Reset thread memory tracker, otherwise SCOPED_ATTACH_TASK will be 
called nested, nesting is
+    // not allowed, first time in on_chunk_data, second time in 
StreamLoadExecutor::execute_plan_fragment.
+    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+
     if (ctx->put_result.__isset.params) {
         st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, 
exec_fragment);
     } else {
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index aab8fd42db6..e8eadf69e02 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -23,19 +23,27 @@
 #include <memory>
 
 #include "common/logging.h"
+#include "common/status.h"
+#include "vec/common/allocator.h"
+#include "vec/common/allocator_fwd.h"
 
 namespace doris {
 
 struct ByteBuffer;
 using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
 
-struct ByteBuffer {
+struct ByteBuffer : private Allocator<false> {
     static ByteBufferPtr allocate(size_t size) {
         ByteBufferPtr ptr(new ByteBuffer(size));
         return ptr;
     }
 
-    ~ByteBuffer() { delete[] ptr; }
+    static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) {
+        ptr = ByteBufferPtr(new ByteBuffer(size));
+        return Status::OK();
+    }
+
+    ~ByteBuffer() { Allocator<false>::free(ptr, capacity); }
 
     void put_bytes(const char* data, size_t size) {
         memcpy(ptr + pos, data, size);
@@ -56,14 +64,15 @@ struct ByteBuffer {
     size_t remaining() const { return limit - pos; }
     bool has_remaining() const { return limit > pos; }
 
-    char* const ptr;
+    char* ptr;
     size_t pos;
     size_t limit;
     size_t capacity;
 
 private:
-    ByteBuffer(size_t capacity_)
-            : ptr(new char[capacity_]), pos(0), limit(capacity_), 
capacity(capacity_) {}
+    ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), 
capacity(capacity_) {
+        ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
+    }
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to