This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 096afe663d7 branch-4.1: [improvement](iceberg) Improve 
VIcebergSortWriter code quality (#60978) (#61468)
096afe663d7 is described below

commit 096afe663d781748138bbe207de8034da88a577a
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Mar 18 10:59:18 2026 -0700

    branch-4.1: [improvement](iceberg) Improve VIcebergSortWriter code quality 
(#60978) (#61468)
    
    bp #60978
---
 be/src/pipeline/exec/operator.h                    |   6 +-
 .../exec/spill_iceberg_table_sink_operator.cpp     |  14 +-
 .../sink/writer/iceberg/viceberg_sort_writer.cpp   | 374 +++++++++++++++++++++
 .../vec/sink/writer/iceberg/viceberg_sort_writer.h | 342 +++++--------------
 .../sink/writer/iceberg/viceberg_table_writer.h    |   8 +-
 be/src/vec/spill/spill_reader.h                    |   2 +-
 .../exec/multi_cast_data_streamer_test.cpp         |   7 +-
 .../operator/spillable_operator_test_helper.cpp    |   2 +-
 8 files changed, 488 insertions(+), 267 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a003ac1316d..080b8647a36 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -418,8 +418,8 @@ public:
 
         _spill_read_file_time =
                 ADD_TIMER_WITH_LEVEL(Base::custom_profile(), 
"SpillReadFileTime", 1);
-        _spill_read_derialize_block_timer =
-                ADD_TIMER_WITH_LEVEL(Base::custom_profile(), 
"SpillReadDerializeBlockTime", 1);
+        _spill_read_deserialize_block_timer =
+                ADD_TIMER_WITH_LEVEL(Base::custom_profile(), 
"SpillReadDeserializeBlockTime", 1);
 
         _spill_read_block_count = 
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(),
                                                          
"SpillReadBlockCount", TUnit::UNIT, 1);
@@ -494,7 +494,7 @@ public:
     RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
 
     RuntimeProfile::Counter* _spill_read_file_time = nullptr;
-    RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
+    RuntimeProfile::Counter* _spill_read_deserialize_block_timer = nullptr;
     RuntimeProfile::Counter* _spill_read_block_count = nullptr;
     // Total bytes of read data in Block format(in memory format)
     RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
diff --git a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp 
b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
index d8f577af648..46240c20ffe 100644
--- a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
@@ -54,12 +54,12 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const {
 }
 
 size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
-    if (!_writer || !_writer->_current_writer) {
+    if (!_writer || !_writer->current_writer()) {
         return 0;
     }
 
     auto* sort_writer =
-            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->current_writer().get());
     if (!sort_writer || !sort_writer->sorter()) {
         return 0;
     }
@@ -68,12 +68,12 @@ size_t 
SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* state
 }
 
 size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* 
