This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new b52b572ade2 [branch-2.1](memory) When Load ends, check memory tracker value returns is equal to 0 (#40850) b52b572ade2 is described below commit b52b572ade243a21feb3fcda6618669f7c27355e Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Sun Sep 15 23:47:53 2024 +0800 [branch-2.1](memory) When Load ends, check memory tracker value returns is equal to 0 (#40850) pick #38960 #39908 #40043 #40092 #40016 #40439 --------- Co-authored-by: hui lai <1353307...@qq.com> Co-authored-by: yiguolei <676222...@qq.com> --- be/src/common/config.cpp | 4 ++ be/src/common/config.h | 3 + be/src/exec/data_sink.cpp | 8 +++ be/src/exec/exec_node.cpp | 4 ++ be/src/http/action/http_stream.cpp | 23 +++++-- be/src/http/action/stream_load.cpp | 11 +++- be/src/io/file_factory.cpp | 9 +-- be/src/io/fs/stream_load_pipe.cpp | 7 ++- be/src/pipeline/pipeline_fragment_context.cpp | 4 ++ be/src/runtime/exec_env.h | 4 ++ be/src/runtime/exec_env_init.cpp | 2 + be/src/runtime/memory/mem_tracker_limiter.cpp | 71 +++++++++++++--------- be/src/runtime/memory/mem_tracker_limiter.h | 2 + be/src/runtime/stream_load/stream_load_context.h | 15 ++++- .../runtime/stream_load/stream_load_executor.cpp | 4 ++ be/src/runtime/thread_context.h | 47 ++++++++------ be/src/service/internal_service.cpp | 3 +- be/src/util/byte_buffer.h | 28 ++++++--- be/src/util/faststring.h | 3 +- be/src/util/slice.h | 7 ++- be/src/vec/common/allocator.h | 11 ++++ be/src/vec/sink/vtablet_block_convertor.cpp | 11 ++-- be/src/vec/sink/vtablet_block_convertor.h | 13 +++- be/test/util/byte_buffer2_test.cpp | 3 +- 24 files changed, 214 insertions(+), 83 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 543492b7155..473842a1886 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -140,6 +140,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648"); DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1"); +// default is true. if any memory tracking in Orphan mem tracker will report error. +// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time. +// allocator free memory not need to check, because when the thread memory tracker label is Orphan, +// use the tracker saved in Allocator. DEFINE_mBool(enable_memory_orphan_check, "true"); // The maximum time a thread waits for full GC. Currently only query will wait for full gc. diff --git a/be/src/common/config.h b/be/src/common/config.h index 1afd10a34a7..a3d4b35dce8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -194,6 +194,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes); DECLARE_mInt64(crash_in_alloc_large_memory_bytes); // default is true. if any memory tracking in Orphan mem tracker will report error. +// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time. +// allocator free memory not need to check, because when the thread memory tracker label is Orphan, +// use the tracker saved in Allocator. DECLARE_mBool(enable_memory_orphan_check); // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index dc651080298..0a05fb62e9b 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -181,6 +181,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status)); RETURN_IF_ERROR(status); break; @@ -349,6 +353,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status)); RETURN_IF_ERROR(status); break; diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 247ec2323d5..d0124a7141e 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -463,6 +463,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif *node = pool->add(new vectorized::GroupCommitScanNode(pool, tnode, descs)); return Status::OK(); diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 7dd85653002..83ce0ce82cc 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); - auto st = ctx->body_sink->append(bb); + st = ctx->body_sink->append(bb); // schema_buffer stores 1M of data for parsing column information // need to determine whether to cache for the first time if (ctx->is_read_schema) { - if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); } else { LOG(INFO) << "use a portion of data to request fe to obtain column information"; ctx->is_read_schema = false; ctx->status = process_put(req, ctx); } } - - if (!st.ok() && !ctx->status.ok()) { + if (!st.ok()) { LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); ctx->status = st; return; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 3f32655cf14..2036043b4d4 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { struct evhttp_request* ev_req = req->get_evhttp_request(); auto evbuf = evhttp_request_get_input_buffer(ev_req); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(128 * 1024); + ByteBufferPtr bb; + Status st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); - auto st = ctx->body_sink->append(bb); + st = ctx->body_sink->append(bb); if (!st.ok()) { LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); ctx->status = st; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 4d6158f8f7e..95d53732088 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } - if (need_schema == true) { + if (need_schema) { + RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer()); // Here, a portion of the data is processed to parse column information auto pipe = std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - stream_load_ctx->schema_buffer->pos /* total_length */); - stream_load_ctx->schema_buffer->flip(); - RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer)); + stream_load_ctx->schema_buffer()->pos /* total_length */); + stream_load_ctx->schema_buffer()->flip(); + RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer())); RETURN_IF_ERROR(pipe->finish()); *file_reader = std::move(pipe); } else { diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index ecce306bdf1..392125e6fc0 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t } Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { - ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + ByteBufferPtr buf; + RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) { // need to allocate a new chunk, min chunk is 64k size_t chunk_size = std::max(_min_chunk_size, size - pos); chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - _write_buf = ByteBuffer::allocate(chunk_size); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf)); _write_buf->put_bytes(data + pos, size - pos); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index dab359ed040..550e60c210c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -846,6 +846,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr break; } case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { +#ifndef NDEBUG + DCHECK(state->get_query_ctx() != nullptr); + state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; +#endif sink_ = std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(), _sink.get()); break; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index eea44d1ba8d..18061e04528 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -185,6 +185,9 @@ public: std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> stream_load_pipe_tracker() { + return _stream_load_pipe_tracker; + } std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() { return _point_query_executor_mem_tracker; } @@ -368,6 +371,7 @@ private: std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _stream_load_pipe_tracker; // Tracking memory may be shared between multiple queries. std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 8fb7bc96c0c..5d2ab598b33 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -539,6 +539,8 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); _s3_file_buffer_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); + _stream_load_pipe_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe"); } void ExecEnv::_register_metrics() { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index c1b5879dd11..bfe3a5441da 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() { "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " + "If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see " + "more information." "1. For query and load, memory leaks may have occurred, it is expected that the query " "mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. " @@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_consumption->current_value() != 0) { // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. #ifndef NDEBUG - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::string err_msg = fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), _consumption->current_value(), _consumption->peak_value(), @@ -140,11 +142,11 @@ MemTrackerLimiter::~MemTrackerLimiter() { } _consumption->set(0); #ifndef NDEBUG - } else if (!_address_sanitizers.empty()) { - LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " - << ", mem tracker label: " << _label - << ", peak consumption: " << _consumption->peak_value() - << print_address_sanitizers(); + } else if (!_address_sanitizers.empty() && !is_group_commit_load) { + LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " + << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); #endif } memory_memtrackerlimiter_cnt << -1; @@ -152,17 +154,17 @@ MemTrackerLimiter::~MemTrackerLimiter() { #ifndef NDEBUG void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::lock_guard<std::mutex> l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { - LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace(1, "DISABLED") - << ", old stack_trace: " << it->second.stack_trace; + _error_address_sanitizers.emplace_back( + fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " + "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " + "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), + buf, size, it->first, it->second.size, + get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); } // if alignment not equal to 0, maybe usable_size > size. @@ -174,26 +176,26 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { } void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { - if (_type == Type::QUERY) { + if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) { std::lock_guard<std::mutex> l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { if (it->second.size != size) { - LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " - "label: " - << _label << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace(1, "DISABLED") - << ", old stack_trace: " << it->second.stack_trace; + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " + "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " + "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, + size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); } _address_sanitizers.erase(buf); } else { - LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", stack_trace: " << get_stack_trace(1, "DISABLED"); + _error_address_sanitizers.emplace_back(fmt::format( + "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", + _label, _consumption->current_value(), _consumption->peak_value(), buf, size, + get_stack_trace(1, "FULL_WITH_INLINE"))); } } } @@ -201,9 +203,20 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { std::string MemTrackerLimiter::print_address_sanitizers() { std::lock_guard<std::mutex> l(_address_sanitizers_mtx); std::string detail = "[Address Sanitizer]:"; + detail += "\n memory not be freed:"; for (const auto& it : _address_sanitizers) { - detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size, - it.second.stack_trace); + auto msg = fmt::format( + "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " + "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", + _label, _consumption->current_value(), _consumption->peak_value(), it.first, + it.second.size, it.second.stack_trace); + LOG(INFO) << msg; + detail += msg; + } + detail += "\n incorrect memory alloc and free:"; + for (const auto& err_msg : _error_address_sanitizers) { + LOG(INFO) << err_msg; + detail += fmt::format("\n {}", err_msg); } return detail; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index e5c5cb1bc03..344f3dc92b6 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -209,6 +209,7 @@ public: void add_address_sanitizers(void* buf, size_t size); void remove_address_sanitizers(void* buf, size_t size); std::string print_address_sanitizers(); + bool is_group_commit_load {false}; #endif std::string debug_string() override { @@ -260,6 +261,7 @@ private: std::mutex _address_sanitizers_mtx; std::unordered_map<void*, AddressSanitizer> _address_sanitizers; + std::vector<std::string> _error_address_sanitizers; #endif }; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 1dc7ccf73ba..2ccf8ce5014 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -37,6 +37,7 @@ #include "common/utils.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/thread_context.h" #include "util/byte_buffer.h" #include "util/time.h" #include "util/uid_util.h" @@ -118,6 +119,17 @@ public: // also print the load source info if detail is set to true std::string brief(bool detail = false) const; + Status allocate_schema_buffer() { + if (_schema_buffer == nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->stream_load_pipe_tracker()); + return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer); + } + return Status::OK(); + } + + ByteBufferPtr schema_buffer() { return _schema_buffer; } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD @@ -182,8 +194,6 @@ public: std::shared_ptr<MessageBodySink> body_sink; std::shared_ptr<io::StreamLoadPipe> pipe; - ByteBufferPtr schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); - TStreamLoadPutResult put_result; TStreamLoadMultiTablePutResult multi_table_put_result; @@ -241,6 +251,7 @@ public: private: ExecEnv* _exec_env = nullptr; + ByteBufferPtr _schema_buffer; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 0761b445bee..0616c6474aa 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -142,6 +142,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000; }; + // Reset thread memory tracker, otherwise SCOPED_ATTACH_TASK will be called nested, nesting is + // not allowed, first time in on_chunk_data, second time in StreamLoadExecutor::execute_plan_fragment. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + if (ctx->put_result.__isset.params) { st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, QuerySource::STREAM_LOAD, exec_fragment); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 3c0139a02d7..885d616eb06 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -50,7 +50,7 @@ // Used after SCOPED_ATTACH_TASK, in order to count the memory into another // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \ - auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(arg1) + auto VARNAME_LINENUM(switch_mem_tracker) = doris::SwitchThreadMemTrackerLimiter(arg1) // Looking forward to tracking memory during thread execution into MemTracker. // Usually used to record query more detailed memory, including ExecNode operators. @@ -167,8 +167,7 @@ static std::string memory_orphan_check_msg = "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging " "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the " "thread will crash. If you want to switch MemTrackerLimiter during thread execution, " - "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach. Of course, you " - "can modify enable_memory_orphan_check=false in be.conf to avoid this crash."; + "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach."; // The thread context saves some info about a working thread. // 2 required info: @@ -219,9 +218,9 @@ public: ss << std::this_thread::get_id(); return ss.str(); } - // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to - // consume/release mem_tracker. - // Note that the use of shared_ptr will cause a crash. The guess is that there is an + // Note that if set global Memory Hook, After thread_mem_tracker_mgr is initialized, + // the current thread Hook starts to consume/release mem_tracker. + // the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. @@ -315,7 +314,7 @@ public: // The brpc server should respond as quickly as possible. bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); + CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit); } DCHECK(bthread_context != nullptr); bthread_context->thread_local_handle_count++; @@ -357,7 +356,7 @@ static ThreadContext* thread_context(bool allow_return_null = false) { // in bthread // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr); + DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0); return bthread_context; } if (allow_return_null) { @@ -443,30 +442,38 @@ public: class SwitchThreadMemTrackerLimiter { public: - explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { + explicit SwitchThreadMemTrackerLimiter( + const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); - ThreadLocalHandle::create_thread_local_if_not_exits(); - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + } } - explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) { - ThreadLocalHandle::create_thread_local_if_not_exits(); + explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) { + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); DCHECK(thread_context()->task_id() == query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - query_thread_context.query_mem_tracker); + if (query_thread_context.query_mem_tracker != + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( + query_thread_context.query_mem_tracker); + } } ~SwitchThreadMemTrackerLimiter() { - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - ThreadLocalHandle::del_thread_local_if_count_is_zero(); + if (_old_mem_tracker != nullptr) { + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + } + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: - std::shared_ptr<MemTrackerLimiter> _old_mem_tracker; + std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr}; }; class AddThreadMemTrackerConsumer { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index b2b5841618d..04e616b53b7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -785,7 +785,8 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( MemTrackerLimiter::Type::OTHER, - fmt::format("{}#{}", params.format_type, params.file_type)); + fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type, + params.file_type)); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); // make sure profile is desctructed after reader cause PrefetchBufferedReader diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index aab8fd42db6..17764b9e4f6 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -23,19 +23,26 @@ #include <memory> #include "common/logging.h" +#include "common/status.h" +#include "runtime/thread_context.h" +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" namespace doris { struct ByteBuffer; using ByteBufferPtr = std::shared_ptr<ByteBuffer>; -struct ByteBuffer { - static ByteBufferPtr allocate(size_t size) { - ByteBufferPtr ptr(new ByteBuffer(size)); - return ptr; +struct ByteBuffer : private Allocator<false> { + static Status allocate(const size_t size, ByteBufferPtr* ptr) { + RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); }); + return Status::OK(); } - ~ByteBuffer() { delete[] ptr; } + ~ByteBuffer() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); + Allocator<false>::free(ptr, capacity); + } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); @@ -56,14 +63,21 @@ struct ByteBuffer { size_t remaining() const { return limit - pos; } bool has_remaining() const { return limit > pos; } - char* const ptr; + char* ptr; size_t pos; size_t limit; size_t capacity; private: ByteBuffer(size_t capacity_) - : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {} + : pos(0), + limit(capacity_), + capacity(capacity_), + mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_)); + } + + std::shared_ptr<MemTrackerLimiter> mem_tracker_; }; } // namespace doris diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h index 9308a4d20bb..eae7db53625 100644 --- a/be/src/util/faststring.h +++ b/be/src/util/faststring.h @@ -85,7 +85,8 @@ public: OwnedSlice build() { uint8_t* ret = data_; if (ret == initial_data_) { - ret = reinterpret_cast<uint8_t*>(Allocator::alloc(len_)); + ret = reinterpret_cast<uint8_t*>(Allocator::alloc(capacity_)); + DCHECK(len_ <= capacity_); memcpy(ret, data_, len_); } OwnedSlice result(ret, len_, capacity_); diff --git a/be/src/util/slice.h b/be/src/util/slice.h index bae33d4ee75..c1a006a3f9f 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -358,7 +358,12 @@ public: return *this; } - ~OwnedSlice() { Allocator::free(_slice.data, _capacity); } + ~OwnedSlice() { + if (_slice.data != nullptr) { + DCHECK(_capacity != 0); + Allocator::free(_slice.data, _capacity); + } + } const Slice& slice() const { return _slice; } diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 576ce86d928..39197c75f92 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; // is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; +namespace doris { +class MemTrackerLimiter; +} + class DefaultMemoryAllocator { public: static void* malloc(size_t size) __THROW { return std::malloc(size); } @@ -228,6 +232,13 @@ public: // alloc will continue to execute, so the consume memtracker is forced. void memory_check(size_t size) const; // Increases consumption of this tracker by 'bytes'. + // some special cases: + // 1. objects that inherit Allocator will not be shared by multiple queries. + // non-compliant: page cache, ORC ByteBuffer. + // 2. objects that inherit Allocator will only free memory allocated by themselves. + // non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free. + // 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator + // and during subsequent memory allocation. void consume_memory(size_t size) const; void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 96de68f5976..086c9a3ddd0 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip return DecimalType(value); } -Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type, - bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, - const uint32_t row_count, - vectorized::IColumn::Permutation* rows) { +Status OlapTableBlockConvertor::_internal_validate_column( + RuntimeState* state, const TypeDescriptor& type, bool is_nullable, + vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows) { DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 0db340ce6c2..7f866c38032 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -69,7 +69,18 @@ private: Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, fmt::memory_buffer& error_prefix, const uint32_t row_count, - vectorized::IColumn::Permutation* rows = nullptr); + vectorized::IColumn::Permutation* rows = nullptr) { + RETURN_IF_CATCH_EXCEPTION({ + return _internal_validate_column(state, type, is_nullable, column, slot_index, + stop_processing, error_prefix, row_count, rows); + }); + } + + Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type, + bool is_nullable, vectorized::ColumnPtr column, + size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows = nullptr); // make input data valid for OLAP table // return number of invalid/filtered rows. diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp index 04b62cd5fe8..73c38c9e404 100644 --- a/be/test/util/byte_buffer2_test.cpp +++ b/be/test/util/byte_buffer2_test.cpp @@ -32,7 +32,8 @@ public: }; TEST_F(ByteBufferTest, normal) { - auto buf = ByteBuffer::allocate(4); + ByteBufferPtr buf; + Status st = ByteBuffer::allocate(4, &buf); EXPECT_EQ(0, buf->pos); EXPECT_EQ(4, buf->limit); EXPECT_EQ(4, buf->capacity); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org