This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f16615a1fc1 [branch-2.1](memory) Allocator support address sanitizers 
(#40836)
f16615a1fc1 is described below

commit f16615a1fc1fbb80c3155299bbac3001f8e68811
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Sat Sep 14 12:12:44 2024 +0800

    [branch-2.1](memory) Allocator support address sanitizers (#40836)
    
    pick
    #33396
    #33862
    #33853
    #33732
    #33841
    #33933
    #34901
    #35014
    
    ---------
    
    Co-authored-by: yiguolei <676222...@qq.com>
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/common/config.cpp                         |  1 +
 be/src/common/config.h                           |  1 +
 be/src/olap/task/engine_alter_tablet_task.cpp    |  2 +-
 be/src/olap/task/engine_publish_version_task.cpp |  4 +-
 be/src/olap/task/engine_publish_version_task.h   |  2 +-
 be/src/runtime/exec_env.h                        | 10 +++
 be/src/runtime/exec_env_init.cpp                 |  4 ++
 be/src/runtime/fold_constant_executor.cpp        |  2 +-
 be/src/runtime/fragment_mgr.cpp                  |  1 +
 be/src/runtime/memory/mem_tracker_limiter.cpp    | 84 ++++++++++++++++++++++--
 be/src/runtime/memory/mem_tracker_limiter.h      | 16 +++++
 be/src/runtime/thread_context.h                  |  2 +-
 be/src/service/backend_service.cpp               |  2 +-
 be/src/service/internal_service.cpp              |  2 +-
 be/src/service/point_query_executor.cpp          | 11 ++--
 be/src/service/point_query_executor.h            |  1 -
 be/src/util/block_compression.cpp                | 80 ++++++++++++++++++----
 be/src/vec/common/allocator.cpp                  | 24 +++++++
 be/src/vec/common/allocator.h                    | 25 +++++++
 be/src/vec/common/pod_array_fwd.h                |  7 +-
 bin/run-fs-benchmark.sh                          |  2 +-
 bin/start_be.sh                                  |  3 +-
 run-be-ut.sh                                     |  3 +-
 23 files changed, 250 insertions(+), 39 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1bad8b7e4cc..543492b7155 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1106,6 +1106,7 @@ DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");
 
 DEFINE_mString(get_stack_trace_tool, "libunwind");
 DEFINE_mString(dwarf_location_info_mode, "FAST");
+DEFINE_mBool(enable_address_sanitizers_with_stack_trace, "true");
 
 // the ratio of _prefetch_size/_batch_size in AutoIncIDBuffer
 DEFINE_mInt64(auto_inc_prefetch_size_ratio, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 45279290c0c..1afd10a34a7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1158,6 +1158,7 @@ DECLARE_mString(kerberos_krb5_conf_path);
 
 // Values include `none`, `glog`, `boost`, `glibc`, `libunwind`
 DECLARE_mString(get_stack_trace_tool);
+DECLARE_mBool(enable_address_sanitizers_with_stack_trace);
 
 // DISABLED: Don't resolve location info.
 // FAST: Perform CU lookup using .debug_aranges (might be incomplete).
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp 
b/be/src/olap/task/engine_alter_tablet_task.cpp
index f73a4f8a82d..8a8d27b7657 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -39,7 +39,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const 
TAlterTabletReqV2& request)
                              ->get_storage_engine()
                              
->memory_limitation_bytes_per_thread_for_schema_change();
     _mem_tracker = MemTrackerLimiter::create_shared(
-            MemTrackerLimiter::Type::SCHEMA_CHANGE,
+            MemTrackerLimiter::Type::OTHER,
             fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                         std::to_string(_alter_tablet_req.base_tablet_id),
                         std::to_string(_alter_tablet_req.new_tablet_id)),
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 6108e81bae3..9601cad88d1 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -81,7 +81,7 @@ EnginePublishVersionTask::EnginePublishVersionTask(
           _succ_tablets(succ_tablets),
           _discontinuous_version_tablets(discontinuous_version_tablets),
           _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {
-    _mem_tracker = 
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+    _mem_tracker = 
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
                                                     "TabletPublishTxnTask");
 }
 
@@ -370,7 +370,7 @@ 
TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
           _transaction_id(transaction_id),
           _version(version),
           _tablet_info(tablet_info),
-          
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+          
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
                                                         
"TabletPublishTxnTask")) {
     _stats.submit_time_us = MonotonicMicros();
 }
