This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5cc48ea03a7 [bug](iceberg) fix iceberg sink writer with spill report
error (#62899)
5cc48ea03a7 is described below
commit 5cc48ea03a77167cf6abd96adae4d13a4d648c63
Author: zhangstar333 <[email protected]>
AuthorDate: Mon May 25 14:56:00 2026 +0800
[bug](iceberg) fix iceberg sink writer with spill report error (#62899)
The spill thread and the write thread are different threads, so add a mutex
in the write function.
```
mysql> create table web_sales
-> order by (ws_sold_date_sk, ws_item_sk, ws_order_number)
-> PROPERTIES (
-> 'write-format'='parquet'
-> )
-> as select * from tpcds_sf1000_parquet.web_sales;
ERROR 1105 (HY000): errCode = 2, detailMessage =
(172.20.49.239)[INTERNAL_ERROR][E6] Size of permutation (40800) is less than
required (48960) 0# doris::Exception::Exception(int,
std::basic_string_view<char, std::char_traits<char> > const&, bool) at
/home/zcp/repo_center/doris_branch-4.1/doris/be/src/common/exception.cpp:0
1# doris::Exception::Exception(int, std::basic_string_view<char,
std::char_traits<char> > const&) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/basic_stri
mysql>
```
---
.../operator/spill_iceberg_table_sink_operator.cpp | 33 +++++++++++-----------
.../sink/writer/iceberg/viceberg_sort_writer.cpp | 29 +++++++++++++++++++
.../sink/writer/iceberg/viceberg_sort_writer.h | 25 ++++++++++++----
.../sink/writer/iceberg/viceberg_table_writer.cpp | 6 ++--
.../sink/writer/iceberg/viceberg_table_writer.h | 10 +++++--
5 files changed, 75 insertions(+), 28 deletions(-)
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 8d2a2041ea9..67040e2762d 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -53,43 +53,42 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const {
}
size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
- if (!_writer || !_writer->current_writer()) {
+ if (!_writer) {
return 0;
}
-
- auto* sort_writer =
dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
- if (!sort_writer || !sort_writer->sorter()) {
+ auto current_writer = _writer->current_writer();
+ auto* sort_writer =
dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+ if (!sort_writer) {
return 0;
}
- return sort_writer->sorter()->get_reserve_mem_size(state, eos);
+ return sort_writer->get_reserve_mem_size(state, eos);
}
size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState*
state) const {
- if (!_writer || !_writer->current_writer()) {
+ if (!_writer) {
return 0;
}
-
- auto* sort_writer =
dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
- if (!sort_writer || !sort_writer->sorter()) {
+ auto current_writer = _writer->current_writer();
+ auto* sort_writer =
dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+ if (!sort_writer) {
return 0;
}
- return sort_writer->sorter()->data_size();
+ return sort_writer->data_size();
}
Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
- if (!_writer || !_writer->current_writer()) {
+ if (!_writer) {
return Status::OK();
}
-
- auto* sort_writer =
dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-
- if (!sort_writer || !sort_writer->sorter()) {
+ auto current_writer = _writer->current_writer();
+ auto* sort_writer =
dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+ if (!sort_writer) {
return Status::OK();
}
- auto exception_catch_func = [sort_writer]() {
+ auto exception_catch_func = [current_writer, sort_writer]() {
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill();
});
}();
@@ -173,4 +172,4 @@ void
SpillIcebergTableSinkLocalState::_init_spill_counters() {
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT,
1);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
index 3d36e1f10eb..3827cb6d925 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -55,6 +55,8 @@ Status VIcebergSortWriter::open(RuntimeState* state,
RuntimeProfile* profile,
}
Status VIcebergSortWriter::write(Block& block) {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+
// Append incoming block data to the sorter's internal buffer
RETURN_IF_ERROR(_sorter->append_block(&block));
_update_spill_block_batch_row_count(block);
@@ -72,7 +74,34 @@ Status VIcebergSortWriter::write(Block& block) {
return Status::OK();
}
+size_t VIcebergSortWriter::data_size() const {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ return _sorter == nullptr ? 0 : _sorter->data_size();
+}
+
+size_t VIcebergSortWriter::get_reserve_mem_size(RuntimeState* state, bool eos)
const {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ return _sorter == nullptr ? 0 : _sorter->get_reserve_mem_size(state, eos);
+}
+
+Status VIcebergSortWriter::trigger_spill() {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ if (_closed || _sorter == nullptr) {
+ return Status::OK();
+ }
+ return _do_spill();
+}
+
Status VIcebergSortWriter::close(const Status& status) {
+ std::lock_guard<std::mutex> lock(_sorter_mutex);
+ if (_closed) {
+ return Status::OK();
+ }
+ Defer mark_closed {[&]() { _closed = true; }};
+ return _close_locked(status);
+}
+
+Status VIcebergSortWriter::_close_locked(const Status& status) {
// Track the actual internal status of operations performed during close.
// This is important because if intermediate operations (like do_sort())
fail,
// we need to propagate the actual error status to the underlying
partition writer's
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
index 45858f473c9..e1e512f0a0c 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -18,7 +18,7 @@
#pragma once
#include <cstdint>
-#include <limits>
+#include <mutex>
#include <utility>
#include <vector>
@@ -101,12 +101,12 @@ public:
inline size_t written_len() const override { return
_iceberg_partition_writer->written_len(); }
- // Returns a raw pointer to the FullSorter, used by
SpillIcebergTableSinkLocalState
- // to query memory usage (data_size, get_reserve_mem_size)
- auto sorter() const { return _sorter.get(); }
+ size_t data_size() const;
+
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
// Called by the memory management system to trigger spilling data to disk
- Status trigger_spill() { return _do_spill(); }
+ Status trigger_spill();
private:
// Calculate average row size from the first non-empty block to determine
@@ -129,6 +129,8 @@ private:
// Explicitly calls do_sort() before prepare_for_read() to guarantee
sorted output.
Status _do_spill();
+ Status _close_locked(const Status& status);
+
// Merge all spilled streams and output final sorted data to Parquet/ORC
files.
// Handles file splitting when output exceeds target file size.
Status _combine_files_output();
@@ -168,6 +170,17 @@ private:
std::unique_ptr<FullSorter> _sorter;
std::unique_ptr<VSortedRunMerger> _merger;
+ // Serialize all accesses to _sorter because async writes and revoke
spills run on
+ // different thread pools but touch the same FullSorter instance.
+ mutable std::mutex _sorter_mutex;
+
+ // Set to true once close() has finished tearing down the sorter /
underlying writer.
+ // Late-arriving revoke spills (which run on a different thread than the
async writer)
+ // must become no-ops after close, otherwise they would write to a fresh
spill stream
+ // whose data never gets merged out (close has already produced the final
output and
+ // cleaned up spill files).
+ bool _closed = false;
+
// Queue of spill files waiting to be merged (FIFO order)
std::deque<SpillFileSPtr> _sorted_spill_files;
// Files currently being consumed by the merger
@@ -185,4 +198,4 @@ private:
RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
index 6a67b6ca8bd..5584491e99c 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -283,7 +283,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block&
output_block) {
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
- _current_writer = writer;
+ _current_writer.store(writer);
return Status::OK();
}
@@ -326,7 +326,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block&
output_block) {
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
- _current_writer = writer;
+ _current_writer.store(writer);
return Status::OK();
}
@@ -429,7 +429,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block&
output_block) {
Block filtered_block;
RETURN_IF_ERROR(_filter_block(output_block, &it->second,
&filtered_block));
RETURN_IF_ERROR(it->first->write(filtered_block));
- _current_writer = it->first;
+ _current_writer.store(it->first);
}
return Status::OK();
}
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
index f94ce4feb6b..cc7cec1fdad 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
@@ -19,6 +19,7 @@
#include <gen_cpp/DataSinks_types.h>
+#include "common/atomic_shared_ptr.h"
#include "core/block/block.h"
#include "core/column/column.h"
#include "exec/sink/writer/async_result_writer.h"
@@ -66,12 +67,17 @@ public:
// Getter for the current partition writer.
// Used by SpillIcebergTableSinkLocalState to access the current writer for
// memory management operations (get_reserve_mem_size, revocable_mem_size,
etc.).
- const std::shared_ptr<IPartitionWriterBase>& current_writer() const {
return _current_writer; }
+ // Returns a snapshot by value: the async writer thread updates
_current_writer
+ // concurrently with the spill/revoke path, so callers must hold their own
copy
+ // while operating on it instead of dereferencing the underlying member
directly.
+ std::shared_ptr<IPartitionWriterBase> current_writer() const { return
_current_writer.load(); }
private:
// The currently active partition writer (may be VIcebergPartitionWriter
or VIcebergSortWriter).
// Updated during write() to track which writer received the most recent
data.
- std::shared_ptr<IPartitionWriterBase> _current_writer;
+ // Wrapped in atomic_shared_ptr because revoke_memory /
get_revocable_mem_size run on
+ // a different thread than the async writer that assigns to it.
+ doris::atomic_shared_ptr<IPartitionWriterBase> _current_writer;
class IcebergPartitionColumn {
public:
IcebergPartitionColumn(const iceberg::PartitionField& field,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]