This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_repartition in repository https://gitbox.apache.org/repos/asf/doris.git
commit a631dac0897eb40bb911d05130554cbde4fb72d7 Author: yiguolei <[email protected]> AuthorDate: Wed Mar 4 15:48:40 2026 +0800 update dir and file meta realtime --- be/src/vec/spill/spill_file.cpp | 15 +++++++---- be/src/vec/spill/spill_file.h | 12 +++++++-- be/src/vec/spill/spill_file_writer.cpp | 49 +++++++++++++++++++++++----------- be/src/vec/spill/spill_file_writer.h | 6 ++--- 4 files changed, 56 insertions(+), 26 deletions(-) diff --git a/be/src/vec/spill/spill_file.cpp b/be/src/vec/spill/spill_file.cpp index 81d8d322478..ff0e16ec4a1 100644 --- a/be/src/vec/spill/spill_file.cpp +++ b/be/src/vec/spill/spill_file.cpp @@ -68,8 +68,7 @@ Status SpillFile::create_writer(RuntimeState* state, RuntimeProfile* profile, SpillFileWriterSPtr& writer) { writer = std::make_shared<SpillFileWriter>(shared_from_this(), state, profile, _data_dir, _spill_dir); - // record active writer - _active_writer = writer.get(); + // _active_writer is set in SpillFileWriter constructor via the shared_ptr return Status::OK(); } @@ -79,12 +78,18 @@ SpillFileReaderSPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile return std::make_shared<SpillFileReader>(state, profile, _spill_dir, _part_count); } -void SpillFile::finish_writing(int64_t total_written_bytes, size_t part_count) { - _total_written_bytes = total_written_bytes; - _part_count = part_count; +void SpillFile::finish_writing() { _ready_for_reading = true; // writer finished; clear active writer pointer _active_writer = nullptr; } +void SpillFile::update_written_bytes(int64_t delta_bytes) { + _total_written_bytes += delta_bytes; +} + +void SpillFile::increment_part_count() { + ++_part_count; +} + } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h index 362b522f10c..c2fa0151f8c 100644 --- a/be/src/vec/spill/spill_file.h +++ b/be/src/vec/spill/spill_file.h @@ -90,8 +90,16 @@ private: friend class SpillFileWriter; friend class SpillFileManager; - /// Called by SpillFileWriter::close() to finalize metadata. - void finish_writing(int64_t total_written_bytes, size_t part_count); + /// Called by SpillFileWriter::close() to mark writing as complete. + void finish_writing(); + + /// Called by SpillFileWriter to incrementally track bytes written to disk. + /// This ensures SpillFile always knows the correct _total_written_bytes for + /// gc() accounting, even if the writer's close() is never properly called. + void update_written_bytes(int64_t delta_bytes); + + /// Called by SpillFileWriter when a part file is completed. + void increment_part_count(); SpillDataDir* _data_dir = nullptr; // Absolute path: data_dir->get_spill_data_path() + "/" + relative_path diff --git a/be/src/vec/spill/spill_file_writer.cpp b/be/src/vec/spill/spill_file_writer.cpp index d0ef50dbe60..3d3839621dd 100644 --- a/be/src/vec/spill/spill_file_writer.cpp +++ b/be/src/vec/spill/spill_file_writer.cpp @@ -46,10 +46,7 @@ SpillFileWriter::SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, R _memory_used_counter = common_profile->get_counter("MemoryUsage"); // Register this writer as the active writer for the SpillFile. - auto spill_file_locked = _spill_file_wptr.lock(); - if (spill_file_locked) { - spill_file_locked->_active_writer = this; - } + spill_file->_active_writer = this; // Custom (spill-specific) counters RuntimeProfile* custom_profile = profile->get_child("CustomCounters"); @@ -85,7 +82,7 @@ Status SpillFileWriter::_open_next_part() { return Status::OK(); } -Status SpillFileWriter::_close_current_part() { +Status SpillFileWriter::_close_current_part(const std::shared_ptr<SpillFile>& spill_file) { if (!_file_writer) { return Status::OK(); } @@ -111,6 +108,11 @@ Status SpillFileWriter::_close_current_part() { } _data_dir->update_spill_data_usage(meta_size); ExecEnv::GetInstance()->spill_file_mgr()->update_spill_write_bytes(meta_size); + // Incrementally update SpillFile's accounting so gc() can always + // decrement the correct amount, even if close() is never called. + if (spill_file) { + spill_file->update_written_bytes(meta_size); + } RETURN_IF_ERROR(_file_writer->close()); _file_writer.reset(); @@ -118,6 +120,9 @@ Status SpillFileWriter::_close_current_part() { // Advance to next part ++_current_part_index; ++_total_parts; + if (spill_file) { + spill_file->increment_part_count(); + } _part_written_blocks = 0; _part_written_bytes = 0; _part_max_sub_block_size = 0; @@ -126,15 +131,26 @@ Status SpillFileWriter::_close_current_part() { return Status::OK(); } -Status SpillFileWriter::_rotate_if_needed() { +Status SpillFileWriter::_rotate_if_needed(const std::shared_ptr<SpillFile>& spill_file) { if (_file_writer && _part_written_bytes >= _max_part_size) { - RETURN_IF_ERROR(_close_current_part()); + RETURN_IF_ERROR(_close_current_part(spill_file)); } return Status::OK(); } Status SpillFileWriter::write_block(RuntimeState* state, const Block& block) { DCHECK(!_closed); + + // Lock the SpillFile to ensure it is still alive. If it has already been + // destroyed (gc'd), we must not write any more data because the disk + // accounting would be out of sync. + auto spill_file = _spill_file_wptr.lock(); + if (!spill_file) { + return Status::Error<INTERNAL_ERROR>( + "SpillFile has been destroyed, cannot write more data, spill_dir={}", + _spill_dir); + } + // Lazily open the first part if (!_file_writer) { RETURN_IF_ERROR(_open_next_part()); @@ -148,10 +164,10 @@ Status SpillFileWriter::write_block(RuntimeState* state, const Block& block) { COUNTER_UPDATE(_write_rows_counter, rows); COUNTER_UPDATE(_write_block_bytes_counter, block.bytes()); - RETURN_IF_ERROR(_write_internal(block)); + RETURN_IF_ERROR(_write_internal(block, spill_file)); // Auto-rotate if current part is full - return _rotate_if_needed(); + return _rotate_if_needed(spill_file); } Status SpillFileWriter::close() { @@ -164,25 +180,23 @@ Status SpillFileWriter::close() { return Status::Error<INTERNAL_ERROR>("fault_inject spill_file spill_eof failed"); }); - RETURN_IF_ERROR(_close_current_part()); - - // Use weak_ptr lock to safely access SpillFile during close. - // If the SpillFile has already been destroyed, we skip the callback. auto spill_file = _spill_file_wptr.lock(); + RETURN_IF_ERROR(_close_current_part(spill_file)); + if (spill_file) { if (spill_file->_active_writer != this) { return Status::Error<INTERNAL_ERROR>( "SpillFileWriter close() called but not registered as active writer, possible " "double close or logic error"); } - spill_file->finish_writing(_total_written_bytes, _total_parts); - spill_file->_active_writer = nullptr; + spill_file->finish_writing(); } return Status::OK(); } -Status SpillFileWriter::_write_internal(const Block& block) { +Status SpillFileWriter::_write_internal(const Block& block, + const std::shared_ptr<SpillFile>& spill_file) { size_t uncompressed_bytes = 0, compressed_bytes = 0; Status status; @@ -241,6 +255,9 @@ Status SpillFileWriter::_write_internal(const Block& block) { _part_written_bytes += buff_size; _total_written_bytes += buff_size; ++_part_written_blocks; + // Incrementally update SpillFile so gc() can always + // decrement the correct amount from _data_dir. + spill_file->update_written_bytes(buff_size); } }}; { diff --git a/be/src/vec/spill/spill_file_writer.h b/be/src/vec/spill/spill_file_writer.h index 8492d11ba13..e0e88bcd8cc 100644 --- a/be/src/vec/spill/spill_file_writer.h +++ b/be/src/vec/spill/spill_file_writer.h @@ -66,13 +66,13 @@ private: Status _open_next_part(); /// Close the current part: write footer, close FileWriter, update stats. - Status _close_current_part(); + Status _close_current_part(const std::shared_ptr<SpillFile>& spill_file); /// If current part size >= _max_part_size, close it. - Status _rotate_if_needed(); + Status _rotate_if_needed(const std::shared_ptr<SpillFile>& spill_file); /// Serialize and write a single block to the current part. - Status _write_internal(const Block& block); + Status _write_internal(const Block& block, const std::shared_ptr<SpillFile>& spill_file); // ── Back-reference ── std::weak_ptr<SpillFile> _spill_file_wptr; // weak ref; use lock() in close() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