diff --git a/be/src/olap/task/engine_publish_version_task.h 
b/be/src/olap/task/engine_publish_version_task.h
index 2b8b5540b22..cf03c0537b7 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -119,7 +119,7 @@ public:
               _partition_id(partition_id),
               _transaction_id(transaction_id),
               _version(version),
-              
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+              
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
                                                             
"AsyncTabletPublishTask")) {
         _stats.submit_time_us = MonotonicMicros();
     }
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4b2478ccf99..eea44d1ba8d 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -185,6 +185,12 @@ public:
     std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
         return _segcompaction_mem_tracker;
     }
+    std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
+        return _point_query_executor_mem_tracker;
+    }
+    std::shared_ptr<MemTrackerLimiter> block_compression_mem_tracker() {
+        return _block_compression_mem_tracker;
+    }
     std::shared_ptr<MemTrackerLimiter> rowid_storage_reader_tracker() {
         return _rowid_storage_reader_tracker;
     }
@@ -363,6 +369,10 @@ private:
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
 
+    // Tracking memory may be shared between multiple queries.
+    std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _block_compression_mem_tracker;
+
     // TODO, looking forward to more accurate tracking.
     std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
     std::shared_ptr<MemTrackerLimiter> _subcolumns_tree_tracker;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index bbd6bbc9447..8fb7bc96c0c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -529,6 +529,10 @@ void ExecEnv::init_mem_tracker() {
             std::make_shared<MemTracker>("IOBufBlockMemory", 
_details_mem_tracker_set.get());
     _segcompaction_mem_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"SegCompaction");
+    _point_query_executor_mem_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"PointQueryExecutor");
+    _block_compression_mem_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"BlockCompression");
     _rowid_storage_reader_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"RowIdStorageReader");
     _subcolumns_tree_tracker =
