This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a631dac0897eb40bb911d05130554cbde4fb72d7
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 4 15:48:40 2026 +0800

    Update spill dir and file metadata in real time
---
 be/src/vec/spill/spill_file.cpp        | 15 +++++++----
 be/src/vec/spill/spill_file.h          | 12 +++++++--
 be/src/vec/spill/spill_file_writer.cpp | 49 +++++++++++++++++++++++-----------
 be/src/vec/spill/spill_file_writer.h   |  6 ++---
 4 files changed, 56 insertions(+), 26 deletions(-)

diff --git a/be/src/vec/spill/spill_file.cpp b/be/src/vec/spill/spill_file.cpp
index 81d8d322478..ff0e16ec4a1 100644
--- a/be/src/vec/spill/spill_file.cpp
+++ b/be/src/vec/spill/spill_file.cpp
@@ -68,8 +68,7 @@ Status SpillFile::create_writer(RuntimeState* state, RuntimeProfile* profile,
                                 SpillFileWriterSPtr& writer) {
     writer = std::make_shared<SpillFileWriter>(shared_from_this(), state, profile, _data_dir,
                                                _spill_dir);
-    // record active writer
-    _active_writer = writer.get();
+    // _active_writer is set in SpillFileWriter constructor via the shared_ptr
     return Status::OK();
 }
 
@@ -79,12 +78,18 @@ SpillFileReaderSPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile
     return std::make_shared<SpillFileReader>(state, profile, _spill_dir, _part_count);
 }
 
