This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 86246f78bd0 [opt](bytebuffer) allocate ByteBuffer memory by Allocator and make it exception safe (#38960) 86246f78bd0 is described below commit 86246f78bd0973943fd7a2ff1159d4cd730a5d24 Author: hui lai <1353307...@qq.com> AuthorDate: Mon Aug 12 11:17:43 2024 +0800 [opt](bytebuffer) allocate ByteBuffer memory by Allocator and make it exception safe (#38960) At present, the memory allocation of `ByteBuffer` is done through `new char[capacity_]`. Now, it is uniformly allocated by `Allocator` for the following purposes: 1. Better memory statistics 2. Better support for memory limit check --- be/src/http/action/http_stream.cpp | 51 +++++++++++++--------- be/src/http/action/stream_load.cpp | 32 +++++++++----- be/src/io/fs/stream_load_pipe.cpp | 7 ++- be/src/runtime/exec_env.h | 4 ++ be/src/runtime/exec_env_init.cpp | 2 + be/src/runtime/stream_load/stream_load_context.h | 8 +++- .../runtime/stream_load/stream_load_executor.cpp | 4 ++ be/src/util/byte_buffer.h | 19 +++++--- 8 files changed, 87 insertions(+), 40 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index e7bfa839111..afeb251ca41 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -234,31 +234,40 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - // schema_buffer stores 1M of data for parsing column information - // need to determine whether to cache for the first time - if (ctx->is_read_schema) { - if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); - } else { - LOG(INFO) << "use a portion of data to request fe to obtain column information"; - ctx->is_read_schema = false; - ctx->status = process_put(req, ctx); + try { + auto bb = ByteBuffer::allocate(128 * 1024); + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + auto st = ctx->body_sink->append(bb); + // schema_buffer stores 1M of data for parsing column information + // need to determine whether to cache for the first time + if (ctx->is_read_schema) { + if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); + } else { + LOG(INFO) << "use a portion of data to request fe to obtain column information"; + ctx->is_read_schema = false; + ctx->status = process_put(req, ctx); + } } + if (!st.ok() && !ctx->status.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; + } catch (const doris::Exception& e) { + if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { + ctx->status = Status::MemoryLimitExceeded( + fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string())); + } + ctx->status = Status::Error<false>(e.code(), e.to_string()); } - - if (!st.ok() && !ctx->status.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; } // after all the data has been read and it has not reached 1M, it will execute here if (ctx->is_read_schema) { diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 75d6943d3c6..d0c5dff2075 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -340,19 +340,29 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - if (!st.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; + try { + auto bb = ByteBuffer::allocate(128 * 1024); + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + auto st = ctx->body_sink->append(bb); + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; + } catch (const doris::Exception& e) { + if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { + ctx->status = Status::MemoryLimitExceeded( + fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string())); + } + ctx->status = Status::Error<false>(e.code(), e.to_string()); + } } int64_t read_data_time = MonotonicNanos() - start_read_data_time; int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos; diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index 21c3856a815..ce91a2e8391 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t } Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { - ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + ByteBufferPtr buf; + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) { // need to allocate a new chunk, min chunk is 64k size_t chunk_size = std::max(_min_chunk_size, size - pos); chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - _write_buf = ByteBuffer::allocate(chunk_size); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size)); _write_buf->put_bytes(data + pos, size - pos); return Status::OK(); } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 04bd5aa672a..e686df2dfd6 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -180,6 +180,9 @@ public: std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> stream_load_pipe_tracker() { + return _stream_load_pipe_tracker; + } std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() { return _point_query_executor_mem_tracker; } @@ -358,6 +361,7 @@ private: std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _stream_load_pipe_tracker; // Tracking memory may be shared between multiple queries. std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 8c308f4b4d8..d160e9abdc2 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -616,6 +616,8 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); + _stream_load_pipe_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe"); } void ExecEnv::_register_metrics() { diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 633c3af428b..f7c4a0d474f 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -37,6 +37,7 @@ #include "common/utils.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/thread_context.h" #include "util/byte_buffer.h" #include "util/time.h" #include "util/uid_util.h" @@ -95,9 +96,14 @@ class StreamLoadContext { public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) { start_millis = UnixMillis(); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); } ~StreamLoadContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->stream_load_pipe_tracker()); + schema_buffer.reset(); if (need_rollback) { _exec_env->stream_load_executor()->rollback_txn(this); need_rollback = false; @@ -184,7 +190,7 @@ public: std::shared_ptr<MessageBodySink> body_sink; std::shared_ptr<io::StreamLoadPipe> pipe; - ByteBufferPtr schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); + ByteBufferPtr schema_buffer; TStreamLoadPutResult put_result; TStreamLoadMultiTablePutResult multi_table_put_result; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 28b0556aafd..4ddd29ac9c3 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -142,6 +142,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000; }; + // Reset thread memory tracker, otherwise SCOPED_ATTACH_TASK will be called nested, nesting is + // not allowed, first time in on_chunk_data, second time in StreamLoadExecutor::execute_plan_fragment. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + if (ctx->put_result.__isset.params) { st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment); } else { diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index aab8fd42db6..e8eadf69e02 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -23,19 +23,27 @@ #include <memory> #include "common/logging.h" +#include "common/status.h" +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" namespace doris { struct ByteBuffer; using ByteBufferPtr = std::shared_ptr<ByteBuffer>; -struct ByteBuffer { +struct ByteBuffer : private Allocator<false> { static ByteBufferPtr allocate(size_t size) { ByteBufferPtr ptr(new ByteBuffer(size)); return ptr; } - ~ByteBuffer() { delete[] ptr; } + static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) { + ptr = ByteBufferPtr(new ByteBuffer(size)); + return Status::OK(); + } + + ~ByteBuffer() { Allocator<false>::free(ptr, capacity); } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); @@ -56,14 +64,15 @@ struct ByteBuffer { size_t remaining() const { return limit - pos; } bool has_remaining() const { return limit > pos; } - char* const ptr; + char* ptr; size_t pos; size_t limit; size_t capacity; private: - ByteBuffer(size_t capacity_) - : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {} + ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), capacity(capacity_) { + ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_)); + } }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org