state) const {
-    if (!_writer || !_writer->_current_writer) {
+    if (!_writer || !_writer->current_writer()) {
         return 0;
     }
 
     auto* sort_writer =
-            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->current_writer().get());
     if (!sort_writer || !sort_writer->sorter()) {
         return 0;
     }
@@ -83,7 +83,7 @@ size_t 
SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
 
 Status SpillIcebergTableSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
-    if (!_writer || !_writer->_current_writer) {
+    if (!_writer || !_writer->current_writer()) {
         if (spill_context) {
             spill_context->on_task_finished();
         }
@@ -91,7 +91,7 @@ Status SpillIcebergTableSinkLocalState::revoke_memory(
     }
 
     auto* sort_writer =
-            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->current_writer().get());
 
     if (!sort_writer || !sort_writer->sorter()) {
         if (spill_context) {
@@ -182,7 +182,7 @@ void 
SpillIcebergTableSinkLocalState::_init_spill_counters() {
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1);
     ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1);
     ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1);
-    ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillReadDeserializeBlockTime", 1);
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockBytes", TUnit::BYTES, 1);
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileBytes", TUnit::BYTES, 1);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp
new file mode 100644
index 00000000000..f3dfda3f766
--- /dev/null
+++ b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -0,0 +1,374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
+                                const RowDescriptor* row_desc) {
+    // row_desc is required for initializing sort expressions
+    DCHECK(row_desc != nullptr);
+    _runtime_state = state;
+    _profile = profile;
+    _row_desc = row_desc;
+
+    // Initialize sort expressions from sort_info (contains ordering columns, 
asc/desc, nulls first/last)
+    RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+    // Create FullSorter for in-memory sorting with spill support enabled.
+    // Parameters: limit=-1 (no limit), offset=0 (no offset)
+    _sorter = vectorized::FullSorter::create_unique(_vsort_exec_exprs, -1, 0, 
&_pool,
+                                                    _sort_info.is_asc_order, 
_sort_info.nulls_first,
+                                                    *row_desc, state, 
_profile);
+    _sorter->init_profile(_profile);
+    // Enable spill support so the sorter can be used with the spill framework
+    _sorter->set_enable_spill();
+    _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", 
TUnit::UNIT);
+
+    // Open the underlying partition writer that handles actual file I/O
+    RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::write(vectorized::Block& block) {
+    // Append incoming block data to the sorter's internal buffer
+    RETURN_IF_ERROR(_sorter->append_block(&block));
+    _update_spill_block_batch_row_count(block);
+
+    // When accumulated data size reaches the target file size threshold,
+    // sort the data in memory and flush it directly to a Parquet/ORC file.
+    // This avoids holding too much data in memory before writing.
+    if (_sorter->data_size() >= _target_file_size_bytes) {
+        return _flush_to_file();
+    }
+
+    // If data size is below threshold, wait for more data.
+    // Note: trigger_spill() may be called externally by the memory management
+    // system if memory pressure is high.
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::close(const Status& status) {
+    // Track the actual internal status of operations performed during close.
+    // This is important because if intermediate operations (like do_sort()) 
fail,
+    // we need to propagate the actual error status to the underlying 
partition writer's
+    // close() call, rather than the original status parameter which could be 
OK.
+    Status internal_status = Status::OK();
+    // Track the close status of the underlying partition writer.
+    // If _iceberg_partition_writer->close() fails (e.g., Parquet file flush 
error),
+    // we must propagate this error to the caller to avoid silent data loss.
+    Status close_status = Status::OK();
+
+    // Defer ensures the underlying partition writer is always closed and
+    // spill streams are cleaned up, regardless of whether intermediate 
operations succeed.
+    // Uses internal_status to propagate any errors that occurred during close 
operations.
+    Defer defer {[&]() {
+        // If any intermediate operation failed, pass that error to the 
partition writer;
+        // otherwise, pass the original status from the caller.
+        close_status =
+                _iceberg_partition_writer->close(internal_status.ok() ? status 
: internal_status);
+        if (!close_status.ok()) {
+            LOG(WARNING) << fmt::format("_iceberg_partition_writer close 
failed, reason: {}",
+                                        close_status.to_string());
+        }
+        _cleanup_spill_streams();
+    }};
+
+    // If the original status is already an error or the query is cancelled,
+    // skip all close operations and propagate the original error
+    if (!status.ok() || _runtime_state->is_cancelled()) {
+        return status;
+    }
+
+    // If sorter was never initialized (e.g., no data was written), nothing to 
do
+    if (_sorter == nullptr) {
+        return Status::OK();
+    }
+
+    // Check if there is any remaining data in the sorter (either unsorted or 
already sorted blocks)
+    if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+        !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+        if (_sorted_streams.empty()) {
+            // No spill has occurred, all data is in memory.
+            // Sort the remaining data, prepare for reading, and write to file.
+            internal_status = _sorter->do_sort();
+            if (!internal_status.ok()) {
+                return internal_status;
+            }
+            internal_status = _sorter->prepare_for_read(false);
+            if (!internal_status.ok()) {
+                return internal_status;
+            }
+            internal_status = _write_sorted_data();
+            return internal_status;
+        }
+
+        // Some data has already been spilled to disk.
+        // Spill the remaining in-memory data to a new spill stream.
+        internal_status = _do_spill();
+        if (!internal_status.ok()) {
+            return internal_status;
+        }
+    }
+
+    // Merge all spilled streams using multi-way merge sort and output final 
sorted data to files
+    if (!_sorted_streams.empty()) {
+        internal_status = _combine_files_output();
+        if (!internal_status.ok()) {
+            return internal_status;
+        }
+    }
+
+    // Return close_status if internal operations succeeded but the underlying
+    // partition writer's close() failed (e.g., file flush error).
+    // This prevents silent data loss where the caller thinks the write 
succeeded
+    // but the file was not properly closed.
+    return close_status;
+}
+
+void VIcebergSortWriter::_update_spill_block_batch_row_count(const 
vectorized::Block& block) {
+    auto rows = block.rows();
+    // Calculate average row size from the first non-empty block to determine
+    // the optimal batch size for spill operations
+    if (rows > 0 && 0 == _avg_row_bytes) {
+        _avg_row_bytes = std::max(1UL, block.bytes() / rows);
+        int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); 
// default 8MB
+        // Calculate how many rows fit in one spill batch (ceiling division)
+        _spill_block_batch_row_count = (spill_batch_bytes + _avg_row_bytes - 
1) / _avg_row_bytes;
+    }
+}
+
+Status VIcebergSortWriter::_flush_to_file() {
+    // Sort the accumulated data in memory
+    RETURN_IF_ERROR(_sorter->do_sort());
+    // Prepare the sorted data for sequential reading (builds merge tree if 
needed)
+    RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+    // Write the sorted data to the current Parquet/ORC file
+    RETURN_IF_ERROR(_write_sorted_data());
+    // Close the current file (it has reached the target size) and open a new 
writer
+    RETURN_IF_ERROR(_close_current_writer_and_open_next());
+    // Reset the sorter state to accept new data for the next file
+    _sorter->reset();
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_write_sorted_data() {
+    // Read sorted blocks from the sorter one by one and write them
+    // to the underlying partition writer (Parquet/ORC file)
+    bool eos = false;
+    Block block;
+    while (!eos && !_runtime_state->is_cancelled()) {
+        RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
+        RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
+        block.clear_column_data();
+    }
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_close_current_writer_and_open_next() {
+    // Save the current file name and index before closing, so the next file
+    // can use an incremented index (e.g., file_0, file_1, file_2, ...)
+    std::string current_file_name = _iceberg_partition_writer->file_name();
+    int current_file_index = _iceberg_partition_writer->file_name_index();
+    RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+
+    // Use the lambda to create a new partition writer with the next file index
+    _iceberg_partition_writer = _create_writer_lambda(&current_file_name, 
current_file_index + 1);
+    if (!_iceberg_partition_writer) {
+        return Status::InternalError("Failed to create new partition writer");
+    }
+
+    RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile, 
_row_desc));
+    return Status::OK();
+}
+
+int32_t VIcebergSortWriter::_get_spill_batch_size() const {
+    // Clamp the batch row count to int32_t max to prevent overflow
+    if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) {
+        return std::numeric_limits<int32_t>::max();
+    }
+    return static_cast<int32_t>(_spill_block_batch_row_count);
+}
+
+Status VIcebergSortWriter::_do_spill() {
+    COUNTER_UPDATE(_do_spill_count_counter, 1);
+
+    // Explicitly sort the data before preparing for spill read.
+    // Although FullSorter::prepare_for_read(is_spill=true) internally calls 
do_sort()
+    // when there is unsorted data (see sorter.cpp), we call do_sort() 
explicitly here
+    // for clarity and to guarantee that the data written to the spill stream 
is sorted.
+    // This ensures correctness of the subsequent multi-way merge phase.
+    RETURN_IF_ERROR(_sorter->do_sort());
+
+    // prepare_for_read(is_spill=true) adjusts limit/offset for spill mode
+    // and builds the merge tree for reading sorted data
+    RETURN_IF_ERROR(_sorter->prepare_for_read(true));
+    int32_t batch_size = _get_spill_batch_size();
+
+    // Register a new spill stream to store the sorted data on disk
+    SpillStreamSPtr spilling_stream;
+    
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+            _runtime_state, spilling_stream, 
print_id(_runtime_state->query_id()), "iceberg-sort",
+            1 /* node_id */, batch_size, 
_runtime_state->spill_sort_batch_bytes(), _profile));
+    _sorted_streams.emplace_back(spilling_stream);
+
+    // Read sorted data from the sorter in batches and write to the spill 
stream
+    bool eos = false;
+    Block block;
+    while (!eos && !_runtime_state->is_cancelled()) {
+        // Use _get_spill_batch_size() for safe narrowing conversion from 
size_t to int32_t
+        // instead of C-style cast, which includes bounds checking
+        RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(_runtime_state, 
&block,
+                                                           
_get_spill_batch_size(), &eos));
+        RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block, 
eos));
+        block.clear_column_data();
+    }
+    // Reset the sorter to free memory and accept new data
+    _sorter->reset();
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_combine_files_output() {
+    // If there are too many spill streams to merge at once (limited by 
memory),
+    // perform intermediate merges to reduce the number of streams
+    while (_sorted_streams.size() > 
static_cast<size_t>(_calc_max_merge_streams())) {
+        RETURN_IF_ERROR(_do_intermediate_merge());
+    }
+
+    // Create the final merger that combines all remaining spill streams
+    RETURN_IF_ERROR(_create_final_merger());
+
+    // Read merged sorted data and write to Parquet/ORC files,
+    // splitting into new files when the target file size is exceeded
+    bool eos = false;
+    Block output_block;
+    size_t current_file_bytes = _iceberg_partition_writer->written_len();
+    while (!eos && !_runtime_state->is_cancelled()) {
+        RETURN_IF_ERROR(_merger->get_next(&output_block, &eos));
+        if (output_block.rows() > 0) {
+            size_t block_bytes = output_block.bytes();
+            RETURN_IF_ERROR(_iceberg_partition_writer->write(output_block));
+            current_file_bytes += block_bytes;
+            // If the current file exceeds the target size, close it and open 
a new one
+            if (current_file_bytes > _target_file_size_bytes) {
+                RETURN_IF_ERROR(_close_current_writer_and_open_next());
+                current_file_bytes = 0;
+            }
+        }
+        output_block.clear_column_data();
+    }
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_do_intermediate_merge() {
+    int max_stream_count = _calc_max_merge_streams();
+    // Merge a subset of streams (non-final merge) to reduce total stream count
+    RETURN_IF_ERROR(_create_merger(false, _spill_block_batch_row_count, 
max_stream_count));
+
+    // Register a new spill stream for the merged output
+    int32_t batch_size = _get_spill_batch_size();
+    SpillStreamSPtr tmp_stream;
+    
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+            _runtime_state, tmp_stream, print_id(_runtime_state->query_id()), 
"iceberg-sort-merge",
+            1 /* node_id */, batch_size, 
_runtime_state->spill_sort_batch_bytes(), _profile));
+
+    _sorted_streams.emplace_back(tmp_stream);
+
+    // Merge the selected streams and write the result to the new spill stream
+    bool eos = false;
+    Block merge_sorted_block;
+    while (!eos && !_runtime_state->is_cancelled()) {
+        merge_sorted_block.clear_column_data();
+        RETURN_IF_ERROR(_merger->get_next(&merge_sorted_block, &eos));
+        RETURN_IF_ERROR(tmp_stream->spill_block(_runtime_state, 
merge_sorted_block, eos));
+    }
+
+    // Clean up the streams that were consumed during this intermediate merge
+    for (auto& stream : _current_merging_streams) {
+        
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+    }
+    _current_merging_streams.clear();
+    return Status::OK();
+}
+
+int VIcebergSortWriter::_calc_max_merge_streams() const {
+    // Calculate the maximum number of streams that can be merged 
simultaneously
+    // based on the available memory limit and per-stream batch size
+    auto count = _runtime_state->spill_sort_mem_limit() / 
_runtime_state->spill_sort_batch_bytes();
+    if (count > std::numeric_limits<int>::max()) {
+        return std::numeric_limits<int>::max();
+    }
+    // Ensure at least 2 streams can be merged (minimum for a merge operation)
+    return std::max(2, static_cast<int>(count));
+}
+
+Status VIcebergSortWriter::_create_merger(bool is_final_merge, size_t 
batch_size, int num_streams) {
+    // Create a multi-way merge sorter that reads from multiple sorted spill 
streams
+    std::vector<vectorized::BlockSupplier> child_block_suppliers;
+    _merger = 
std::make_unique<vectorized::VSortedRunMerger>(_sorter->get_sort_description(),
+                                                             batch_size, -1, 
0, _profile);
+    _current_merging_streams.clear();
+
+    // For final merge: merge all remaining streams
+    // For intermediate merge: merge only num_streams streams
+    size_t streams_to_merge = is_final_merge ? _sorted_streams.size() : 
num_streams;
+
+    for (size_t i = 0; i < streams_to_merge && !_sorted_streams.empty(); ++i) {
+        auto stream = _sorted_streams.front();
+        stream->set_read_counters(_profile);
+        _current_merging_streams.emplace_back(stream);
+        // Create a block supplier lambda that reads the next block from the 
spill stream
+        child_block_suppliers.emplace_back([stream](vectorized::Block* block, 
bool* eos) {
+            return stream->read_next_block_sync(block, eos);
+        });
+        _sorted_streams.pop_front();
+    }
+
+    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_create_final_merger() {
+    // Final merger uses the runtime batch size and merges all remaining 
streams
+    return _create_merger(true, _runtime_state->batch_size(), 1);
+}
+
+void VIcebergSortWriter::_cleanup_spill_streams() {
+    // Clean up all remaining spill streams to release disk resources
+    for (auto& stream : _sorted_streams) {
+        
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+    }
+    _sorted_streams.clear();
+
+    // Also clean up any streams that are currently being merged
+    for (auto& stream : _current_merging_streams) {
+        
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+    }
+    _current_merging_streams.clear();
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
index f9b91021f2b..3df0f5a7d28 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -22,28 +22,59 @@
 #include <utility>
 #include <vector>
 
-#include "common/config.h"
 #include "common/object_pool.h"
-#include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/common/sort/sorter.h"
 #include "vec/core/block.h"
-#include "vec/exprs/vslot_ref.h"
 #include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
 #include "vec/sink/writer/iceberg/vpartition_writer_base.h"
-#include "vec/spill/spill_stream.h"
-#include "vec/spill/spill_stream_manager.h"
+
+// Forward declarations to minimize header dependencies.
+// Previously, spill_stream.h and spill_stream_manager.h were included directly
+// in this header, causing heavy transitive includes for all files that include
+// viceberg_sort_writer.h. Moving implementations to .cpp allows us to 
forward-declare
+// these types and only include their headers in the .cpp file.
+namespace doris::vectorized {
+class SpillStream;
+using SpillStreamSPtr = std::shared_ptr<SpillStream>;
+} // namespace doris::vectorized
 
 namespace doris {
 class RuntimeState;
 class RuntimeProfile;
 
 namespace vectorized {
+
+/**
+ * VIcebergSortWriter is a decorator around VIcebergPartitionWriter that adds 
sort-order support.
+ *
+ * Architecture:
+ *   IPartitionWriterBase (abstract base class)
+ *       ├── VIcebergPartitionWriter  (writes data directly to Parquet/ORC 
files)
+ *       └── VIcebergSortWriter       (sorts data before delegating to 
VIcebergPartitionWriter)
+ *
+ * Key behaviors:
+ * 1. In-memory sorting: Accumulates data in a FullSorter. When accumulated 
data reaches
+ *    _target_file_size_bytes, sorts and flushes to a file, then opens a new 
writer.
+ * 2. Spill to disk: When triggered by the memory management system via 
trigger_spill(),
+ *    sorts and writes data to a SpillStream on disk.
+ * 3. Multi-way merge: When closing, merges all spilled streams using a 
VSortedRunMerger
+ *    to produce final sorted output files.
+ */
 class VIcebergSortWriter : public IPartitionWriterBase {
 public:
+    // Lambda type for creating new VIcebergPartitionWriter instances.
+    // Used when a file is completed and a new file needs to be opened.
     using CreateWriterLambda = 
std::function<std::shared_ptr<VIcebergPartitionWriter>(
             const std::string* file_name, int file_name_index)>;
 
+    /**
+     * Constructor.
+     * @param partition_writer The underlying writer that handles actual file 
I/O
+     * @param sort_info Sort specification (columns, asc/desc, nulls 
first/last)
+     * @param target_file_size_bytes Target file size before splitting to a 
new file
+     * @param create_writer_lambda Lambda for creating new writers when file 
splitting occurs
+     */
     VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> 
partition_writer,
                        TSortInfo sort_info, int64_t target_file_size_bytes,
                        CreateWriterLambda create_writer_lambda = nullptr)
@@ -52,77 +83,17 @@ public:
               _create_writer_lambda(std::move(create_writer_lambda)),
               _target_file_size_bytes(target_file_size_bytes) {}
 
+    // Initialize sort expressions, create FullSorter, and open the underlying 
writer
     Status open(RuntimeState* state, RuntimeProfile* profile,
-                const RowDescriptor* row_desc) override {
-        DCHECK(row_desc != nullptr);
-        _runtime_state = state;
-        _profile = profile;
-        _row_desc = row_desc;
-
-        RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
-        RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, 
*row_desc));
-        RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
-
-        _sorter = vectorized::FullSorter::create_unique(
-                _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, 
_sort_info.nulls_first,
-                *row_desc, state, _profile);
-        _sorter->init_profile(_profile);
-        _sorter->set_enable_spill();
-        _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", 
TUnit::UNIT);
-        RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, 
row_desc));
-        return Status::OK();
-    }
+                const RowDescriptor* row_desc) override;
 
