This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 3b20518bd73 [fix](load) fix table sink memory usage counter (#47248)
3b20518bd73 is described below

commit 3b20518bd7360888ca5eba4f9821e9c913978476
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Fri Jan 24 05:08:30 2025 +0800

    [fix](load) fix table sink memory usage counter (#47248)
---
 be/src/olap/memtable_writer.cpp                    |  4 ++--
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  2 +-
 be/src/pipeline/local_exchange/local_exchanger.h   |  3 ++-
 be/src/vec/core/block.cpp                          | 17 ++++++++++----
 be/src/vec/sink/writer/async_result_writer.cpp     | 26 ++++++++++++++++++----
 be/src/vec/sink/writer/async_result_writer.h       | 10 +++++----
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |  1 -
 .../sink/writer/iceberg/viceberg_table_writer.h    |  1 -
 be/src/vec/sink/writer/vhive_table_writer.h        |  1 -
 be/src/vec/sink/writer/vtablet_writer.h            |  3 +--
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  3 +--
 11 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 6d63890eede..cecad2ceef4 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -158,8 +158,8 @@ Status MemTableWriter::flush_async() {
     }
 
     VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
-                << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
-                << ", load id: " << print_id(_req.load_id);
+                << PrettyPrinter::print_bytes(_mem_table->memory_usage())
+                << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
     auto s = _flush_memtable_async();
     _reset_mem_table();
     return s;
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index da5986d09ee..aa030ccf28e 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -68,7 +68,7 @@ public:
 
     void set_low_memory_mode(RuntimeState* state) override {
         auto& local_state = get_local_state(state);
-        local_state._writer->clear_free_blocks();
+        local_state._writer->set_low_memory_mode();
     }
 
 private:
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 5684b418ff2..c1450cb7c69 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -24,7 +24,8 @@ namespace doris {
 #include "common/compile_check_begin.h"
 namespace vectorized {
 template <typename T>
-void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks);
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
+                  RuntimeProfile::Counter* memory_used_counter = nullptr);
 
 class PartitionerBase;
 } // namespace vectorized
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8584e5df3cd..bed9f7279ed 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -61,15 +61,24 @@ enum CompressionTypePB : int;
 
 namespace doris::vectorized {
 template <typename T>
-void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks) {
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
+                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
     T block;
     while (blocks.try_dequeue(block)) {
-        // do nothing
+        if (memory_used_counter) {
+            if constexpr (std::is_same_v<T, Block>) {
+                memory_used_counter->update(-block.allocated_bytes());
+            } else {
+                memory_used_counter->update(-block->allocated_bytes());
+            }
+        }
     }
 }
 
-template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&);
-template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&);
+template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
+                                  RuntimeProfile::Counter* memory_used_counter);
+template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
+                                      RuntimeProfile::Counter* memory_used_counter);
 
 Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {
     initialize_index_by_name();
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 28756cfe78f..5a6738191bf 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -58,6 +58,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
         _dependency->set_ready();
     }
     if (rows) {
+        _memory_used_counter->update(add_block->allocated_bytes());
         _data_queue.emplace_back(std::move(add_block));
         if (!_data_queue_is_available() && !_is_finished()) {
             _dependency->block();
@@ -81,10 +82,18 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
     if (_data_queue_is_available()) {
         _dependency->set_ready();
     }
+    _memory_used_counter->update(-block->allocated_bytes());
     return block;
 }
 
 Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) {
+    // Attention!!!
+    // AsyncResultWriter::open is called asynchronously,
+    // so we need to setupt the profile and memory counter here,
+    // or else the counter can be nullptr when AsyncResultWriter::sink is called.
+    _profile = profile;
+    _memory_used_counter = _profile->get_counter("MemoryUsage");
+
     // Should set to false here, to
     DCHECK(_finish_dependency);
     _finish_dependency->block();
@@ -222,7 +231,12 @@ void AsyncResultWriter::force_close(Status s) {
 }
 
 void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
-    _free_blocks.enqueue(std::move(b));
+    if (!_low_memory_mode) {
+        auto allocated_bytes = b->allocated_bytes();
+        if (_free_blocks.enqueue(std::move(b))) {
+            _memory_used_counter->update(allocated_bytes);
+        }
+    }
 }
 
 std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Block* block,