-void SpillFile::finish_writing(int64_t total_written_bytes, size_t part_count) {
-    _total_written_bytes = total_written_bytes;
-    _part_count = part_count;
+void SpillFile::finish_writing() {
     _ready_for_reading = true;
     // writer finished; clear active writer pointer
     _active_writer = nullptr;
 }
 
+void SpillFile::update_written_bytes(int64_t delta_bytes) {
+    _total_written_bytes += delta_bytes;
+}
+
+void SpillFile::increment_part_count() {
+    ++_part_count;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h
index 362b522f10c..c2fa0151f8c 100644
--- a/be/src/vec/spill/spill_file.h
+++ b/be/src/vec/spill/spill_file.h
@@ -90,8 +90,16 @@ private:
     friend class SpillFileWriter;
     friend class SpillFileManager;
 
-    /// Called by SpillFileWriter::close() to finalize metadata.
-    void finish_writing(int64_t total_written_bytes, size_t part_count);
+    /// Called by SpillFileWriter::close() to mark writing as complete.
+    void finish_writing();
+
+    /// Called by SpillFileWriter to incrementally track bytes written to disk.
+    /// This ensures SpillFile always knows the correct _total_written_bytes for
+    /// gc() accounting, even if the writer's close() is never properly called.
+    void update_written_bytes(int64_t delta_bytes);
+
+    /// Called by SpillFileWriter when a part file is completed.
+    void increment_part_count();
 
     SpillDataDir* _data_dir = nullptr;
     // Absolute path: data_dir->get_spill_data_path() + "/" + relative_path
diff --git a/be/src/vec/spill/spill_file_writer.cpp b/be/src/vec/spill/spill_file_writer.cpp
index d0ef50dbe60..3d3839621dd 100644
--- a/be/src/vec/spill/spill_file_writer.cpp
+++ b/be/src/vec/spill/spill_file_writer.cpp
@@ -46,10 +46,7 @@ SpillFileWriter::SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, R
     _memory_used_counter = common_profile->get_counter("MemoryUsage");
 
     // Register this writer as the active writer for the SpillFile.
-    auto spill_file_locked = _spill_file_wptr.lock();
-    if (spill_file_locked) {
-        spill_file_locked->_active_writer = this;
-    }
+    spill_file->_active_writer = this;
 
     // Custom (spill-specific) counters
     RuntimeProfile* custom_profile = profile->get_child("CustomCounters");
@@ -85,7 +82,7 @@ Status SpillFileWriter::_open_next_part() {
     return Status::OK();
 }
 
-Status SpillFileWriter::_close_current_part() {
+Status SpillFileWriter::_close_current_part(const std::shared_ptr<SpillFile>& spill_file) {
     if (!_file_writer) {
         return Status::OK();
     }
@@ -111,6 +108,11 @@ Status SpillFileWriter::_close_current_part() {
     }
     _data_dir->update_spill_data_usage(meta_size);
     ExecEnv::GetInstance()->spill_file_mgr()->update_spill_write_bytes(meta_size);
+    // Incrementally update SpillFile's accounting so gc() can always
+    // decrement the correct amount, even if close() is never called.
+    if (spill_file) {
+        spill_file->update_written_bytes(meta_size);
+    }
 
     RETURN_IF_ERROR(_file_writer->close());
     _file_writer.reset();
@@ -118,6 +120,9 @@ Status SpillFileWriter::_close_current_part() {
     // Advance to next part
     ++_current_part_index;
     ++_total_parts;
+    if (spill_file) {
+        spill_file->increment_part_count();
+    }
     _part_written_blocks = 0;
     _part_written_bytes = 0;
     _part_max_sub_block_size = 0;
@@ -126,15 +131,26 @@ Status SpillFileWriter::_close_current_part() {
     return Status::OK();
 }
 
-Status SpillFileWriter::_rotate_if_needed() {
+Status SpillFileWriter::_rotate_if_needed(const std::shared_ptr<SpillFile>& spill_file) {
     if (_file_writer && _part_written_bytes >= _max_part_size) {
-        RETURN_IF_ERROR(_close_current_part());
+        RETURN_IF_ERROR(_close_current_part(spill_file));
     }
     return Status::OK();
 }
 
 Status SpillFileWriter::write_block(RuntimeState* state, const Block& block) {
     DCHECK(!_closed);
+
+    // Lock the SpillFile to ensure it is still alive. If it has already been
+    // destroyed (gc'd), we must not write any more data because the disk
+    // accounting would be out of sync.
+    auto spill_file = _spill_file_wptr.lock();
+    if (!spill_file) {
+        return Status::Error<INTERNAL_ERROR>(
+                "SpillFile has been destroyed, cannot write more data, spill_dir={}",
+                _spill_dir);
+    }
+
     // Lazily open the first part
     if (!_file_writer) {
         RETURN_IF_ERROR(_open_next_part());
@@ -148,10 +164,10 @@ Status SpillFileWriter::write_block(RuntimeState* state, const Block& block) {
     COUNTER_UPDATE(_write_rows_counter, rows);
     COUNTER_UPDATE(_write_block_bytes_counter, block.bytes());
 
-    RETURN_IF_ERROR(_write_internal(block));
+    RETURN_IF_ERROR(_write_internal(block, spill_file));
 
     // Auto-rotate if current part is full
-    return _rotate_if_needed();
+    return _rotate_if_needed(spill_file);
 }
 
 Status SpillFileWriter::close() {
@@ -164,25 +180,23 @@ Status SpillFileWriter::close() {
         return Status::Error<INTERNAL_ERROR>("fault_inject spill_file spill_eof failed");
     });
 
-    RETURN_IF_ERROR(_close_current_part());
-
-    // Use weak_ptr lock to safely access SpillFile during close.
-    // If the SpillFile has already been destroyed, we skip the callback.
     auto spill_file = _spill_file_wptr.lock();
+    RETURN_IF_ERROR(_close_current_part(spill_file));
+
     if (spill_file) {
         if (spill_file->_active_writer != this) {
             return Status::Error<INTERNAL_ERROR>(
                     "SpillFileWriter close() called but not registered as active writer, possible "
                     "double close or logic error");
         }
-        spill_file->finish_writing(_total_written_bytes, _total_parts);
-        spill_file->_active_writer = nullptr;
+        spill_file->finish_writing();
     }
 
     return Status::OK();
 }
 
-Status SpillFileWriter::_write_internal(const Block& block) {
+Status SpillFileWriter::_write_internal(const Block& block,
+                                        const std::shared_ptr<SpillFile>& spill_file) {
     size_t uncompressed_bytes = 0, compressed_bytes = 0;
 
     Status status;
@@ -241,6 +255,9 @@ Status SpillFileWriter::_write_internal(const Block& block) {
                     _part_written_bytes += buff_size;
                     _total_written_bytes += buff_size;
                     ++_part_written_blocks;
+                    // Incrementally update SpillFile so gc() can always
+                    // decrement the correct amount from _data_dir.
+                    spill_file->update_written_bytes(buff_size);
                 }
             }};
             {
diff --git a/be/src/vec/spill/spill_file_writer.h b/be/src/vec/spill/spill_file_writer.h
index 8492d11ba13..e0e88bcd8cc 100644
--- a/be/src/vec/spill/spill_file_writer.h
+++ b/be/src/vec/spill/spill_file_writer.h
@@ -66,13 +66,13 @@ private:
     Status _open_next_part();
 
     /// Close the current part: write footer, close FileWriter, update stats.
-    Status _close_current_part();
+    Status _close_current_part(const std::shared_ptr<SpillFile>& spill_file);
 
     /// If current part size >= _max_part_size, close it.
-    Status _rotate_if_needed();
+    Status _rotate_if_needed(const std::shared_ptr<SpillFile>& spill_file);
 
     /// Serialize and write a single block to the current part.
-    Status _write_internal(const Block& block);
+    Status _write_internal(const Block& block, const std::shared_ptr<SpillFile>& spill_file);
 
     // ── Back-reference ──
     std::weak_ptr<SpillFile> _spill_file_wptr; // weak ref; use lock() in close()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to