This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f16615a1fc1 [branch-2.1](memory) Allocator support address sanitizers (#40836) f16615a1fc1 is described below commit f16615a1fc1fbb80c3155299bbac3001f8e68811 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Sat Sep 14 12:12:44 2024 +0800 [branch-2.1](memory) Allocator support address sanitizers (#40836) pick #33396 #33862 #33853 #33732 #33841 #33933 #34901 #35014 --------- Co-authored-by: yiguolei <676222...@qq.com> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/olap/task/engine_alter_tablet_task.cpp | 2 +- be/src/olap/task/engine_publish_version_task.cpp | 4 +- be/src/olap/task/engine_publish_version_task.h | 2 +- be/src/runtime/exec_env.h | 10 +++ be/src/runtime/exec_env_init.cpp | 4 ++ be/src/runtime/fold_constant_executor.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 1 + be/src/runtime/memory/mem_tracker_limiter.cpp | 84 ++++++++++++++++++++++-- be/src/runtime/memory/mem_tracker_limiter.h | 16 +++++ be/src/runtime/thread_context.h | 2 +- be/src/service/backend_service.cpp | 2 +- be/src/service/internal_service.cpp | 2 +- be/src/service/point_query_executor.cpp | 11 ++-- be/src/service/point_query_executor.h | 1 - be/src/util/block_compression.cpp | 80 ++++++++++++++++++---- be/src/vec/common/allocator.cpp | 24 +++++++ be/src/vec/common/allocator.h | 25 +++++++ be/src/vec/common/pod_array_fwd.h | 7 +- bin/run-fs-benchmark.sh | 2 +- bin/start_be.sh | 3 +- run-be-ut.sh | 3 +- 23 files changed, 250 insertions(+), 39 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1bad8b7e4cc..543492b7155 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1106,6 +1106,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf"); DEFINE_mString(get_stack_trace_tool, "libunwind"); DEFINE_mString(dwarf_location_info_mode, "FAST"); 
+DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "true"); // the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 45279290c0c..1afd10a34a7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1158,6 +1158,7 @@ DECLARE_mString(kerberos_krb5_conf_path); // Values include `none`, `glog`, `boost`, `glibc`, `libunwind` DECLARE_mString(get_stack_trace_tool); +DECLARE_mBool(enable_address_sanitizers_with_stack_trace); // DISABLED: Don't resolve location info. // FAST: Perform CU lookup using .debug_aranges (might be incomplete). diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index f73a4f8a82d..8a8d27b7657 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -39,7 +39,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) ->get_storage_engine() ->memory_limitation_bytes_per_thread_for_schema_change(); _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, + MemTrackerLimiter::Type::OTHER, fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", std::to_string(_alter_tablet_req.base_tablet_id), std::to_string(_alter_tablet_req.new_tablet_id)), diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 6108e81bae3..9601cad88d1 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -81,7 +81,7 @@ EnginePublishVersionTask::EnginePublishVersionTask( _succ_tablets(succ_tablets), _discontinuous_version_tablets(discontinuous_version_tablets), _table_id_to_num_delta_rows(table_id_to_num_delta_rows) { - _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE, + _mem_tracker = 
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "TabletPublishTxnTask"); } @@ -370,7 +370,7 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task _transaction_id(transaction_id), _version(version), _tablet_info(tablet_info), - _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE, + _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "TabletPublishTxnTask")) { _stats.submit_time_us = MonotonicMicros(); } diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 2b8b5540b22..cf03c0537b7 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -119,7 +119,7 @@ public: _partition_id(partition_id), _transaction_id(transaction_id), _version(version), - _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE, + _mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "AsyncTabletPublishTask")) { _stats.submit_time_us = MonotonicMicros(); } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4b2478ccf99..eea44d1ba8d 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -185,6 +185,12 @@ public: std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() { + return _point_query_executor_mem_tracker; + } + std::shared_ptr<MemTrackerLimiter> block_compression_mem_tracker() { + return _block_compression_mem_tracker; + } std::shared_ptr<MemTrackerLimiter> rowid_storage_reader_tracker() { return _rowid_storage_reader_tracker; } @@ -363,6 +369,10 @@ private: // Count the memory consumption of segment compaction tasks. std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker; + // Tracking memory may be shared between multiple queries. 
+ std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker; + std::shared_ptr<MemTrackerLimiter> _block_compression_mem_tracker; + // TODO, looking forward to more accurate tracking. std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker; std::shared_ptr<MemTrackerLimiter> _subcolumns_tree_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bbd6bbc9447..8fb7bc96c0c 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -529,6 +529,10 @@ void ExecEnv::init_mem_tracker() { std::make_shared<MemTracker>("IOBufBlockMemory", _details_mem_tracker_set.get()); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); + _point_query_executor_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); + _block_compression_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); _subcolumns_tree_tracker = diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 5ba384baf84..cf7695c86f2 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -160,7 +160,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals, fragment_params.params = params; fragment_params.protocol_version = PaloInternalServiceVersion::V1; _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, + MemTrackerLimiter::Type::OTHER, fmt::format("FoldConstant:query_id={}", print_id(_query_id))); _runtime_state = RuntimeState::create_unique(fragment_params.params, query_options, query_globals, diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5617861a719..829440f339e 100644 
--- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -289,6 +289,7 @@ std::string FragmentMgr::to_http_path(const std::string& file_name) { Status FragmentMgr::trigger_pipeline_context_report( const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) { return _async_report_thread_pool->submit_func([this, req, ctx]() { + SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker); coordinator_callback(req); if (!req.done) { ctx->refresh_next_report_time(); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 494645d56b5..c1b5879dd11 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -39,6 +39,7 @@ #include "util/perf_counters.h" #include "util/pretty_printer.h" #include "util/runtime_profile.h" +#include "util/stack_util.h" namespace doris { @@ -110,7 +111,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLi MemTrackerLimiter::~MemTrackerLimiter() { consume(_untracked_mem); static std::string mem_tracker_inaccurate_msg = - ", mem tracker not equal to 0 when mem tracker destruct, this usually means that " + "mem tracker not equal to 0 when mem tracker destruct, this usually means that " "memory tracking is inaccurate and SCOPED_ATTACH_TASK and " "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. " "1. For query and load, memory leaks may have occurred, it is expected that the query " @@ -124,21 +125,90 @@ MemTrackerLimiter::~MemTrackerLimiter() { "4. If you need to " "transfer memory tracking value between two trackers, can use transfer_to."; if (_consumption->current_value() != 0) { - // TODO, expect mem tracker equal to 0 at the task end. 
- if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) { - LOG(INFO) << "mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << mem_tracker_inaccurate_msg; + // TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end. +#ifndef NDEBUG + if (_type == Type::QUERY) { + std::string err_msg = + fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", + label(), _consumption->current_value(), _consumption->peak_value(), + mem_tracker_inaccurate_msg); + LOG(FATAL) << err_msg << print_address_sanitizers(); } +#endif if (ExecEnv::tracking_memory()) { ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); } _consumption->set(0); +#ifndef NDEBUG + } else if (!_address_sanitizers.empty()) { + LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " + << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); +#endif } memory_memtrackerlimiter_cnt << -1; } +#ifndef NDEBUG +void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { + if (_type == Type::QUERY) { + std::lock_guard<std::mutex> l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace(1, "DISABLED") + << ", old stack_trace: " << it->second.stack_trace; + } + + // if alignment not equal to 0, maybe usable_size > size. + AddressSanitizer as = {size, doris::config::enable_address_sanitizers_with_stack_trace + ? 
get_stack_trace(1, "DISABLED") + : ""}; + _address_sanitizers.emplace(buf, as); + } +} + +void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { + if (_type == Type::QUERY) { + std::lock_guard<std::mutex> l(_address_sanitizers_mtx); + auto it = _address_sanitizers.find(buf); + if (it != _address_sanitizers.end()) { + if (it->second.size != size) { + LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " + "label: " + << _label << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() + << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace(1, "DISABLED") + << ", old stack_trace: " << it->second.stack_trace; + } + _address_sanitizers.erase(buf); + } else { + LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", stack_trace: " << get_stack_trace(1, "DISABLED"); + } + } +} + +std::string MemTrackerLimiter::print_address_sanitizers() { + std::lock_guard<std::mutex> l(_address_sanitizers_mtx); + std::string detail = "[Address Sanitizer]:"; + for (const auto& it : _address_sanitizers) { + detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size, + it.second.stack_trace); + } + return detail; +} +#endif + MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 67c40e1f6c5..e5c5cb1bc03 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -205,6 +205,12 @@ public: // Log the memory usage when memory limit is exceeded. 
std::string tracker_limit_exceeded_str(); +#ifndef NDEBUG + void add_address_sanitizers(void* buf, size_t size); + void remove_address_sanitizers(void* buf, size_t size); + std::string print_address_sanitizers(); +#endif + std::string debug_string() override { std::stringstream msg; msg << "limit: " << _limit << "; " @@ -245,6 +251,16 @@ private: // Avoid frequent printing. bool _enable_print_log_usage = false; static std::atomic<bool> _enable_print_log_process_usage; + +#ifndef NDEBUG + struct AddressSanitizer { + size_t size; + std::string stack_trace; + }; + + std::mutex _address_sanitizers_mtx; + std::unordered_map<void*, AddressSanitizer> _address_sanitizers; +#endif }; inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index d1aede848ab..3c0139a02d7 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -549,7 +549,7 @@ public: // must call create_thread_local_if_not_exits() before use thread_context(). 
#define CONSUME_THREAD_MEM_TRACKER(size) \ do { \ - if (doris::use_mem_hook || size == 0) { \ + if (size == 0 || doris::use_mem_hook) { \ break; \ } \ if (doris::pthread_context_ptr_init) { \ diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 0f9fd47948e..96785c9fd1a 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -102,7 +102,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { const auto& local_tablet_uid = local_tablet->tablet_uid(); std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, fmt::format("IngestBinlog#TxnId={}", txn_id)); + MemTrackerLimiter::Type::OTHER, fmt::format("IngestBinlog#TxnId={}", txn_id)); SCOPED_ATTACH_TASK(mem_tracker); auto& request = arg->request; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 13faed18e61..b2b5841618d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -784,7 +784,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c const TFileScanRangeParams& params = file_scan_range.params; std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, + MemTrackerLimiter::Type::OTHER, fmt::format("{}#{}", params.format_type, params.file_type)); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index c89bc52115a..4e9295ed53d 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -37,6 +37,7 @@ #include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/key_util.h" #include "util/runtime_profile.h" #include "util/simd/bits.h" @@ -166,7 +167,8 @@ void RowCache::erase(const 
RowCacheKey& key) { } PointQueryExecutor::~PointQueryExecutor() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->point_query_executor_mem_tracker()); _tablet.reset(); _reusable.reset(); _result_block.reset(); @@ -180,10 +182,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, // using cache __int128_t uuid = static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); - _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::QUERY, - fmt::format("PointQueryExecutor:{}#{}", uuid, request->tablet_id())); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); auto cache_handle = LookupConnectionCache::instance()->get(uuid); _binary_row_format = request->is_binary_row(); _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id()); @@ -234,7 +233,7 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, } Status PointQueryExecutor::lookup_up() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); RETURN_IF_ERROR(_lookup_row_key()); RETURN_IF_ERROR(_lookup_row_data()); RETURN_IF_ERROR(_output_data()); diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 565a585d322..e168ef16ad7 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -323,7 +323,6 @@ private: std::vector<RowReadContext> _row_read_ctxs; std::shared_ptr<Reusable> _reusable; std::unique_ptr<vectorized::Block> _result_block; - std::shared_ptr<MemTrackerLimiter> _mem_tracker; Metrics _profile_metrics; bool _binary_row_format = false; }; diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 10f975451d3..e71a8901421 100644 --- 
a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -53,6 +53,7 @@ #include "gutil/endian.h" #include "gutil/strings/substitute.h" #include "orc/OrcFile.hh" +#include "runtime/thread_context.h" #include "util/bit_util.h" #include "util/defer_op.h" #include "util/faststring.h" @@ -99,10 +100,16 @@ private: ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) { buffer = std::make_unique<faststring>(); } + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique<faststring>(); + } LZ4_stream_t* ctx; std::unique_ptr<faststring> buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStream(ctx); } @@ -115,7 +122,11 @@ public: static Lz4BlockCompression s_instance; return &s_instance; } - ~Lz4BlockCompression() { _ctx_pool.clear(); } + ~Lz4BlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + _ctx_pool.clear(); + } Status compress(const Slice& input, faststring* output) override { if (input.size > INT_MAX) { @@ -144,7 +155,13 @@ public: compressed_buf.size = max_len; } else { // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + // context->buffer is reusable between queries, so it should be accounted to + // the global tracker.
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); compressed_buf.size = max_len; } @@ -253,10 +270,16 @@ private: ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) { buffer = std::make_unique<faststring>(); } + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique<faststring>(); + } LZ4F_compressionContext_t ctx; std::unique_ptr<faststring> buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4F_freeCompressionContext(ctx); } @@ -282,6 +305,8 @@ public: return &s_instance; } ~Lz4fBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -326,8 +351,12 @@ private: compressed_buf.data = reinterpret_cast<char*>(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); compressed_buf.size = max_len; } @@ -482,10 +511,16 @@ private: ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) { buffer = std::make_unique<faststring>(); } + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique<faststring>(); + } LZ4_streamHC_t* ctx; std::unique_ptr<faststring> buffer; ~Context() { + 
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStreamHC(ctx); } @@ -498,7 +533,11 @@ public: static Lz4HCBlockCompression s_instance; return &s_instance; } - ~Lz4HCBlockCompression() { _ctx_pool.clear(); } + ~Lz4HCBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + _ctx_pool.clear(); + } Status compress(const Slice& input, faststring* output) override { std::unique_ptr<Context> context; @@ -519,7 +558,12 @@ public: compressed_buf.data = reinterpret_cast<char*>(output->data()); compressed_buf.size = max_len; } else { - context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); compressed_buf.size = max_len; } @@ -781,10 +825,16 @@ private: ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) { buffer = std::make_unique<faststring>(); } + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique<faststring>(); + } ZSTD_CCtx* ctx; std::unique_ptr<faststring> buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { ZSTD_freeCCtx(ctx); } @@ -810,6 +860,8 @@ public: return &s_instance; } ~ZstdBlockCompression() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); _ctx_c_pool.clear(); _ctx_d_pool.clear(); } @@ -843,8 +895,12 @@ public: compressed_buf.data = reinterpret_cast<char*>(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - 
context->buffer->resize(max_len); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); compressed_buf.size = max_len; } diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 34d71d8df3e..480734dcade 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -210,6 +210,30 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::throw_b throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } +#ifndef NDEBUG +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_address_sanitizers( + void* buf, size_t size) const { +#ifdef BE_TEST + if (!doris::ExecEnv::ready()) { + return; + } +#endif + doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); +} + +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_address_sanitizers( + void* buf, size_t size) const { +#ifdef BE_TEST + if (!doris::ExecEnv::ready()) { + return; + } +#endif + doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); +} +#endif + template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc(size_t size, size_t alignment) { diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 83cb6eddb7d..576ce86d928 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -231,6 +231,10 @@ public: void consume_memory(size_t size) const; void 
release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; +#ifndef NDEBUG + void add_address_sanitizers(void* buf, size_t size) const; + void remove_address_sanitizers(void* buf, size_t size) const; +#endif void* alloc(size_t size, size_t alignment = 0); void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); @@ -238,6 +242,7 @@ public: /// Allocate memory range. void* alloc_impl(size_t size, size_t alignment = 0) { memory_check(size); + // consume memory in tracker before alloc, similar to early declaration. consume_memory(size); void* buf; size_t record_size = size; @@ -273,6 +278,9 @@ public: if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } +#ifndef NDEBUG + add_address_sanitizers(buf, size); +#endif } else { buf = nullptr; int res = MemoryAllocator::posix_memalign(&buf, alignment, size); @@ -282,6 +290,9 @@ public: throw_bad_alloc( fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); } +#ifndef NDEBUG + add_address_sanitizers(buf, size); +#endif if constexpr (clear_memory) memset(buf, 0, size); @@ -303,6 +314,9 @@ public: throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { +#ifndef NDEBUG + remove_address_sanitizers(buf, size); +#endif MemoryAllocator::free(buf); } release_memory(size); @@ -326,6 +340,9 @@ public: if (!use_mmap || (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)) { +#ifndef NDEBUG + remove_address_sanitizers(buf, old_size); +#endif /// Resize malloc'd memory region with no special alignment requirement. 
void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { @@ -333,6 +350,10 @@ public: throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } +#ifndef NDEBUG + add_address_sanitizers( + new_buf, new_size); // usually buf addr == new_buf addr, but under ASAN they may differ. +#endif buf = new_buf; if constexpr (clear_memory) @@ -362,6 +383,10 @@ public: // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); +#ifndef NDEBUG + add_address_sanitizers(new_buf, new_size); + remove_address_sanitizers(buf, old_size); +#endif free(buf, old_size); buf = new_buf; } diff --git a/be/src/vec/common/pod_array_fwd.h b/be/src/vec/common/pod_array_fwd.h index ff00b312575..e1a428eda9d 100644 --- a/be/src/vec/common/pod_array_fwd.h +++ b/be/src/vec/common/pod_array_fwd.h @@ -36,9 +36,12 @@ template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocat size_t pad_right_ = 0, size_t pad_left_ = 0> class PODArray; -/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */ +/** For columns. Padding is enough to read and write xmm-register at the address of the last element. + * TODO, pad_right is temporarily changed from 15 to 16, which wastes 1 byte; + * it can be rolled back after fixing the wrong reinterpret_cast column and PODArray swap. + */ template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>> -using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 15, 16>; +using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 16, 16>; /** A helper for declaring PODArray that uses inline memory.
* The initial size is set to use all the inline bytes, since using less would diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh index 3076a968761..f4edd4117d0 100755 --- a/bin/run-fs-benchmark.sh +++ b/bin/run-fs-benchmark.sh @@ -189,7 +189,7 @@ else fi ## set asan and ubsan env to generate core file -export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0 export UBSAN_OPTIONS=print_stacktrace=1 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc diff --git a/bin/start_be.sh b/bin/start_be.sh index d9800e1b047..396e75a5d48 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -267,7 +267,8 @@ fi export AWS_MAX_ATTEMPTS=2 ## set asan and ubsan env to generate core file -export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193 +export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0 export UBSAN_OPTIONS=print_stacktrace=1 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc diff --git a/run-be-ut.sh b/run-be-ut.sh index 346d5cd1ecb..f9fcf9e9d53 100755 --- a/run-be-ut.sh +++ b/run-be-ut.sh @@ -405,7 +405,8 @@ export ORC_EXAMPLE_DIR="${DORIS_HOME}/be/src/apache-orc/examples" # set asan and ubsan env to generate core file export DORIS_HOME="${DORIS_TEST_BINARY_DIR}/" -export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0 +## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193 +export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0 export UBSAN_OPTIONS=print_stacktrace=1 export JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log -Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDLE_TIME=300000" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org