diff --git a/be/src/runtime/fold_constant_executor.cpp 
b/be/src/runtime/fold_constant_executor.cpp
index 5ba384baf84..cf7695c86f2 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -160,7 +160,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& 
query_globals,
     fragment_params.params = params;
     fragment_params.protocol_version = PaloInternalServiceVersion::V1;
     _mem_tracker = MemTrackerLimiter::create_shared(
-            MemTrackerLimiter::Type::SCHEMA_CHANGE,
+            MemTrackerLimiter::Type::OTHER,
             fmt::format("FoldConstant:query_id={}", print_id(_query_id)));
     _runtime_state =
             RuntimeState::create_unique(fragment_params.params, query_options, 
query_globals,
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5617861a719..829440f339e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -289,6 +289,7 @@ std::string FragmentMgr::to_http_path(const std::string& 
file_name) {
 Status FragmentMgr::trigger_pipeline_context_report(
         const ReportStatusRequest req, 
std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
     return _async_report_thread_pool->submit_func([this, req, ctx]() {
+        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker);
         coordinator_callback(req);
         if (!req.done) {
             ctx->refresh_next_report_time();
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 494645d56b5..c1b5879dd11 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -39,6 +39,7 @@
 #include "util/perf_counters.h"
 #include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
+#include "util/stack_util.h"
 
 namespace doris {
 
@@ -110,7 +111,7 @@ std::shared_ptr<MemTrackerLimiter> 
MemTrackerLimiter::create_shared(MemTrackerLi
 MemTrackerLimiter::~MemTrackerLimiter() {
     consume(_untracked_mem);
     static std::string mem_tracker_inaccurate_msg =
-            ", mem tracker not equal to 0 when mem tracker destruct, this 
usually means that "
+            "mem tracker not equal to 0 when mem tracker destruct, this 
usually means that "
             "memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
             "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
             "1. For query and load, memory leaks may have occurred, it is 
expected that the query "
@@ -124,21 +125,90 @@ MemTrackerLimiter::~MemTrackerLimiter() {
             "4. If you need to "
             "transfer memory tracking value between two trackers, can use 
transfer_to.";
     if (_consumption->current_value() != 0) {
-        // TODO, expect mem tracker equal to 0 at the task end.
-        if (doris::config::enable_memory_orphan_check && _type == Type::QUERY) 
{
-            LOG(INFO) << "mem tracker label: " << _label
-                      << ", consumption: " << _consumption->current_value()
-                      << ", peak consumption: " << _consumption->peak_value()
-                      << mem_tracker_inaccurate_msg;
+        // TODO, expect mem tracker equal to 0 at the load/compaction/etc. 
task end.
+#ifndef NDEBUG
+        if (_type == Type::QUERY) {
+            std::string err_msg =
+                    fmt::format("mem tracker label: {}, consumption: {}, peak 
consumption: {}, {}.",
+                                label(), _consumption->current_value(), 
_consumption->peak_value(),
+                                mem_tracker_inaccurate_msg);
+            LOG(FATAL) << err_msg << print_address_sanitizers();
         }
+#endif
         if (ExecEnv::tracking_memory()) {
             
ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value());
         }
         _consumption->set(0);
+#ifndef NDEBUG
+    } else if (!_address_sanitizers.empty()) {
+        LOG(INFO) << "[Address Sanitizer] consumption is 0, but address 
sanitizers not empty. "
+                  << ", mem tracker label: " << _label
+                  << ", peak consumption: " << _consumption->peak_value()
+                  << print_address_sanitizers();
+#endif
     }
     memory_memtrackerlimiter_cnt << -1;
 }
 
+#ifndef NDEBUG
+void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
+    if (_type == Type::QUERY) {
+        std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
+        auto it = _address_sanitizers.find(buf);
+        if (it != _address_sanitizers.end()) {
+            LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem 
tracker label: " << _label
+                      << ", consumption: " << _consumption->current_value()
+                      << ", peak consumption: " << _consumption->peak_value() 
<< ", buf: " << buf
+                      << ", size: " << size << ", old buf: " << it->first
+                      << ", old size: " << it->second.size
+                      << ", new stack_trace: " << get_stack_trace(1, 
"DISABLED")
+                      << ", old stack_trace: " << it->second.stack_trace;
+        }
+
+        // if alignment not equal to 0, maybe usable_size > size.
+        AddressSanitizer as = {size, 
doris::config::enable_address_sanitizers_with_stack_trace
+                                             ? get_stack_trace(1, "DISABLED")
+                                             : ""};
+        _address_sanitizers.emplace(buf, as);
+    }
+}
+
+void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
+    if (_type == Type::QUERY) {
+        std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
+        auto it = _address_sanitizers.find(buf);
+        if (it != _address_sanitizers.end()) {
+            if (it->second.size != size) {
+                LOG(INFO) << "[Address Sanitizer] free memory buf size 
inaccurate, mem tracker "
+                             "label: "
+                          << _label << ", consumption: " << 
_consumption->current_value()
+                          << ", peak consumption: " << 
_consumption->peak_value()
+                          << ", buf: " << buf << ", size: " << size << ", old 
buf: " << it->first
+                          << ", old size: " << it->second.size
+                          << ", new stack_trace: " << get_stack_trace(1, 
"DISABLED")
+                          << ", old stack_trace: " << it->second.stack_trace;
+            }
+            _address_sanitizers.erase(buf);
+        } else {
+            LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem 
tracker label: " << _label
+                      << ", consumption: " << _consumption->current_value()
+                      << ", peak consumption: " << _consumption->peak_value() 
<< ", buf: " << buf
+                      << ", size: " << size << ", stack_trace: " << 
get_stack_trace(1, "DISABLED");
+        }
+    }
+}
+
+std::string MemTrackerLimiter::print_address_sanitizers() {
+    std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
+    std::string detail = "[Address Sanitizer]:";
+    for (const auto& it : _address_sanitizers) {
+        detail += fmt::format("\n    {}, size {}, strack trace: {}", it.first, 
it.second.size,
+                              it.second.stack_trace);
+    }
+    return detail;
+}
+#endif
+
 MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
     Snapshot snapshot;
     snapshot.type = type_string(_type);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 67c40e1f6c5..e5c5cb1bc03 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -205,6 +205,12 @@ public:
     // Log the memory usage when memory limit is exceeded.
     std::string tracker_limit_exceeded_str();
 
+#ifndef NDEBUG
+    void add_address_sanitizers(void* buf, size_t size);
+    void remove_address_sanitizers(void* buf, size_t size);
+    std::string print_address_sanitizers();
+#endif
+
     std::string debug_string() override {
         std::stringstream msg;
         msg << "limit: " << _limit << "; "
@@ -245,6 +251,16 @@ private:
     // Avoid frequent printing.
     bool _enable_print_log_usage = false;
     static std::atomic<bool> _enable_print_log_process_usage;
+
+#ifndef NDEBUG
+    struct AddressSanitizer {
+        size_t size;
+        std::string stack_trace;
+    };
+
+    std::mutex _address_sanitizers_mtx;
+    std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
+#endif
 };
 
 inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index d1aede848ab..3c0139a02d7 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -549,7 +549,7 @@ public:
 // must call create_thread_local_if_not_exits() before use thread_context().
 #define CONSUME_THREAD_MEM_TRACKER(size)                                       
                    \
     do {                                                                       
                    \
-        if (doris::use_mem_hook || size == 0) {                                
                    \
+        if (size == 0 || doris::use_mem_hook) {                                
                    \
             break;                                                             
                    \
         }                                                                      
                    \
         if (doris::pthread_context_ptr_init) {                                 
                    \
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 0f9fd47948e..96785c9fd1a 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -102,7 +102,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
     const auto& local_tablet_uid = local_tablet->tablet_uid();
 
     std::shared_ptr<MemTrackerLimiter> mem_tracker = 
MemTrackerLimiter::create_shared(
-            MemTrackerLimiter::Type::SCHEMA_CHANGE, 
fmt::format("IngestBinlog#TxnId={}", txn_id));
+            MemTrackerLimiter::Type::OTHER, 
fmt::format("IngestBinlog#TxnId={}", txn_id));
     SCOPED_ATTACH_TASK(mem_tracker);
 
     auto& request = arg->request;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 13faed18e61..b2b5841618d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -784,7 +784,7 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
         const TFileScanRangeParams& params = file_scan_range.params;
 
         std::shared_ptr<MemTrackerLimiter> mem_tracker = 
MemTrackerLimiter::create_shared(
-                MemTrackerLimiter::Type::SCHEMA_CHANGE,
+                MemTrackerLimiter::Type::OTHER,
                 fmt::format("{}#{}", params.format_type, params.file_type));
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
 
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index c89bc52115a..4e9295ed53d 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -37,6 +37,7 @@
 #include "olap/tablet_schema.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "util/key_util.h"
 #include "util/runtime_profile.h"
 #include "util/simd/bits.h"
@@ -166,7 +167,8 @@ void RowCache::erase(const RowCacheKey& key) {
 }
 
 PointQueryExecutor::~PointQueryExecutor() {
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
     _tablet.reset();
     _reusable.reset();
     _result_block.reset();
@@ -180,10 +182,7 @@ Status PointQueryExecutor::init(const 
PTabletKeyLookupRequest* request,
     // using cache
     __int128_t uuid =
             static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | 
request->uuid().uuid_low();
-    _mem_tracker = MemTrackerLimiter::create_shared(
-            MemTrackerLimiter::Type::QUERY,
-            fmt::format("PointQueryExecutor:{}#{}", uuid, 
request->tablet_id()));
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+    
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
     auto cache_handle = LookupConnectionCache::instance()->get(uuid);
     _binary_row_format = request->is_binary_row();
     _tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id());
@@ -234,7 +233,7 @@ Status PointQueryExecutor::init(const 
PTabletKeyLookupRequest* request,
 }
 
 Status PointQueryExecutor::lookup_up() {
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+    
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
     RETURN_IF_ERROR(_lookup_row_key());
     RETURN_IF_ERROR(_lookup_row_data());
     RETURN_IF_ERROR(_output_data());
diff --git a/be/src/service/point_query_executor.h 
b/be/src/service/point_query_executor.h
index 565a585d322..e168ef16ad7 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -323,7 +323,6 @@ private:
     std::vector<RowReadContext> _row_read_ctxs;
     std::shared_ptr<Reusable> _reusable;
     std::unique_ptr<vectorized::Block> _result_block;
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
     Metrics _profile_metrics;
     bool _binary_row_format = false;
 };
diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index 10f975451d3..e71a8901421 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -53,6 +53,7 @@
 #include "gutil/endian.h"
 #include "gutil/strings/substitute.h"
 #include "orc/OrcFile.hh"
+#include "runtime/thread_context.h"
 #include "util/bit_util.h"
 #include "util/defer_op.h"
 #include "util/faststring.h"
@@ -99,10 +100,16 @@ private:
         ENABLE_FACTORY_CREATOR(Context);
 
     public:
-        Context() : ctx(nullptr) { buffer = std::make_unique<faststring>(); }
+        Context() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         LZ4_stream_t* ctx;
         std::unique_ptr<faststring> buffer;
         ~Context() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 LZ4_freeStream(ctx);
             }
@@ -115,7 +122,11 @@ public:
         static Lz4BlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4BlockCompression() { _ctx_pool.clear(); }
+    ~Lz4BlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
+        _ctx_pool.clear();
+    }
 
     Status compress(const Slice& input, faststring* output) override {
         if (input.size > INT_MAX) {
@@ -144,7 +155,13 @@ public:
                 compressed_buf.size = max_len;
             } else {
                 // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-                context->buffer->resize(max_len);
+                {
+                    // context->buffer is reusable between queries, so it should be
+                    // accounted to the global tracker.
+                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                            
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                    context->buffer->resize(max_len);
+                }
                 compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
                 compressed_buf.size = max_len;
             }
@@ -253,10 +270,16 @@ private:
         ENABLE_FACTORY_CREATOR(CContext);
 
     public:
-        CContext() : ctx(nullptr) { buffer = std::make_unique<faststring>(); }
+        CContext() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         LZ4F_compressionContext_t ctx;
         std::unique_ptr<faststring> buffer;
         ~CContext() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 LZ4F_freeCompressionContext(ctx);
             }
@@ -282,6 +305,8 @@ public:
         return &s_instance;
     }
     ~Lz4fBlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         _ctx_c_pool.clear();
         _ctx_d_pool.clear();
     }
@@ -326,8 +351,12 @@ private:
                 compressed_buf.data = reinterpret_cast<char*>(output->data());
                 compressed_buf.size = max_len;
             } else {
-                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-                context->buffer->resize(max_len);
+                {
+                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                            
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                    // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                    context->buffer->resize(max_len);
+                }
                 compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
                 compressed_buf.size = max_len;
             }
@@ -482,10 +511,16 @@ private:
         ENABLE_FACTORY_CREATOR(Context);
 
     public:
-        Context() : ctx(nullptr) { buffer = std::make_unique<faststring>(); }
+        Context() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         LZ4_streamHC_t* ctx;
         std::unique_ptr<faststring> buffer;
         ~Context() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 LZ4_freeStreamHC(ctx);
             }
@@ -498,7 +533,11 @@ public:
         static Lz4HCBlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4HCBlockCompression() { _ctx_pool.clear(); }
+    ~Lz4HCBlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
+        _ctx_pool.clear();
+    }
 
     Status compress(const Slice& input, faststring* output) override {
         std::unique_ptr<Context> context;
@@ -519,7 +558,12 @@ public:
                 compressed_buf.data = reinterpret_cast<char*>(output->data());
                 compressed_buf.size = max_len;
             } else {
-                context->buffer->resize(max_len);
+                {
+                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                            
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                    // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                    context->buffer->resize(max_len);
+                }
                 compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
                 compressed_buf.size = max_len;
             }
@@ -781,10 +825,16 @@ private:
         ENABLE_FACTORY_CREATOR(CContext);
 
     public:
-        CContext() : ctx(nullptr) { buffer = std::make_unique<faststring>(); }
+        CContext() : ctx(nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
+            buffer = std::make_unique<faststring>();
+        }
         ZSTD_CCtx* ctx;
         std::unique_ptr<faststring> buffer;
         ~CContext() {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->block_compression_mem_tracker());
             if (ctx) {
                 ZSTD_freeCCtx(ctx);
             }
@@ -810,6 +860,8 @@ public:
         return &s_instance;
     }
     ~ZstdBlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         _ctx_c_pool.clear();
         _ctx_d_pool.clear();
     }