-    Status write(vectorized::Block& block) override {
-        RETURN_IF_ERROR(_sorter->append_block(&block));
-        _update_spill_block_batch_row_count(block);
-        // sort in memory and write directly to Parquet file
-        if (_sorter->data_size() >= _target_file_size_bytes) {
-            return _flush_to_file();
-        }
-        // trigger_spill() will be called by memory management system
-        return Status::OK();
-    }
+    // Append data block to the sorter; triggers flush when target file size 
is reached
+    Status write(vectorized::Block& block) override;
 
-    Status close(const Status& status) override {
-        Defer defer {[&]() {
-            Status st = _iceberg_partition_writer->close(status);
-            if (!st.ok()) {
-                LOG(WARNING) << fmt::format("_iceberg_partition_writer close 
failed, reason: {}",
-                                            st.to_string());
-            }
-            _cleanup_spill_streams();
-        }};
-
-        if (!status.ok() || _runtime_state->is_cancelled()) {
-            return status;
-        }
-
-        if (_sorter == nullptr) {
-            return Status::OK();
-        }
-
-        if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
-            !_sorter->merge_sort_state()->get_sorted_block().empty()) {
-            if (_sorted_streams.empty()) {
-                // data remaining in memory
-                RETURN_IF_ERROR(_sorter->do_sort());
-                RETURN_IF_ERROR(_sorter->prepare_for_read(false));
-                RETURN_IF_ERROR(_write_sorted_data());
-                return Status::OK();
-            }
-
-            // spill remaining data
-            RETURN_IF_ERROR(_do_spill());
-        }
-
-        // Merge all spilled streams and output final sorted data
-        if (!_sorted_streams.empty()) {
-            RETURN_IF_ERROR(_combine_files_output());
-        }
-
-        return Status::OK();
-    }
+    // Sort remaining data, perform multi-way merge if spill occurred, and 
close the writer.
+    // Error handling: Tracks internal errors from intermediate operations and 
propagates
+    // the actual error status (not the original caller status) to the 
underlying writer.
+    Status close(const Status& status) override;
 
     inline const std::string& file_name() const override {
         return _iceberg_partition_writer->file_name();
@@ -134,197 +105,57 @@ public:
 
     inline size_t written_len() const override { return 
_iceberg_partition_writer->written_len(); }
 
+    // Returns a raw pointer to the FullSorter, used by 
SpillIcebergTableSinkLocalState
+    // to query memory usage (data_size, get_reserve_mem_size)
     auto sorter() const { return _sorter.get(); }
 
+    // Called by the memory management system to trigger spilling data to disk
     Status trigger_spill() { return _do_spill(); }
 
 private:
-    // how many rows need in spill block batch
-    void _update_spill_block_batch_row_count(const vectorized::Block& block) {
-        auto rows = block.rows();
-        if (rows > 0 && 0 == _avg_row_bytes) {
-            _avg_row_bytes = std::max(1UL, block.bytes() / rows);
-            int64_t spill_batch_bytes = 
_runtime_state->spill_sort_batch_bytes(); // default 8MB
-            _spill_block_batch_row_count =
-                    (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
-        }
-    }
+    // Calculate average row size from the first non-empty block to determine
+    // the optimal batch row count for spill operations
+    void _update_spill_block_batch_row_count(const vectorized::Block& block);
 
-    // have enought data, flush in-memory sorted data to file
-    Status _flush_to_file() {
-        RETURN_IF_ERROR(_sorter->do_sort());
-        RETURN_IF_ERROR(_sorter->prepare_for_read(false));
-        RETURN_IF_ERROR(_write_sorted_data());
-        RETURN_IF_ERROR(_close_current_writer_and_open_next());
-        _sorter->reset();
-        return Status::OK();
-    }
+    // Sort in-memory data and flush to a Parquet/ORC file, then open a new 
writer
+    Status _flush_to_file();
 
-    // write data into file
-    Status _write_sorted_data() {
-        bool eos = false;
-        Block block;
-        while (!eos && !_runtime_state->is_cancelled()) {
-            RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
-            RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
-            block.clear_column_data();
-        }
-        return Status::OK();
-    }
+    // Read sorted data from the sorter and write to the underlying partition 
writer
+    Status _write_sorted_data();
 
-    // close current writer and open a new one with incremented file index
-    Status _close_current_writer_and_open_next() {
-        std::string current_file_name = _iceberg_partition_writer->file_name();
-        int current_file_index = _iceberg_partition_writer->file_name_index();
-        RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+    // Close the current partition writer and create a new one with an 
incremented file index
+    Status _close_current_writer_and_open_next();
 
-        _iceberg_partition_writer =
-                _create_writer_lambda(&current_file_name, current_file_index + 
1);
-        if (!_iceberg_partition_writer) {
-            return Status::InternalError("Failed to create new partition 
writer");
-        }
+    // Get the batch size for spill operations, clamped to int32_t max
+    int32_t _get_spill_batch_size() const;
 
-        RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, 
_profile, _row_desc));
-        return Status::OK();
-    }
+    // Sort the current in-memory data and write it to a new spill stream on 
disk.
+    // Explicitly calls do_sort() before prepare_for_read() to guarantee 
sorted output.
+    Status _do_spill();
 
-    // batch size max is int32_t max
-    int32_t _get_spill_batch_size() const {
-        if (_spill_block_batch_row_count > 
std::numeric_limits<int32_t>::max()) {
-            return std::numeric_limits<int32_t>::max();
-        }
-        return static_cast<int32_t>(_spill_block_batch_row_count);
-    }
+    // Merge all spilled streams and output final sorted data to Parquet/ORC 
files.
+    // Handles file splitting when output exceeds target file size.
+    Status _combine_files_output();
 
-    Status _do_spill() {
-        COUNTER_UPDATE(_do_spill_count_counter, 1);
-        RETURN_IF_ERROR(_sorter->prepare_for_read(true));
-        int32_t batch_size = _get_spill_batch_size();
-
-        SpillStreamSPtr spilling_stream;
-        
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                _runtime_state, spilling_stream, 
print_id(_runtime_state->query_id()),
-                "iceberg-sort", 1 /* node_id */, batch_size,
-                _runtime_state->spill_sort_batch_bytes(), _profile));
-        _sorted_streams.emplace_back(spilling_stream);
-
-        // spill sorted data to stream
-        bool eos = false;
-        Block block;
-        while (!eos && !_runtime_state->is_cancelled()) {
-            RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(
-                    _runtime_state, &block, (int)_spill_block_batch_row_count, 
&eos));
-            RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, 
block, eos));
-            block.clear_column_data();
-        }
-        _sorter->reset();
-        return Status::OK();
-    }
+    // Perform an intermediate merge when there are too many spill streams to 
merge at once.
+    // Merges a subset of streams into a single new stream.
+    Status _do_intermediate_merge();
 