@@ -230,15 +244,19 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
     std::unique_ptr<Block> b;
     if (!_free_blocks.try_dequeue(b)) {
         b = block->create_same_struct_block(rows, true);
+    } else {
+        _memory_used_counter->update(-b->allocated_bytes());
     }
     b->swap(*block);
     return b;
 }
 
 template <typename T>
-void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks);
-void AsyncResultWriter::clear_free_blocks() {
-    clear_blocks(_free_blocks);
+void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
+                  RuntimeProfile::Counter* memory_used_counter = nullptr);
+void AsyncResultWriter::set_low_memory_mode() {
+    _low_memory_mode = true;
+    clear_blocks(_free_blocks, _memory_used_counter);
 }
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index a1265655c58..dc6b90a6c27 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -22,13 +22,13 @@
 #include <queue> // IWYU pragma: keep
 
 #include "runtime/result_writer.h"
+#include "util/runtime_profile.h"
 #include "vec/exprs/vexpr_fwd.h"
 
 namespace doris {
 class ObjectPool;
 class RowDescriptor;
 class RuntimeState;
-class RuntimeProfile;
 class TDataSink;
 class TExpr;
 
@@ -72,22 +72,22 @@ public:
 
     Status get_writer_status() { return _writer_status.status(); }
 
-    void clear_free_blocks();
+    void set_low_memory_mode();
 
 protected:
     Status _projection_block(Block& input_block, Block* output_block);
     const VExprContextSPtrs& _vec_output_expr_ctxs;
+    RuntimeProfile* _profile = nullptr; // not owned, set when open
 
     std::unique_ptr<Block> _get_free_block(Block*, size_t rows);
 
-    void _return_free_block(std::unique_ptr<Block>);
-
 private:
     void process_block(RuntimeState* state, RuntimeProfile* profile);
     [[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
     [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }
     void _set_ready_to_finish();
 
+    void _return_free_block(std::unique_ptr<Block>);
     std::unique_ptr<Block> _get_block_from_queue();
 
     static constexpr auto QUEUE_SIZE = 3;
@@ -97,11 +97,13 @@ private:
     // Default value is ok
     AtomicStatus _writer_status;
     bool _eos = false;
+    std::atomic_bool _low_memory_mode = false;
 
     std::shared_ptr<pipeline::Dependency> _dependency;
     std::shared_ptr<pipeline::Dependency> _finish_dependency;
 
     moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
+    RuntimeProfile::Counter* _memory_used_counter = nullptr;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 608afced8d9..81d97593e87 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -47,7 +47,6 @@ Status VIcebergTableWriter::init_properties(ObjectPool* pool) {
 
 Status VIcebergTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     _state = state;
-    _profile = profile;
 
     // add all counter
     _written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index e53c7020a68..b3389d94880 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -116,7 +116,6 @@ private:
     // Currently it is a copy, maybe it is better to use move semantics to eliminate it.
     TDataSink _t_sink;
     RuntimeState* _state = nullptr;
-    RuntimeProfile* _profile = nullptr;
 
     std::shared_ptr<doris::iceberg::Schema> _schema;
     std::unique_ptr<doris::iceberg::PartitionSpec> _partition_spec;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 9361fdbc408..1a2cfd3e8d2 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -71,7 +71,6 @@ private:
     // Currently it is a copy, maybe it is better to use move semantics to eliminate it.
     TDataSink _t_sink;
     RuntimeState* _state = nullptr;
-    RuntimeProfile* _profile = nullptr;
     std::vector<int> _partition_columns_input_index;
     std::set<size_t> _non_write_columns_indices;
     std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 52aa0f6b918..62528a57114 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -683,8 +683,7 @@ private:
 
     VOlapTablePartitionParam* _vpartition = nullptr;
 
-    RuntimeState* _state = nullptr;     // not owned, set when open
-    RuntimeProfile* _profile = nullptr; // not owned, set when open
+    RuntimeState* _state = nullptr; // not owned, set when open
 
     VRowDistribution _row_distribution;
     // reuse to avoid frequent memory allocation and release.
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index b6b6b623698..788af3d1213 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -217,8 +217,7 @@ private:
 
     VOlapTablePartitionParam* _vpartition = nullptr;
 
-    RuntimeState* _state = nullptr;     // not owned, set when open
-    RuntimeProfile* _profile = nullptr; // not owned, set when open
+    RuntimeState* _state = nullptr; // not owned, set when open
 
     std::unordered_set<int64_t> _opened_partitions;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to