@@ -843,8 +895,12 @@ public:
                 compressed_buf.data = reinterpret_cast<char*>(output->data());
                 compressed_buf.size = max_len;
             } else {
-                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-                context->buffer->resize(max_len);
+                {
+                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                            
ExecEnv::GetInstance()->block_compression_mem_tracker());
+                    // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                    context->buffer->resize(max_len);
+                }
                 compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
                 compressed_buf.size = max_len;
             }
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 34d71d8df3e..480734dcade 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -210,6 +210,30 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::throw_b
     throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
 }
 
+#ifndef NDEBUG
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::add_address_sanitizers(
+        void* buf, size_t size) const {
+#ifdef BE_TEST
+    if (!doris::ExecEnv::ready()) {
+        return;
+    }
+#endif
+    doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, 
size);
+}
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::remove_address_sanitizers(
+        void* buf, size_t size) const {
+#ifdef BE_TEST
+    if (!doris::ExecEnv::ready()) {
+        return;
+    }
+#endif
+    
doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, 
size);
+}
+#endif
+
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void* Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::alloc(size_t size,
                                                                                
 size_t alignment) {
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 83cb6eddb7d..576ce86d928 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -231,6 +231,10 @@ public:
     void consume_memory(size_t size) const;
     void release_memory(size_t size) const;
     void throw_bad_alloc(const std::string& err) const;
+#ifndef NDEBUG
+    void add_address_sanitizers(void* buf, size_t size) const;
+    void remove_address_sanitizers(void* buf, size_t size) const;
+#endif
 
     void* alloc(size_t size, size_t alignment = 0);
     void* realloc(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0);
@@ -238,6 +242,7 @@ public:
     /// Allocate memory range.
     void* alloc_impl(size_t size, size_t alignment = 0) {
         memory_check(size);
+        // consume memory in tracker before alloc, similar to early 
declaration.
         consume_memory(size);
         void* buf;
         size_t record_size = size;
@@ -273,6 +278,9 @@ public:
                 if constexpr (MemoryAllocator::need_record_actual_size()) {
                     record_size = MemoryAllocator::allocated_size(buf);
                 }
+#ifndef NDEBUG
+                add_address_sanitizers(buf, size);
+#endif
             } else {
                 buf = nullptr;
                 int res = MemoryAllocator::posix_memalign(&buf, alignment, 
size);
@@ -282,6 +290,9 @@ public:
                     throw_bad_alloc(
                             fmt::format("Cannot allocate memory 
(posix_memalign) {}.", size));
                 }
+#ifndef NDEBUG
+                add_address_sanitizers(buf, size);
+#endif
 
                 if constexpr (clear_memory) memset(buf, 0, size);
 
@@ -303,6 +314,9 @@ public:
                 throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", 
size));
             }
         } else {
+#ifndef NDEBUG
+            remove_address_sanitizers(buf, size);
+#endif
             MemoryAllocator::free(buf);
         }
         release_memory(size);