-    // merge spilled streams and output sorted data to Parquet files
-    Status _combine_files_output() {
-        // merge until all streams can be merged in one pass
-        while (_sorted_streams.size() > 
static_cast<size_t>(_calc_max_merge_streams())) {
-            RETURN_IF_ERROR(_do_intermediate_merge());
-        }
-        RETURN_IF_ERROR(_create_final_merger());
-
-        bool eos = false;
-        Block output_block;
-        size_t current_file_bytes = _iceberg_partition_writer->written_len();
-        while (!eos && !_runtime_state->is_cancelled()) {
-            RETURN_IF_ERROR(_merger->get_next(&output_block, &eos));
-            if (output_block.rows() > 0) {
-                size_t block_bytes = output_block.bytes();
-                
RETURN_IF_ERROR(_iceberg_partition_writer->write(output_block));
-                current_file_bytes += block_bytes;
-                if (current_file_bytes > _target_file_size_bytes) {
-                    // close current writer and commit to file
-                    RETURN_IF_ERROR(_close_current_writer_and_open_next());
-                    current_file_bytes = 0;
-                }
-            }
-            output_block.clear_column_data();
-        }
-        return Status::OK();
-    }
+    // Calculate the maximum number of streams that can be merged 
simultaneously
+    // based on memory limits
+    int _calc_max_merge_streams() const;
 
