github-actions[bot] commented on code in PR #60978:
URL: https://github.com/apache/doris/pull/60978#discussion_r2876960788


##########
be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp:
##########
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::vectorized {
+
+Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
+                                const RowDescriptor* row_desc) {
+    // row_desc is required for initializing sort expressions
+    DCHECK(row_desc != nullptr);
+    _runtime_state = state;
+    _profile = profile;
+    _row_desc = row_desc;
+
+    // Initialize sort expressions from sort_info (contains ordering columns, asc/desc, nulls first/last)
+    RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+    // Create FullSorter for in-memory sorting with spill support enabled.
+    // Parameters: limit=-1 (no limit), offset=0 (no offset)
+    _sorter = vectorized::FullSorter::create_unique(_vsort_exec_exprs, -1, 0, &_pool,
+                                                    _sort_info.is_asc_order, _sort_info.nulls_first,
+                                                    *row_desc, state, _profile);
+    _sorter->init_profile(_profile);
+    // Enable spill support so the sorter can be used with the spill framework
+    _sorter->set_enable_spill();
+    _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT);
+
+    // Open the underlying partition writer that handles actual file I/O
+    RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::write(vectorized::Block& block) {
+    // Append incoming block data to the sorter's internal buffer
+    RETURN_IF_ERROR(_sorter->append_block(&block));
+    _update_spill_block_batch_row_count(block);
+
+    // When accumulated data size reaches the target file size threshold,
+    // sort the data in memory and flush it directly to a Parquet/ORC file.
+    // This avoids holding too much data in memory before writing.
+    if (_sorter->data_size() >= _target_file_size_bytes) {
+        return _flush_to_file();
+    }
+
+    // If data size is below threshold, wait for more data.
+    // Note: trigger_spill() may be called externally by the memory management
+    // system if memory pressure is high.
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::close(const Status& status) {
+    // Track the actual internal status of operations performed during close.
+    // This is important because if intermediate operations (like do_sort()) fail,
+    // we need to propagate the actual error status to the underlying partition writer's
+    // close() call, rather than the original status parameter which could be OK.
+    Status internal_status = Status::OK();
+
+    // Defer ensures the underlying partition writer is always closed and
+    // spill streams are cleaned up, regardless of whether intermediate operations succeed.
+    // Uses internal_status to propagate any errors that occurred during close operations.
+    Defer defer {[&]() {
+        // If any intermediate operation failed, pass that error to the partition writer;
+        // otherwise, pass the original status from the caller.
+        Status st =
+                _iceberg_partition_writer->close(internal_status.ok() ? status : internal_status);
+        if (!st.ok()) {
+            LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}",
+                                        st.to_string());

Review Comment:
   **[Medium]** The Defer lambda catches the return status of 
`_iceberg_partition_writer->close()` but only logs a warning and discards it. 
If the underlying writer's close fails (e.g., a Parquet file flush error), the 
caller of `VIcebergSortWriter::close()` will receive `Status::OK()` (from line 
140) while the actual file close error is silently lost.
   
   This could mask data loss—the caller thinks the write succeeded but the file 
was not properly closed. Consider propagating this error, e.g., by capturing it 
in a local variable and returning it from `close()`.
   
   Note: This was the same behavior in the original code before this PR, but 
since this PR explicitly aims to fix error handling in `close()` (item 4 in the 
PR description), it would be good to address this too.
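   For illustration, here is a minimal standalone sketch of one way to do the propagation. The `Status` and writer types below are simplified stand-ins, not the real Doris classes: the idea is to lift the writer close out of the `Defer` lambda (or capture its result in a local) so `close()` can return whichever error occurred first instead of swallowing the writer's close error.
   ```cpp
   #include <cassert>
   #include <string>
   
   // Simplified stand-in for doris::Status, for illustration only.
   struct Status {
       std::string msg;
       bool ok() const { return msg.empty(); }
       static Status OK() { return {}; }
       static Status InternalError(std::string m) { return {std::move(m)}; }
   };
   
   // Hypothetical writer whose close() can fail (e.g. a Parquet flush error).
   struct FakePartitionWriter {
       Status close_result;
       Status close(const Status& /*exec_status*/) { return close_result; }
   };
   
   // Sketch: after the body's work, close the writer explicitly and surface
   // the first real error: a body failure wins, otherwise the close failure.
   Status close_sorter(FakePartitionWriter& writer, Status internal_status,
                       const Status& caller_status) {
       Status writer_status =
               writer.close(internal_status.ok() ? caller_status : internal_status);
       if (!internal_status.ok()) {
           return internal_status;  // intermediate error (e.g. do_sort) wins
       }
       return writer_status;  // otherwise the writer's close error is propagated
   }
   
   int main() {
       FakePartitionWriter bad{Status::InternalError("parquet flush failed")};
       assert(!close_sorter(bad, Status::OK(), Status::OK()).ok());  // no longer swallowed
       FakePartitionWriter good{Status::OK()};
       assert(close_sorter(good, Status::OK(), Status::OK()).ok());
       return 0;
   }
   ```
   The trade-off is that the close logic is no longer guaranteed by a destructor on every early-return path, so the early-return branches would need to call it too (or keep the `Defer` but stash `st` in a variable that `close()` checks before returning).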



##########
be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp:
##########
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::vectorized {
+
+Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
+                                const RowDescriptor* row_desc) {
+    // row_desc is required for initializing sort expressions
+    DCHECK(row_desc != nullptr);
+    _runtime_state = state;
+    _profile = profile;
+    _row_desc = row_desc;
+
+    // Initialize sort expressions from sort_info (contains ordering columns, asc/desc, nulls first/last)
+    RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+    // Create FullSorter for in-memory sorting with spill support enabled.
+    // Parameters: limit=-1 (no limit), offset=0 (no offset)
+    _sorter = vectorized::FullSorter::create_unique(_vsort_exec_exprs, -1, 0, &_pool,
+                                                    _sort_info.is_asc_order, _sort_info.nulls_first,
+                                                    *row_desc, state, _profile);
+    _sorter->init_profile(_profile);
+    // Enable spill support so the sorter can be used with the spill framework
+    _sorter->set_enable_spill();
+    _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT);
+
+    // Open the underlying partition writer that handles actual file I/O
+    RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::write(vectorized::Block& block) {
+    // Append incoming block data to the sorter's internal buffer
+    RETURN_IF_ERROR(_sorter->append_block(&block));
+    _update_spill_block_batch_row_count(block);
+
+    // When accumulated data size reaches the target file size threshold,
+    // sort the data in memory and flush it directly to a Parquet/ORC file.
+    // This avoids holding too much data in memory before writing.
+    if (_sorter->data_size() >= _target_file_size_bytes) {
+        return _flush_to_file();
+    }
+
+    // If data size is below threshold, wait for more data.
+    // Note: trigger_spill() may be called externally by the memory management
+    // system if memory pressure is high.
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::close(const Status& status) {
+    // Track the actual internal status of operations performed during close.
+    // This is important because if intermediate operations (like do_sort()) fail,
+    // we need to propagate the actual error status to the underlying partition writer's
+    // close() call, rather than the original status parameter which could be OK.
+    Status internal_status = Status::OK();
+
+    // Defer ensures the underlying partition writer is always closed and
+    // spill streams are cleaned up, regardless of whether intermediate operations succeed.
+    // Uses internal_status to propagate any errors that occurred during close operations.
+    Defer defer {[&]() {
+        // If any intermediate operation failed, pass that error to the partition writer;
+        // otherwise, pass the original status from the caller.
+        Status st =
+                _iceberg_partition_writer->close(internal_status.ok() ? status : internal_status);
+        if (!st.ok()) {
+            LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}",
+                                        st.to_string());
+        }
+        _cleanup_spill_streams();
+    }};
+
+    // If the original status is already an error or the query is cancelled,
+    // skip all close operations and propagate the original error
+    if (!status.ok() || _runtime_state->is_cancelled()) {
+        return status;
+    }
+
+    // If sorter was never initialized (e.g., no data was written), nothing to do
+    if (_sorter == nullptr) {
+        return Status::OK();
+    }
+
+    // Check if there is any remaining data in the sorter (either unsorted or already sorted blocks)
+    if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+        !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+        if (_sorted_streams.empty()) {
+            // No spill has occurred, all data is in memory.
+            // Sort the remaining data, prepare for reading, and write to file.
+            internal_status = _sorter->do_sort();
+            if (!internal_status.ok()) {
+                return internal_status;
+            }
+            internal_status = _sorter->prepare_for_read(false);
+            if (!internal_status.ok()) {
+                return internal_status;
+            }
+            internal_status = _write_sorted_data();
+            return internal_status;
+        }
+
+        // Some data has already been spilled to disk.
+        // Spill the remaining in-memory data to a new spill stream.
+        internal_status = _do_spill();
+        if (!internal_status.ok()) {
+            return internal_status;
+        }
+    }
+
+    // Merge all spilled streams using multi-way merge sort and output final sorted data to files
+    if (!_sorted_streams.empty()) {
+        internal_status = _combine_files_output();
+        if (!internal_status.ok()) {
+            return internal_status;
+        }
+    }
+
+    return Status::OK();
+}
+
+void VIcebergSortWriter::_update_spill_block_batch_row_count(const vectorized::Block& block) {
+    auto rows = block.rows();
+    // Calculate average row size from the first non-empty block to determine
+    // the optimal batch size for spill operations
+    if (rows > 0 && 0 == _avg_row_bytes) {
+        _avg_row_bytes = std::max(1UL, block.bytes() / rows);
+        int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB
+        // Calculate how many rows fit in one spill batch (ceiling division)
+        _spill_block_batch_row_count = (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
+    }
+}
+
+Status VIcebergSortWriter::_flush_to_file() {
+    // Sort the accumulated data in memory
+    RETURN_IF_ERROR(_sorter->do_sort());
+    // Prepare the sorted data for sequential reading (builds merge tree if needed)
+    RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+    // Write the sorted data to the current Parquet/ORC file
+    RETURN_IF_ERROR(_write_sorted_data());
+    // Close the current file (it has reached the target size) and open a new writer
+    RETURN_IF_ERROR(_close_current_writer_and_open_next());
+    // Reset the sorter state to accept new data for the next file
+    _sorter->reset();
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_write_sorted_data() {
+    // Read sorted blocks from the sorter one by one and write them
+    // to the underlying partition writer (Parquet/ORC file)
+    bool eos = false;
+    Block block;
+    while (!eos && !_runtime_state->is_cancelled()) {
+        RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
+        RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
+        block.clear_column_data();
+    }
+    return Status::OK();
+}
+
+Status VIcebergSortWriter::_close_current_writer_and_open_next() {
+    // Save the current file name and index before closing, so the next file
+    // can use an incremented index (e.g., file_0, file_1, file_2, ...)
+    std::string current_file_name = _iceberg_partition_writer->file_name();
+    int current_file_index = _iceberg_partition_writer->file_name_index();
+    RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+
+    // Use the lambda to create a new partition writer with the next file index
+    _iceberg_partition_writer = _create_writer_lambda(&current_file_name, current_file_index + 1);
+    if (!_iceberg_partition_writer) {
+        return Status::InternalError("Failed to create new partition writer");
+    }
+
+    RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile, _row_desc));
+    return Status::OK();
+}
+
+int32_t VIcebergSortWriter::_get_spill_batch_size() const {
+    // Clamp the batch row count to int32_t max to prevent overflow
+    if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) {
+        return std::numeric_limits<int32_t>::max();
+    }
+    return static_cast<int32_t>(_spill_block_batch_row_count);
+}
+
+Status VIcebergSortWriter::_do_spill() {
+    COUNTER_UPDATE(_do_spill_count_counter, 1);
+
+    // Explicitly sort the data before preparing for spill read.
+    // Although FullSorter::prepare_for_read(is_spill=true) internally calls do_sort()
+    // when there is unsorted data (see sorter.cpp), we call do_sort() explicitly here
+    // for clarity and to guarantee that the data written to the spill stream is sorted.
+    // This ensures correctness of the subsequent multi-way merge phase.
+    RETURN_IF_ERROR(_sorter->do_sort());
+
+    // prepare_for_read(is_spill=true) adjusts limit/offset for spill mode
+    // and builds the merge tree for reading sorted data
+    RETURN_IF_ERROR(_sorter->prepare_for_read(true));
+    int32_t batch_size = _get_spill_batch_size();
+
+    // Register a new spill stream to store the sorted data on disk
+    SpillStreamSPtr spilling_stream;
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+            _runtime_state, spilling_stream, print_id(_runtime_state->query_id()), "iceberg-sort",
+            1 /* node_id */, batch_size, _runtime_state->spill_sort_batch_bytes(), _profile));
+    _sorted_streams.emplace_back(spilling_stream);
+
+    // Read sorted data from the sorter in batches and write to the spill stream
+    bool eos = false;
+    Block block;
+    while (!eos && !_runtime_state->is_cancelled()) {
+        RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(
+                _runtime_state, &block, (int)_spill_block_batch_row_count, &eos));
+        RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block, eos));

Review Comment:
   **[Medium]** This uses a C-style cast `(int)_spill_block_batch_row_count` 
which is a narrowing conversion from `size_t` to `int`. The 
`_get_spill_batch_size()` method was specifically created (a few lines above) 
to handle this conversion safely with bounds checking. This line should use it:
   ```cpp
   RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(
           _runtime_state, &block, _get_spill_batch_size(), &eos));
   ```
   Using the C-style cast defeats the purpose of having the safe helper method.



##########
be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp:
##########
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"

Review Comment:
   **[Low]** This new `.cpp` file is missing `compile_check_begin.h` / 
`compile_check_end.h` includes. The codebase convention (and the code-review 
checklist) requires new files to include these paired headers to enable 
`-Wconversion` as an error, which would have caught the narrowing cast issue on 
line 234. Consider adding:
   ```cpp
   #include "common/compile_check_begin.h"
   ```
   after the last include, and:
   ```cpp
   #include "common/compile_check_end.h"
   ```
   at the end of the file after the namespace closing brace.



##########
be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp:
##########
@@ -182,7 +182,7 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() {
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1);
     ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1);
     ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1);
-    ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillReadDeserializeBlockTime", 1);
     ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1);

Review Comment:
   **[Medium]** The typo fix from `SpillReadDerializeBlockTime` to 
`SpillReadDeserializeBlockTime` is correct, but it's **incomplete across the 
codebase**. Two test files still use the old misspelled name and need to be 
updated as well:
   
   1. `be/test/pipeline/operator/spillable_operator_test_helper.cpp` line 56: 
`"SpillReadDerializeBlockTime"`
   2. `be/test/pipeline/exec/multi_cast_data_streamer_test.cpp` lines 59 and 
103: `"SpillReadDerializeBlockTime"`
   
   After this PR, the production code registers the counter as 
`SpillReadDeserializeBlockTime`, but these tests will still register counters 
with the old name `SpillReadDerializeBlockTime`. This will cause 
`get_counter()` lookups to return `nullptr` when the spill reader tries to find 
its counter by the new name, potentially causing test failures or null 
dereferences.
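   To make sure no stale occurrences are left behind, the rename can be swept across the tree with a `grep`/`sed` pair. The sketch below exercises the commands on a throwaway scratch file (the file name is made up for illustration, and GNU `sed -i` syntax is assumed); pointing the same pair at `be/` would cover the test files too:
   ```shell
   # Scratch file standing in for a test source that still uses the old name
   tmpdir=$(mktemp -d)
   printf 'ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);\n' \
       > "$tmpdir/sample_test_helper.cpp"
   
   # List every file that still registers the misspelled counter
   grep -rl 'SpillReadDerializeBlockTime' "$tmpdir"
   
   # Rewrite the old name to the corrected one in place (GNU sed; BSD sed needs -i '')
   grep -rl 'SpillReadDerializeBlockTime' "$tmpdir" \
       | xargs sed -i 's/SpillReadDerializeBlockTime/SpillReadDeserializeBlockTime/g'
   
   # Verify: the old name is gone and the new one is present
   ! grep -rq 'SpillReadDerializeBlockTime' "$tmpdir"
   grep -rq 'SpillReadDeserializeBlockTime' "$tmpdir" && echo "rename complete"
   rm -rf "$tmpdir"
   ```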



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