@@ -326,6 +340,9 @@ public:
         if (!use_mmap ||
             (old_size < doris::config::mmap_threshold && new_size < 
doris::config::mmap_threshold &&
              alignment <= MALLOC_MIN_ALIGNMENT)) {
+#ifndef NDEBUG
+            remove_address_sanitizers(buf, old_size);
+#endif
             /// Resize malloc'd memory region with no special alignment 
requirement.
             void* new_buf = MemoryAllocator::realloc(buf, new_size);
             if (nullptr == new_buf) {
@@ -333,6 +350,10 @@ public:
                 throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} 
to {}.", old_size,
                                             new_size));
             }
+#ifndef NDEBUG
+            add_address_sanitizers(
+                    new_buf, new_size); // usually buf addr == new_buf addr; 
under ASan they may differ.
+#endif
 
             buf = new_buf;
             if constexpr (clear_memory)
@@ -362,6 +383,10 @@ public:
             // Big allocs that requires a copy.
             void* new_buf = alloc(new_size, alignment);
             memcpy(new_buf, buf, std::min(old_size, new_size));
+#ifndef NDEBUG
+            add_address_sanitizers(new_buf, new_size);
+            remove_address_sanitizers(buf, old_size);
+#endif
             free(buf, old_size);
             buf = new_buf;
         }