-    Status _do_intermediate_merge() {
-        int max_stream_count = _calc_max_merge_streams();
-        RETURN_IF_ERROR(_create_merger(false, _spill_block_batch_row_count, 
max_stream_count));
-
-        // register new spill stream for merged output
-        int32_t batch_size = _get_spill_batch_size();
-        SpillStreamSPtr tmp_stream;
-        
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                _runtime_state, tmp_stream, 
print_id(_runtime_state->query_id()),
-                "iceberg-sort-merge", 1 /* node_id */, batch_size,
-                _runtime_state->spill_sort_batch_bytes(), _profile));
-
-        _sorted_streams.emplace_back(tmp_stream);
-
-        // merge current streams and write to new spill stream
-        bool eos = false;
-        Block merge_sorted_block;
-        while (!eos && !_runtime_state->is_cancelled()) {
-            merge_sorted_block.clear_column_data();
-            RETURN_IF_ERROR(_merger->get_next(&merge_sorted_block, &eos));
-            RETURN_IF_ERROR(tmp_stream->spill_block(_runtime_state, 
merge_sorted_block, eos));
-        }
-
-        // clean up merged streams
-        for (auto& stream : _current_merging_streams) {
-            
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-        }
-        _current_merging_streams.clear();
-        return Status::OK();
-    }
+    // Create a VSortedRunMerger for merging spill streams
+    // @param is_final_merge If true, merges all remaining streams
+    // @param batch_size Number of rows per batch during merge
+    // @param num_streams Maximum number of streams to merge (used for 
intermediate merges)
+    Status _create_merger(bool is_final_merge, size_t batch_size, int 
num_streams);
 
