This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 3b20518bd73 [fix](load) fix table sink memory usage counter (#47248) 3b20518bd73 is described below commit 3b20518bd7360888ca5eba4f9821e9c913978476 Author: TengJianPing <tengjianp...@selectdb.com> AuthorDate: Fri Jan 24 05:08:30 2025 +0800 [fix](load) fix table sink memory usage counter (#47248) --- be/src/olap/memtable_writer.cpp | 4 ++-- be/src/pipeline/exec/olap_table_sink_v2_operator.h | 2 +- be/src/pipeline/local_exchange/local_exchanger.h | 3 ++- be/src/vec/core/block.cpp | 17 ++++++++++---- be/src/vec/sink/writer/async_result_writer.cpp | 26 ++++++++++++++++++---- be/src/vec/sink/writer/async_result_writer.h | 10 +++++---- .../sink/writer/iceberg/viceberg_table_writer.cpp | 1 - .../sink/writer/iceberg/viceberg_table_writer.h | 1 - be/src/vec/sink/writer/vhive_table_writer.h | 1 - be/src/vec/sink/writer/vtablet_writer.h | 3 +-- be/src/vec/sink/writer/vtablet_writer_v2.h | 3 +-- 11 files changed, 48 insertions(+), 23 deletions(-) diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 6d63890eede..cecad2ceef4 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -158,8 +158,8 @@ Status MemTableWriter::flush_async() { } VLOG_NOTICE << "flush memtable to reduce mem consumption. 
memtable size: " - << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id - << ", load id: " << print_id(_req.load_id); + << PrettyPrinter::print_bytes(_mem_table->memory_usage()) + << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id); auto s = _flush_memtable_async(); _reset_mem_table(); return s; diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index da5986d09ee..aa030ccf28e 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -68,7 +68,7 @@ public: void set_low_memory_mode(RuntimeState* state) override { auto& local_state = get_local_state(state); - local_state._writer->clear_free_blocks(); + local_state._writer->set_low_memory_mode(); } private: diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 5684b418ff2..c1450cb7c69 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -24,7 +24,8 @@ namespace doris { #include "common/compile_check_begin.h" namespace vectorized { template <typename T> -void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks); +void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, + RuntimeProfile::Counter* memory_used_counter = nullptr); class PartitionerBase; } // namespace vectorized diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 8584e5df3cd..bed9f7279ed 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -61,15 +61,24 @@ enum CompressionTypePB : int; namespace doris::vectorized { template <typename T> -void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks) { +void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, + RuntimeProfile::Counter* memory_used_counter = nullptr) { T block; while (blocks.try_dequeue(block)) { - // do nothing + if (memory_used_counter) { + if constexpr 
(std::is_same_v<T, Block>) { + memory_used_counter->update(-block.allocated_bytes()); + } else { + memory_used_counter->update(-block->allocated_bytes()); + } + } } } -template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&); -template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&); +template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&, + RuntimeProfile::Counter* memory_used_counter); +template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&, + RuntimeProfile::Counter* memory_used_counter); Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} { initialize_index_by_name(); diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 28756cfe78f..5a6738191bf 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -58,6 +58,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) { _dependency->set_ready(); } if (rows) { + _memory_used_counter->update(add_block->allocated_bytes()); _data_queue.emplace_back(std::move(add_block)); if (!_data_queue_is_available() && !_is_finished()) { _dependency->block(); @@ -81,10 +82,18 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() { if (_data_queue_is_available()) { _dependency->set_ready(); } + _memory_used_counter->update(-block->allocated_bytes()); return block; } Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) { + // Attention!!! + // AsyncResultWriter::open is called asynchronously, + // so we need to set up the profile and memory counter here, + // or else the counter can be nullptr when AsyncResultWriter::sink is called. 
+ _profile = profile; + _memory_used_counter = _profile->get_counter("MemoryUsage"); + // Should set to false here, to DCHECK(_finish_dependency); _finish_dependency->block(); @@ -222,7 +231,12 @@ void AsyncResultWriter::force_close(Status s) { } void AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) { - _free_blocks.enqueue(std::move(b)); + if (!_low_memory_mode) { + auto allocated_bytes = b->allocated_bytes(); + if (_free_blocks.enqueue(std::move(b))) { + _memory_used_counter->update(allocated_bytes); + } + } } std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Block* block, @@ -230,15 +244,19 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo std::unique_ptr<Block> b; if (!_free_blocks.try_dequeue(b)) { b = block->create_same_struct_block(rows, true); + } else { + _memory_used_counter->update(-b->allocated_bytes()); } b->swap(*block); return b; } template <typename T> -void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks); -void AsyncResultWriter::clear_free_blocks() { - clear_blocks(_free_blocks); +void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, + RuntimeProfile::Counter* memory_used_counter = nullptr); +void AsyncResultWriter::set_low_memory_mode() { + _low_memory_mode = true; + clear_blocks(_free_blocks, _memory_used_counter); } } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index a1265655c58..dc6b90a6c27 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -22,13 +22,13 @@ #include <queue> // IWYU pragma: keep #include "runtime/result_writer.h" +#include "util/runtime_profile.h" #include "vec/exprs/vexpr_fwd.h" namespace doris { class ObjectPool; class RowDescriptor; class RuntimeState; -class RuntimeProfile; class TDataSink; class TExpr; @@ -72,22 +72,22 @@ public: Status get_writer_status() { return 
_writer_status.status(); } - void clear_free_blocks(); + void set_low_memory_mode(); protected: Status _projection_block(Block& input_block, Block* output_block); const VExprContextSPtrs& _vec_output_expr_ctxs; + RuntimeProfile* _profile = nullptr; // not owned, set when open std::unique_ptr<Block> _get_free_block(Block*, size_t rows); - void _return_free_block(std::unique_ptr<Block>); - private: void process_block(RuntimeState* state, RuntimeProfile* profile); [[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; } [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; } void _set_ready_to_finish(); + void _return_free_block(std::unique_ptr<Block>); std::unique_ptr<Block> _get_block_from_queue(); static constexpr auto QUEUE_SIZE = 3; @@ -97,11 +97,13 @@ private: // Default value is ok AtomicStatus _writer_status; bool _eos = false; + std::atomic_bool _low_memory_mode = false; std::shared_ptr<pipeline::Dependency> _dependency; std::shared_ptr<pipeline::Dependency> _finish_dependency; moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks; + RuntimeProfile::Counter* _memory_used_counter = nullptr; }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 608afced8d9..81d97593e87 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -47,7 +47,6 @@ Status VIcebergTableWriter::init_properties(ObjectPool* pool) { Status VIcebergTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { _state = state; - _profile = profile; // add all counter _written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT); diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h index e53c7020a68..b3389d94880 100644 --- 
a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h @@ -116,7 +116,6 @@ private: // Currently it is a copy, maybe it is better to use move semantics to eliminate it. TDataSink _t_sink; RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::shared_ptr<doris::iceberg::Schema> _schema; std::unique_ptr<doris::iceberg::PartitionSpec> _partition_spec; diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index 9361fdbc408..1a2cfd3e8d2 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -71,7 +71,6 @@ private: // Currently it is a copy, maybe it is better to use move semantics to eliminate it. TDataSink _t_sink; RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::vector<int> _partition_columns_input_index; std::set<size_t> _non_write_columns_indices; std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers; diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 52aa0f6b918..62528a57114 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -683,8 +683,7 @@ private: VOlapTablePartitionParam* _vpartition = nullptr; - RuntimeState* _state = nullptr; // not owned, set when open - RuntimeProfile* _profile = nullptr; // not owned, set when open + RuntimeState* _state = nullptr; // not owned, set when open VRowDistribution _row_distribution; // reuse to avoid frequent memory allocation and release. 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index b6b6b623698..788af3d1213 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -217,8 +217,7 @@ private: VOlapTablePartitionParam* _vpartition = nullptr; - RuntimeState* _state = nullptr; // not owned, set when open - RuntimeProfile* _profile = nullptr; // not owned, set when open + RuntimeState* _state = nullptr; // not owned, set when open std::unordered_set<int64_t> _opened_partitions; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org