diff --git a/be/src/vec/common/pod_array_fwd.h 
b/be/src/vec/common/pod_array_fwd.h
index ff00b312575..e1a428eda9d 100644
--- a/be/src/vec/common/pod_array_fwd.h
+++ b/be/src/vec/common/pod_array_fwd.h
@@ -36,9 +36,12 @@ template <typename T, size_t initial_bytes = 4096, typename 
TAllocator = Allocat
           size_t pad_right_ = 0, size_t pad_left_ = 0>
 class PODArray;
 
-/** For columns. Padding is enough to read and write xmm-register at the 
address of the last element. */
+/** For columns. Padding is enough to read and write xmm-register at the 
address of the last element. 
+  * TODO: pad_right is temporarily changed from 15 to 16, which wastes 1 byte;
+  * it can be rolled back after fixing the wrong reinterpret_cast column and PODArray swap.
+  */
 template <typename T, size_t initial_bytes = 4096, typename TAllocator = 
Allocator<false>>
-using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 15, 16>;
+using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 16, 16>;
 
 /** A helper for declaring PODArray that uses inline memory.
   * The initial size is set to use all the inline bytes, since using less would
diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh
index 3076a968761..f4edd4117d0 100755
--- a/bin/run-fs-benchmark.sh
+++ b/bin/run-fs-benchmark.sh
@@ -189,7 +189,7 @@ else
 fi
 
 ## set asan and ubsan env to generate core file
-export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0
 export UBSAN_OPTIONS=print_stacktrace=1
 
 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc
diff --git a/bin/start_be.sh b/bin/start_be.sh
index d9800e1b047..396e75a5d48 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -267,7 +267,8 @@ fi
 export AWS_MAX_ATTEMPTS=2
 
 ## set asan and ubsan env to generate core file
-export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193
+export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0
 export UBSAN_OPTIONS=print_stacktrace=1
 
 ## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 346d5cd1ecb..f9fcf9e9d53 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -405,7 +405,8 @@ export 
ORC_EXAMPLE_DIR="${DORIS_HOME}/be/src/apache-orc/examples"
 
 # set asan and ubsan env to generate core file
 export DORIS_HOME="${DORIS_TEST_BINARY_DIR}/"
-export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+## detect_container_overflow=0, https://github.com/google/sanitizers/issues/193
+export 
ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0:check_malloc_usable_size=0
 export UBSAN_OPTIONS=print_stacktrace=1
 export JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log 
-Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 
-DJDBC_MAX_IDLE_TIME=300000"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to