-    int _calc_max_merge_streams() const {
-        auto count =
-                _runtime_state->spill_sort_mem_limit() / 
_runtime_state->spill_sort_batch_bytes();
-        if (count > std::numeric_limits<int>::max()) {
-            return std::numeric_limits<int>::max();
-        }
-        return std::max(2, static_cast<int>(count));
-    }
+    // Create the final merger that merges all remaining spill streams
+    Status _create_final_merger();
 
-    // create merger for merging spill streams
-    Status _create_merger(bool is_final_merge, size_t batch_size, int 
num_streams) {
-        std::vector<vectorized::BlockSupplier> child_block_suppliers;
-        _merger = 
std::make_unique<vectorized::VSortedRunMerger>(_sorter->get_sort_description(),
-                                                                 batch_size, 
-1, 0, _profile);
-        _current_merging_streams.clear();
-        size_t streams_to_merge = is_final_merge ? _sorted_streams.size() : 
num_streams;
-
-        for (size_t i = 0; i < streams_to_merge && !_sorted_streams.empty(); 
++i) {
-            auto stream = _sorted_streams.front();
-            stream->set_read_counters(_profile);
-            _current_merging_streams.emplace_back(stream);
-            child_block_suppliers.emplace_back([stream](vectorized::Block* 
block, bool* eos) {
-                return stream->read_next_block_sync(block, eos);
-            });
-            _sorted_streams.pop_front();
-        }
-
-        RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
-        return Status::OK();
-    }
-
-    Status _create_final_merger() { return _create_merger(true, 
_runtime_state->batch_size(), 1); }
-
-    // clean up all spill streams to ensure proper resource cleanup
-    void _cleanup_spill_streams() {
-        for (auto& stream : _sorted_streams) {
-            
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-        }
-        _sorted_streams.clear();
-
-        for (auto& stream : _current_merging_streams) {
-            
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-        }
-        _current_merging_streams.clear();
-    }
+    // Release all spill stream resources (both pending and currently merging)
+    void _cleanup_spill_streams();
 
     RuntimeState* _runtime_state = nullptr;
     RuntimeProfile* _profile = nullptr;
@@ -332,19 +163,28 @@ private:
     ObjectPool _pool;
     TSortInfo _sort_info;
     VSortExecExprs _vsort_exec_exprs;
+    // The underlying partition writer that handles actual Parquet/ORC file I/O
     std::shared_ptr<VIcebergPartitionWriter> _iceberg_partition_writer;
-    CreateWriterLambda _create_writer_lambda; // creating new writers after 
commit
+    // Lambda for creating new writers when file splitting occurs
+    CreateWriterLambda _create_writer_lambda;
 
-    // Sorter and merger
+    // Sorter and merger for handling in-memory sorting and multi-way merge
     std::unique_ptr<vectorized::FullSorter> _sorter;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
+    // Queue of spill streams waiting to be merged (FIFO order)
     std::deque<vectorized::SpillStreamSPtr> _sorted_streams;
+    // Streams currently being consumed by the merger
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
 
-    int64_t _target_file_size_bytes = 0; //config::iceberg_sink_max_file_size 
default 1GB
+    // Target file size in bytes; files are split when this threshold is 
exceeded
+    // Default: config::iceberg_sink_max_file_size (1GB)
+    int64_t _target_file_size_bytes = 0;
+    // Average row size in bytes, computed from the first non-empty block
     size_t _avg_row_bytes = 0;
+    // Number of rows per spill batch, computed from average row size and 
spill_sort_batch_bytes
     size_t _spill_block_batch_row_count = 4096;
 
+    // Counter tracking how many times spill has been triggered
     RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
 };
 
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 842f962713f..0ef79bf9d75 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -60,9 +60,15 @@ public:
 
     Status close(Status) override;
 
-    std::shared_ptr<IPartitionWriterBase> _current_writer;
+    // Getter for the current partition writer.
+    // Used by SpillIcebergTableSinkLocalState to access the current writer for
+    // memory management operations (get_reserve_mem_size, revocable_mem_size, 
etc.).
+    const std::shared_ptr<IPartitionWriterBase>& current_writer() const { 
return _current_writer; }
 
 private:
+    // The currently active partition writer (may be VIcebergPartitionWriter 
or VIcebergSortWriter).
+    // Updated during write() to track which writer received the most recent 
data.
+    std::shared_ptr<IPartitionWriterBase> _current_writer;
     class IcebergPartitionColumn {
     public:
         IcebergPartitionColumn(const iceberg::PartitionField& field,
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index 8f379cf78af..1999b8a9d88 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -61,7 +61,7 @@ public:
         RuntimeProfile* custom_profile = 
operator_profile->get_child("CustomCounters");
         DCHECK(custom_profile != nullptr);
         _read_file_timer = custom_profile->get_counter("SpillReadFileTime");
-        _deserialize_timer = 
custom_profile->get_counter("SpillReadDerializeBlockTime");
+        _deserialize_timer = 
custom_profile->get_counter("SpillReadDeserializeBlockTime");
         _read_block_count = custom_profile->get_counter("SpillReadBlockCount");
         _read_block_data_size = 
custom_profile->get_counter("SpillReadBlockBytes");
         _read_file_size = custom_profile->get_counter("SpillReadFileBytes");
diff --git a/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp 
b/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
index e3bd5353d5b..72170bc7fe0 100644
--- a/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
+++ b/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
@@ -56,8 +56,8 @@ public:
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillWriteFileBytes", TUnit::BYTES, 1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteRows", 
TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileTime", 
TUnit::UNIT, 1);
-            ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillReadDerializeBlockTime", TUnit::UNIT,
-                                   1);
+            ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillReadDeserializeBlockTime",
+                                   TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillReadBlockCount", TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillReadBlockBytes", TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileBytes", 
TUnit::UNIT, 1);
@@ -100,7 +100,8 @@ public:
             ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillReadTaskWaitInQueueTime",
                                  1);
             ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillReadFileTime", 1);
-            ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillReadDerializeBlockTime", 1);
+            ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillReadDeserializeBlockTime",
+                                 1);
             ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillReadBlockCount",
                                    TUnit::UNIT, 1);
             ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), 
"SpillReadBlockBytes",
diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.cpp 
b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
index 3e580365196..1399e73ec01 100644
--- a/be/test/pipeline/operator/spillable_operator_test_helper.cpp
+++ b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
@@ -53,7 +53,7 @@ void SpillableOperatorTestHelper::SetUp() {
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileBytes", 
TUnit::BYTES, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteRows", 
TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileTime", 
TUnit::UNIT, 1);
-    ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillReadDerializeBlockTime", TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(custom_profile.get(), 
"SpillReadDeserializeBlockTime", TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadBlockCount", 
TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadBlockBytes", 
TUnit::UNIT, 1);
     ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileBytes", 
TUnit::UNIT, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to